risingwave_frontend/handler/create_table/
col_id_gen.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, INITIAL_TABLE_VERSION_ID, TableVersionId};
20use risingwave_common::types::{DataType, MapType, StructType, data_types};
21use risingwave_common::util::iter_util::ZipEqFast;
22
23use crate::TableCatalog;
24use crate::catalog::ColumnId;
25use crate::catalog::table_catalog::TableVersion;
26use crate::error::Result;
27
/// One step along a [`Path`] into a (possibly nested) column type.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum Segment {
    /// A named component: either a top-level column or a field of a struct.
    Field(String),
    /// The element type of a list.
    ListElement,
    /// The key type of a map.
    MapKey,
    /// The value type of a map.
    MapValue,
}

impl std::fmt::Display for Segment {
    /// Renders a field as its name, and the structural segments as the fixed words
    /// `element`, `key` and `value`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::Field(name) => name.as_str(),
            Self::ListElement => "element",
            Self::MapKey => "key",
            Self::MapValue => "value",
        };
        f.write_str(text)
    }
}
47
/// The path to a nested field in a column with composite data type.
///
/// Paths built in this file always start with the [`Segment::Field`] naming the top-level
/// column; every following segment descends one level into a struct field, a list element,
/// or a map key/value.
type Path = Vec<Segment>;
50
/// Maps the [`Path`] of each column and nested field of the original table to the
/// [`ColumnId`] and [`DataType`] recorded for it.
///
/// List elements and map keys/values have no ID of their own and are stored with
/// `ColumnId::placeholder()`.
type Existing = HashMap<Path, (ColumnId, DataType)>;
52
/// Column ID generator for a new table or a new version of an existing table to alter.
///
/// Typical usage: create it with [`ColumnIdGenerator::new_initial`] or
/// [`ColumnIdGenerator::new_alter`], call [`ColumnIdGenerator::generate`] on each column, then
/// finish with [`ColumnIdGenerator::into_version`].
#[derive(Debug)]
pub struct ColumnIdGenerator {
    /// Existing fields and their IDs and data types, keyed by path.
    ///
    /// This is used for aligning field IDs between versions (`ALTER`s). A top-level column
    /// whose data type is exactly the same keeps its ID and data type as is. Otherwise, a
    /// field found at an existing path with the same type name reuses its ID, and a field at
    /// an unknown path gets a newly generated ID.
    ///
    /// For a new table, this is empty.
    existing: Existing,

    /// The next column ID to generate, used for new columns that do not exist in `existing`.
    next_column_id: ColumnId,

    /// The version ID of the table to be created or altered.
    ///
    /// For a new table, this is 0. For altering an existing table, this is the **next** version ID
    /// of the `version_id` field in the original table catalog.
    version_id: TableVersionId,
}
73
74impl ColumnIdGenerator {
75    /// Creates a new [`ColumnIdGenerator`] for altering an existing table.
76    pub fn new_alter(original: &TableCatalog) -> Self {
77        fn handle(existing: &mut Existing, path: &mut Path, id: ColumnId, data_type: DataType) {
78            macro_rules! with_segment {
79                ($segment:expr, $block:block) => {
80                    path.push($segment);
81                    $block
82                    path.pop();
83                };
84            }
85
86            match &data_type {
87                DataType::Struct(fields) => {
88                    for ((field_name, field_data_type), field_id) in
89                        fields.iter().zip_eq_fast(fields.ids_or_placeholder())
90                    {
91                        with_segment!(Segment::Field(field_name.to_owned()), {
92                            handle(existing, path, field_id, field_data_type.clone());
93                        });
94                    }
95                }
96                DataType::List(list) => {
97                    // There's no id for the element as list's own structure won't change.
98                    with_segment!(Segment::ListElement, {
99                        handle(existing, path, ColumnId::placeholder(), list.elem().clone());
100                    });
101                }
102                DataType::Map(map) => {
103                    // There's no id for the key/value as map's own structure won't change.
104                    with_segment!(Segment::MapKey, {
105                        handle(existing, path, ColumnId::placeholder(), map.key().clone());
106                    });
107                    with_segment!(Segment::MapValue, {
108                        handle(existing, path, ColumnId::placeholder(), map.value().clone());
109                    });
110                }
111
112                data_types::simple!() => {}
113            }
114
115            existing
116                .try_insert(path.clone(), (id, data_type))
117                .unwrap_or_else(|_| panic!("duplicate path: {:?}", path));
118        }
119
120        let mut existing = Existing::new();
121
122        // Collect all existing fields into `existing`.
123        for col in original.columns() {
124            let mut path = vec![Segment::Field(col.name().to_owned())];
125            handle(
126                &mut existing,
127                &mut path,
128                col.column_id(),
129                col.data_type().clone(),
130            );
131        }
132
133        let version = original.version().expect("version field not set");
134
135        Self {
136            existing,
137            next_column_id: version.next_column_id,
138            version_id: version.version_id + 1,
139        }
140    }
141
142    /// Creates a new [`ColumnIdGenerator`] for a new table.
143    pub fn new_initial() -> Self {
144        Self {
145            existing: Existing::new(),
146            next_column_id: ColumnId::first_user_column(),
147            version_id: INITIAL_TABLE_VERSION_ID,
148        }
149    }
150
151    /// Generate [`ColumnId`]s for the given column and its nested fields (if any) recursively.
152    /// The IDs of nested fields will be reflected in the updated [`DataType`] of the column.
153    ///
154    /// Returns an error if there's incompatible data type change.
155    pub fn generate(&mut self, col: &mut ColumnCatalog) -> Result<()> {
156        let mut path = vec![Segment::Field(col.name().to_owned())];
157
158        if let Some((original_column_id, original_data_type)) = self.existing.get(&path) {
159            if original_data_type == col.data_type() {
160                col.column_desc.column_id = *original_column_id;
161                // Equality above ignores nested field IDs. We need to clone them below.
162                col.column_desc.data_type = original_data_type.clone();
163                return Ok(());
164            } else {
165                // Check if the column can be altered.
166                match original_data_type.can_alter() {
167                    Some(true) => { /* pass */ }
168                    Some(false) => bail!(
169                        "column \"{}\" was persisted with legacy encoding thus cannot be altered, \
170                         consider dropping and readding the column",
171                        col.name()
172                    ),
173                    None => bail!(
174                        "column \"{}\" cannot be altered; only types containing struct can be altered",
175                        col.name()
176                    ),
177                }
178            }
179        }
180
181        fn handle(
182            this: &mut ColumnIdGenerator,
183            path: &mut Path,
184            data_type: DataType,
185        ) -> Result<(ColumnId, DataType)> {
186            macro_rules! with_segment {
187                ($segment:expr, $block:block) => {{
188                    path.push($segment);
189                    let ret = $block;
190                    path.pop();
191                    ret
192                }};
193            }
194
195            let original_column_id = match this.existing.get(&*path) {
196                Some((original_column_id, original_data_type)) => {
197                    // Only check the type name (discriminant) for compatibility check here.
198                    // For nested fields, we will check them recursively later.
199                    let incompatible = original_data_type.type_name() != data_type.type_name()
200                        || matches!(
201                            (original_data_type, &data_type),
202                            (DataType::Vector(old), DataType::Vector(new)) if old != new,
203                        );
204                    if incompatible {
205                        let path = path.iter().join(".");
206                        bail!(
207                            "incompatible data type change from {:?} to {:?} at path \"{}\"",
208                            original_data_type.type_name(),
209                            data_type.type_name(),
210                            path
211                        );
212                    }
213                    Some(*original_column_id)
214                }
215                None => None,
216            };
217
218            // Only top-level column and struct fields need an ID.
219            let need_gen_id = matches!(path.last().unwrap(), Segment::Field(_));
220
221            let new_id = if need_gen_id {
222                if let Some(id) = original_column_id {
223                    assert!(
224                        id != ColumnId::placeholder(),
225                        "original column id should not be placeholder"
226                    );
227                    id
228                } else {
229                    let id = this.next_column_id;
230                    this.next_column_id = this.next_column_id.next();
231                    id
232                }
233            } else {
234                // Column ID is actually unused by caller (for list and map types), just use a placeholder.
235                ColumnId::placeholder()
236            };
237
238            let new_data_type = match data_type {
239                DataType::Struct(fields) => {
240                    let mut new_fields = Vec::new();
241                    let mut ids = Vec::new();
242                    for (field_name, field_data_type) in fields.iter() {
243                        let (id, new_field_data_type) =
244                            with_segment!(Segment::Field(field_name.to_owned()), {
245                                handle(this, path, field_data_type.clone())?
246                            });
247                        new_fields.push((field_name.to_owned(), new_field_data_type));
248                        ids.push(id);
249                    }
250                    DataType::Struct(StructType::new(new_fields).with_ids(ids))
251                }
252                DataType::List(list) => {
253                    let (_, new_inner) = with_segment!(Segment::ListElement, {
254                        handle(this, path, list.into_elem())?
255                    });
256                    DataType::list(new_inner)
257                }
258                DataType::Map(map) => {
259                    let (key, value) = map.into_kv();
260                    let (_, new_key) = with_segment!(Segment::MapKey, { handle(this, path, key)? });
261                    let (_, new_value) =
262                        with_segment!(Segment::MapValue, { handle(this, path, value)? });
263                    DataType::Map(MapType::from_kv(new_key, new_value))
264                }
265
266                data_types::simple!() => data_type,
267            };
268
269            Ok((new_id, new_data_type))
270        }
271
272        let (new_column_id, new_data_type) = handle(self, &mut path, col.data_type().clone())?;
273
274        col.column_desc.column_id = new_column_id;
275        col.column_desc.data_type = new_data_type;
276
277        Ok(())
278    }
279
280    /// Consume this generator and return a [`TableVersion`] for the table to be created or altered.
281    pub fn into_version(self) -> TableVersion {
282        TableVersion {
283            version_id: self.version_id,
284            next_column_id: self.next_column_id,
285        }
286    }
287}
288
289#[cfg(test)]
290mod tests {
291    use expect_test::expect;
292    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, FieldLike};
293    use risingwave_common::types::StructType;
294    use thiserror_ext::AsReport;
295
296    use super::*;
297
    /// Test-only field that consists of just a name; its data type is always `Boolean`.
    /// The tests use it for columns that do not exist in the original table.
    struct BrandNewColumn(&'static str);
    // Short alias to keep the assertions below compact.
    use BrandNewColumn as B;

    impl FieldLike for BrandNewColumn {
        /// Returns the wrapped name.
        fn name(&self) -> &str {
            self.0
        }

        /// Returns `Boolean`, regardless of the name.
        fn data_type(&self) -> &DataType {
            &DataType::Boolean
        }
    }
310
311    #[easy_ext::ext(ColumnIdGeneratorTestExt)]
312    impl ColumnIdGenerator {
313        /// Generate a column ID for the given field.
314        ///
315        /// This helper function checks that the data type of the field has not changed after
316        /// generating the column ID, i.e., it is called on simple types or composite types
317        /// with `field_ids` unset and legacy encoding.
318        fn generate_simple(&mut self, field: impl FieldLike) -> Result<ColumnId> {
319            let original_data_type = field.data_type().clone();
320
321            let field = Field::new(field.name(), original_data_type.clone());
322            let mut col = ColumnCatalog {
323                column_desc: ColumnDesc::from_field_without_column_id(&field),
324                is_hidden: false,
325            };
326            self.generate(&mut col)?;
327
328            assert_eq!(
329                col.column_desc.data_type, original_data_type,
330                "data type has changed after generating column id, \
331                 are you calling this on a composite type?"
332            );
333
334            Ok(col.column_desc.column_id)
335        }
336    }
337
338    #[test]
339    fn test_col_id_gen_initial() {
340        let mut r#gen = ColumnIdGenerator::new_initial();
341        assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(1));
342        assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(2));
343    }
344
345    #[test]
346    fn test_col_id_gen_alter() {
347        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
348            columns: vec![
349                ColumnCatalog {
350                    column_desc: ColumnDesc::from_field_with_column_id(
351                        &Field::with_name(DataType::Float32, "f32"),
352                        1,
353                    ),
354                    is_hidden: false,
355                },
356                ColumnCatalog {
357                    column_desc: ColumnDesc::from_field_with_column_id(
358                        &Field::with_name(DataType::Float64, "f64"),
359                        2,
360                    ),
361                    is_hidden: false,
362                },
363                ColumnCatalog {
364                    column_desc: ColumnDesc::from_field_with_column_id(
365                        &Field::with_name(
366                            StructType::new([("f1", DataType::Int32)]).into(),
367                            "nested",
368                        ),
369                        3,
370                    ),
371                    is_hidden: false,
372                },
373            ],
374            version: Some(TableVersion::new_initial_for_test(ColumnId::new(3))),
375            ..Default::default()
376        });
377
378        assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(4));
379        assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(5));
380        assert_eq!(
381            r#gen
382                .generate_simple(Field::new("f32", DataType::Float32))
383                .unwrap(),
384            ColumnId::new(1)
385        );
386
387        // mismatched simple data type
388        let err = r#gen
389            .generate_simple(Field::new("f64", DataType::Float32))
390            .unwrap_err();
391        expect![[r#"column "f64" cannot be altered; only types containing struct can be altered"#]]
392            .assert_eq(&err.to_report_string());
393
394        // mismatched composite data type
395        // we require the nested data type to be exactly the same
396        let err = r#gen
397            .generate_simple(Field::new(
398                "nested",
399                // NOTE: field ids are unset, legacy encoding, thus cannot be altered
400                StructType::new([("f1", DataType::Int32), ("f2", DataType::Int64)]).into(),
401            ))
402            .unwrap_err();
403        expect![[r#"column "nested" was persisted with legacy encoding thus cannot be altered, consider dropping and readding the column"#]].assert_eq(&err.to_report_string());
404
405        // matched composite data type, should work
406        let id = r#gen
407            .generate_simple(Field::new(
408                "nested",
409                StructType::new([("f1", DataType::Int32)]).into(),
410            ))
411            .unwrap();
412        assert_eq!(id, ColumnId::new(3));
413
414        assert_eq!(r#gen.generate_simple(B("v3")).unwrap(), ColumnId::new(6));
415    }
416
    /// Walks a struct column through three versions: initial creation, a compatible alter
    /// that renames/adds nested fields, and an incompatible alter that must be rejected.
    #[test]
    fn test_col_id_gen_alter_composite_type() {
        // Version 1: struct<f1 int, map map<varchar, list<struct<f2 int, f3 boolean>>>>.
        let ori_type = || {
            DataType::from(StructType::new([
                ("f1", DataType::Int32),
                (
                    "map",
                    MapType::from_kv(
                        DataType::Varchar,
                        DataType::list(
                            StructType::new([("f2", DataType::Int32), ("f3", DataType::Boolean)])
                                .into(),
                        ),
                    )
                    .into(),
                ),
            ]))
        };

        // Version 2: `f1` is replaced by `f4`; inside the map value, `f2` is replaced by `f5`,
        // `f3` is kept, and `f6` is added.
        let new_type = || {
            DataType::from(StructType::new([
                ("f4", DataType::Int32),
                (
                    "map",
                    MapType::from_kv(
                        DataType::Varchar,
                        DataType::list(
                            StructType::new([
                                ("f5", DataType::Int32),
                                ("f3", DataType::Boolean),
                                ("f6", DataType::Float32),
                            ])
                            .into(),
                        ),
                    )
                    .into(),
                ),
            ]))
        };

        // Version 3 (rejected): `f6` keeps its path but changes from Float32 to Float64.
        let incompatible_new_type = || {
            DataType::from(StructType::new([(
                "map",
                MapType::from_kv(
                    DataType::Varchar,
                    DataType::list(StructType::new([("f6", DataType::Float64)]).into()),
                )
                .into(),
            )]))
        };

        // Create the table: IDs are handed out in pre-order, i.e. the column gets #1, its
        // direct fields #2 and #3, and the fields of the struct nested in the map #4 and #5.
        let mut r#gen = ColumnIdGenerator::new_initial();
        let mut col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                ori_type(),
            )),
            is_hidden: false,
        };
        r#gen.generate(&mut col).unwrap();
        let version = r#gen.into_version();

        expect![[r#"
            ColumnCatalog {
                column_desc: ColumnDesc {
                    data_type: Struct(
                        StructType {
                            fields: [
                                (
                                    "f1",
                                    Int32,
                                ),
                                (
                                    "map",
                                    Map(
                                        MapType(
                                            (
                                                Varchar,
                                                List(
                                                    Struct(
                                                        StructType {
                                                            fields: [
                                                                (
                                                                    "f2",
                                                                    Int32,
                                                                ),
                                                                (
                                                                    "f3",
                                                                    Boolean,
                                                                ),
                                                            ],
                                                            field_ids: [
                                                                #4,
                                                                #5,
                                                            ],
                                                        },
                                                    ),
                                                ),
                                            ),
                                        ),
                                    ),
                                ),
                            ],
                            field_ids: [
                                #2,
                                #3,
                            ],
                        },
                    ),
                    column_id: #1,
                    name: "nested",
                    generated_or_default_column: None,
                    description: None,
                    additional_column: AdditionalColumn {
                        column_type: None,
                    },
                    version: Pr13707,
                    system_column: None,
                    nullable: true,
                },
                is_hidden: false,
            }
        "#]]
        .assert_debug_eq(&col);

        // Alter to version 2: the column (#1), `map` (#3) and `f3` (#5) keep their IDs, while
        // the fields at new paths get fresh ones: `f4` -> #6, `f5` -> #7, `f6` -> #8.
        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![col],
            version: Some(version),
            ..Default::default()
        });
        let mut new_col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                new_type(),
            )),
            is_hidden: false,
        };
        r#gen.generate(&mut new_col).unwrap();
        let version = r#gen.into_version();

        expect![[r#"
            ColumnCatalog {
                column_desc: ColumnDesc {
                    data_type: Struct(
                        StructType {
                            fields: [
                                (
                                    "f4",
                                    Int32,
                                ),
                                (
                                    "map",
                                    Map(
                                        MapType(
                                            (
                                                Varchar,
                                                List(
                                                    Struct(
                                                        StructType {
                                                            fields: [
                                                                (
                                                                    "f5",
                                                                    Int32,
                                                                ),
                                                                (
                                                                    "f3",
                                                                    Boolean,
                                                                ),
                                                                (
                                                                    "f6",
                                                                    Float32,
                                                                ),
                                                            ],
                                                            field_ids: [
                                                                #7,
                                                                #5,
                                                                #8,
                                                            ],
                                                        },
                                                    ),
                                                ),
                                            ),
                                        ),
                                    ),
                                ),
                            ],
                            field_ids: [
                                #6,
                                #3,
                            ],
                        },
                    ),
                    column_id: #1,
                    name: "nested",
                    generated_or_default_column: None,
                    description: None,
                    additional_column: AdditionalColumn {
                        column_type: None,
                    },
                    version: Pr13707,
                    system_column: None,
                    nullable: true,
                },
                is_hidden: false,
            }
        "#]]
        .assert_debug_eq(&new_col);

        // Alter to version 3: changing the type of the existing nested field `f6` is rejected,
        // and the error reports the full path down to that field.
        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![new_col],
            version: Some(version),
            ..Default::default()
        });

        let mut new_col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                incompatible_new_type(),
            )),
            is_hidden: false,
        };
        let err = r#gen.generate(&mut new_col).unwrap_err();
        expect![[r#"incompatible data type change from Float32 to Float64 at path "nested.map.value.element.f6""#]].assert_eq(&err.to_report_string());
    }
641}