risingwave_frontend/handler/create_table/
col_id_gen.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, INITIAL_TABLE_VERSION_ID, TableVersionId};
20use risingwave_common::types::{DataType, MapType, StructType, data_types};
21use risingwave_common::util::iter_util::ZipEqFast;
22
23use crate::TableCatalog;
24use crate::catalog::ColumnId;
25use crate::catalog::table_catalog::TableVersion;
26use crate::error::Result;
27
/// One step along a [`Path`] that locates a field inside a column's data type.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum Segment {
    /// A named field: either a top-level column or a field of a struct.
    Field(String),
    /// The element of a list.
    ListElement,
    /// The key of a map.
    MapKey,
    /// The value of a map.
    MapValue,
}
36
37impl std::fmt::Display for Segment {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            Segment::Field(name) => write!(f, "{}", name),
41            Segment::ListElement => write!(f, "element"),
42            Segment::MapKey => write!(f, "key"),
43            Segment::MapValue => write!(f, "value"),
44        }
45    }
46}
47
/// The path to a (possibly nested) field in a column with composite data type.
///
/// The first segment is the top-level column name; each following segment steps into a
/// struct field, a list element, or a map key/value.
type Path = Vec<Segment>;

/// Lookup table from the [`Path`] of every field known from the original table to that
/// field's [`ColumnId`] and [`DataType`].
type Existing = HashMap<Path, (ColumnId, DataType)>;
52
/// Column ID generator for a new table or a new version of an existing table to alter.
///
/// Construct it with [`ColumnIdGenerator::new_initial`] or
/// [`ColumnIdGenerator::new_alter`], call [`ColumnIdGenerator::generate`] for each column,
/// then obtain the resulting [`TableVersion`] with [`ColumnIdGenerator::into_version`].
#[derive(Debug)]
pub struct ColumnIdGenerator {
    /// Existing fields and their IDs and data types, keyed by path.
    ///
    /// This is used for aligning field IDs between versions (`ALTER`s). If a field already
    /// exists and the data type matches, its ID is reused. Otherwise, a new ID is generated.
    ///
    /// For a new table, this is empty.
    existing: Existing,

    /// The next column ID to generate, used for new columns that do not exist in `existing`.
    next_column_id: ColumnId,

    /// The version ID of the table to be created or altered.
    ///
    /// For a new table, this is `INITIAL_TABLE_VERSION_ID`. For altering an existing table,
    /// this is the `version_id` of the original table catalog plus one.
    version_id: TableVersionId,
}
73
74impl ColumnIdGenerator {
75    /// Creates a new [`ColumnIdGenerator`] for altering an existing table.
76    pub fn new_alter(original: &TableCatalog) -> Self {
77        fn handle(existing: &mut Existing, path: &mut Path, id: ColumnId, data_type: DataType) {
78            macro_rules! with_segment {
79                ($segment:expr, $block:block) => {
80                    path.push($segment);
81                    $block
82                    path.pop();
83                };
84            }
85
86            match &data_type {
87                DataType::Struct(fields) => {
88                    for ((field_name, field_data_type), field_id) in
89                        fields.iter().zip_eq_fast(fields.ids_or_placeholder())
90                    {
91                        with_segment!(Segment::Field(field_name.to_owned()), {
92                            handle(existing, path, field_id, field_data_type.clone());
93                        });
94                    }
95                }
96                DataType::List(inner) => {
97                    // There's no id for the element as list's own structure won't change.
98                    with_segment!(Segment::ListElement, {
99                        handle(existing, path, ColumnId::placeholder(), *inner.clone());
100                    });
101                }
102                DataType::Map(map) => {
103                    // There's no id for the key/value as map's own structure won't change.
104                    with_segment!(Segment::MapKey, {
105                        handle(existing, path, ColumnId::placeholder(), map.key().clone());
106                    });
107                    with_segment!(Segment::MapValue, {
108                        handle(existing, path, ColumnId::placeholder(), map.value().clone());
109                    });
110                }
111
112                data_types::simple!() => {}
113            }
114
115            existing
116                .try_insert(path.clone(), (id, data_type))
117                .unwrap_or_else(|_| panic!("duplicate path: {:?}", path));
118        }
119
120        let mut existing = Existing::new();
121
122        // Collect all existing fields into `existing`.
123        for col in original.columns() {
124            let mut path = vec![Segment::Field(col.name().to_owned())];
125            handle(
126                &mut existing,
127                &mut path,
128                col.column_id(),
129                col.data_type().clone(),
130            );
131        }
132
133        let version = original.version().expect("version field not set");
134
135        Self {
136            existing,
137            next_column_id: version.next_column_id,
138            version_id: version.version_id + 1,
139        }
140    }
141
142    /// Creates a new [`ColumnIdGenerator`] for a new table.
143    pub fn new_initial() -> Self {
144        Self {
145            existing: Existing::new(),
146            next_column_id: ColumnId::first_user_column(),
147            version_id: INITIAL_TABLE_VERSION_ID,
148        }
149    }
150
151    /// Generate [`ColumnId`]s for the given column and its nested fields (if any) recursively.
152    /// The IDs of nested fields will be reflected in the updated [`DataType`] of the column.
153    ///
154    /// Returns an error if there's incompatible data type change.
155    pub fn generate(&mut self, col: &mut ColumnCatalog) -> Result<()> {
156        let mut path = vec![Segment::Field(col.name().to_owned())];
157
158        if let Some((original_column_id, original_data_type)) = self.existing.get(&path) {
159            if original_data_type == col.data_type() {
160                // Only update the top-level column ID, without traversing nested fields.
161                col.column_desc.column_id = *original_column_id;
162                return Ok(());
163            } else {
164                // Check if the column can be altered.
165                match original_data_type.can_alter() {
166                    Some(true) => { /* pass */ }
167                    Some(false) => bail!(
168                        "column \"{}\" was persisted with legacy encoding thus cannot be altered, \
169                         consider dropping and readding the column",
170                        col.name()
171                    ),
172                    None => bail!(
173                        "column \"{}\" cannot be altered; only types containing struct can be altered",
174                        col.name()
175                    ),
176                }
177            }
178        }
179
180        fn handle(
181            this: &mut ColumnIdGenerator,
182            path: &mut Path,
183            data_type: DataType,
184        ) -> Result<(ColumnId, DataType)> {
185            macro_rules! with_segment {
186                ($segment:expr, $block:block) => {{
187                    path.push($segment);
188                    let ret = $block;
189                    path.pop();
190                    ret
191                }};
192            }
193
194            let original_column_id = match this.existing.get(&*path) {
195                Some((original_column_id, original_data_type)) => {
196                    // Only check the type name (discriminant) for compatibility check here.
197                    // For nested fields, we will check them recursively later.
198                    if original_data_type.type_name() != data_type.type_name() {
199                        let path = path.iter().join(".");
200                        bail!(
201                            "incompatible data type change from {:?} to {:?} at path \"{}\"",
202                            original_data_type.type_name(),
203                            data_type.type_name(),
204                            path
205                        );
206                    }
207                    Some(*original_column_id)
208                }
209                None => None,
210            };
211
212            // Only top-level column and struct fields need an ID.
213            let need_gen_id = matches!(path.last().unwrap(), Segment::Field(_));
214
215            let new_id = if need_gen_id {
216                if let Some(id) = original_column_id {
217                    assert!(
218                        id != ColumnId::placeholder(),
219                        "original column id should not be placeholder"
220                    );
221                    id
222                } else {
223                    let id = this.next_column_id;
224                    this.next_column_id = this.next_column_id.next();
225                    id
226                }
227            } else {
228                // Column ID is actually unused by caller (for list and map types), just use a placeholder.
229                ColumnId::placeholder()
230            };
231
232            let new_data_type = match data_type {
233                DataType::Struct(fields) => {
234                    let mut new_fields = Vec::new();
235                    let mut ids = Vec::new();
236                    for (field_name, field_data_type) in fields.iter() {
237                        let (id, new_field_data_type) =
238                            with_segment!(Segment::Field(field_name.to_owned()), {
239                                handle(this, path, field_data_type.clone())?
240                            });
241                        new_fields.push((field_name.to_owned(), new_field_data_type));
242                        ids.push(id);
243                    }
244                    DataType::Struct(StructType::new(new_fields).with_ids(ids))
245                }
246                DataType::List(inner) => {
247                    let (_, new_inner) =
248                        with_segment!(Segment::ListElement, { handle(this, path, *inner)? });
249                    DataType::List(Box::new(new_inner))
250                }
251                DataType::Map(map) => {
252                    let (_, new_key) =
253                        with_segment!(Segment::MapKey, { handle(this, path, map.key().clone())? });
254                    let (_, new_value) = with_segment!(Segment::MapValue, {
255                        handle(this, path, map.value().clone())?
256                    });
257                    DataType::Map(MapType::from_kv(new_key, new_value))
258                }
259
260                data_types::simple!() => data_type,
261            };
262
263            Ok((new_id, new_data_type))
264        }
265
266        let (new_column_id, new_data_type) = handle(self, &mut path, col.data_type().clone())?;
267
268        col.column_desc.column_id = new_column_id;
269        col.column_desc.data_type = new_data_type;
270
271        Ok(())
272    }
273
274    /// Consume this generator and return a [`TableVersion`] for the table to be created or altered.
275    pub fn into_version(self) -> TableVersion {
276        TableVersion {
277            version_id: self.version_id,
278            next_column_id: self.next_column_id,
279        }
280    }
281}
282
#[cfg(test)]
mod tests {
    use expect_test::expect;
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, FieldLike};
    use risingwave_common::types::StructType;
    use thiserror_ext::AsReport;

    use super::*;

    /// A test-only field description: the wrapped string is the column name and the data
    /// type is always boolean. Aliased to `B` to keep the assertions short.
    struct BrandNewColumn(&'static str);
    use BrandNewColumn as B;

    impl FieldLike for BrandNewColumn {
        fn name(&self) -> &str {
            self.0
        }

        fn data_type(&self) -> &DataType {
            &DataType::Boolean
        }
    }

    #[easy_ext::ext(ColumnIdGeneratorTestExt)]
    impl ColumnIdGenerator {
        /// Generate a column ID for the given field.
        ///
        /// This helper function checks that the data type of the field has not changed after
        /// generating the column ID, i.e., it is called on simple types or composite types
        /// with `field_ids` unset and legacy encoding.
        fn generate_simple(&mut self, field: impl FieldLike) -> Result<ColumnId> {
            let original_data_type = field.data_type().clone();

            // Wrap the field in a visible column that has no column ID assigned yet.
            let field = Field::new(field.name(), original_data_type.clone());
            let mut col = ColumnCatalog {
                column_desc: ColumnDesc::from_field_without_column_id(&field),
                is_hidden: false,
            };
            self.generate(&mut col)?;

            assert_eq!(
                col.column_desc.data_type, original_data_type,
                "data type has changed after generating column id, \
                 are you calling this on a composite type?"
            );

            Ok(col.column_desc.column_id)
        }
    }

    /// A generator for a new table hands out IDs 1 and 2 to the first two columns.
    #[test]
    fn test_col_id_gen_initial() {
        let mut r#gen = ColumnIdGenerator::new_initial();
        assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(1));
        assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(2));
    }

    /// Altering a table with columns `f32` (#1), `f64` (#2) and `nested` (#3): new columns
    /// get fresh IDs, unchanged columns keep theirs, and type changes are rejected.
    ///
    /// The assertions share one generator, so their order matters for the expected IDs.
    #[test]
    fn test_col_id_gen_alter() {
        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field::with_name(DataType::Float32, "f32"),
                        1,
                    ),
                    is_hidden: false,
                },
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field::with_name(DataType::Float64, "f64"),
                        2,
                    ),
                    is_hidden: false,
                },
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field::with_name(
                            StructType::new([("f1", DataType::Int32)]).into(),
                            "nested",
                        ),
                        3,
                    ),
                    is_hidden: false,
                },
            ],
            // NOTE(review): presumably 3 is the largest column ID in use, making 4 the next
            // ID to generate (as the first assertion below expects) — confirm against
            // `TableVersion::new_initial_for_test`.
            version: Some(TableVersion::new_initial_for_test(ColumnId::new(3))),
            ..Default::default()
        });

        // brand-new columns get fresh IDs; an unchanged existing column keeps its ID
        assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(4));
        assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(5));
        assert_eq!(
            r#gen
                .generate_simple(Field::new("f32", DataType::Float32))
                .unwrap(),
            ColumnId::new(1)
        );

        // mismatched simple data type
        let err = r#gen
            .generate_simple(Field::new("f64", DataType::Float32))
            .unwrap_err();
        expect![[r#"column "f64" cannot be altered; only types containing struct can be altered"#]]
            .assert_eq(&err.to_report_string());

        // mismatched composite data type
        // we require the nested data type to be exactly the same
        let err = r#gen
            .generate_simple(Field::new(
                "nested",
                // NOTE: field ids are unset, legacy encoding, thus cannot be altered
                StructType::new([("f1", DataType::Int32), ("f2", DataType::Int64)]).into(),
            ))
            .unwrap_err();
        expect![[r#"column "nested" was persisted with legacy encoding thus cannot be altered, consider dropping and readding the column"#]].assert_eq(&err.to_report_string());

        // matched composite data type, should work
        let id = r#gen
            .generate_simple(Field::new(
                "nested",
                StructType::new([("f1", DataType::Int32)]).into(),
            ))
            .unwrap();
        assert_eq!(id, ColumnId::new(3));

        // the failed attempts above did not consume any ID
        assert_eq!(r#gen.generate_simple(B("v3")).unwrap(), ColumnId::new(6));
    }

    /// Creates a column with a nested composite type, alters it twice, and checks the IDs
    /// assigned to nested struct fields at every step.
    #[test]
    fn test_col_id_gen_alter_composite_type() {
        // Initial type: struct<f1 int32, map map<varchar, list<struct<f2 int32, f3 boolean>>>>
        let ori_type = || {
            DataType::from(StructType::new([
                ("f1", DataType::Int32),
                (
                    "map",
                    MapType::from_kv(
                        DataType::Varchar,
                        DataType::List(Box::new(
                            StructType::new([("f2", DataType::Int32), ("f3", DataType::Boolean)])
                                .into(),
                        )),
                    )
                    .into(),
                ),
            ]))
        };

        // Compatible change: `f1` is replaced by `f4`, `f2` by `f5`, `f3` is kept and `f6`
        // is added.
        let new_type = || {
            DataType::from(StructType::new([
                ("f4", DataType::Int32),
                (
                    "map",
                    MapType::from_kv(
                        DataType::Varchar,
                        DataType::List(Box::new(
                            StructType::new([
                                ("f5", DataType::Int32),
                                ("f3", DataType::Boolean),
                                ("f6", DataType::Float32),
                            ])
                            .into(),
                        )),
                    )
                    .into(),
                ),
            ]))
        };

        // Incompatible change: `f6` keeps its name but switches from Float32 to Float64.
        let incompatible_new_type = || {
            DataType::from(StructType::new([(
                "map",
                MapType::from_kv(
                    DataType::Varchar,
                    DataType::List(Box::new(
                        StructType::new([("f6", DataType::Float64)]).into(),
                    )),
                )
                .into(),
            )]))
        };

        // Step 1: create the column in a new table. The column gets #1, the outer struct
        // fields #2 and #3, and the innermost struct fields #4 and #5.
        let mut r#gen = ColumnIdGenerator::new_initial();
        let mut col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                ori_type(),
            )),
            is_hidden: false,
        };
        r#gen.generate(&mut col).unwrap();
        let version = r#gen.into_version();

        expect![[r#"
            ColumnCatalog {
                column_desc: ColumnDesc {
                    data_type: Struct(
                        StructType {
                            fields: [
                                (
                                    "f1",
                                    Int32,
                                ),
                                (
                                    "map",
                                    Map(
                                        MapType(
                                            (
                                                Varchar,
                                                List(
                                                    Struct(
                                                        StructType {
                                                            fields: [
                                                                (
                                                                    "f2",
                                                                    Int32,
                                                                ),
                                                                (
                                                                    "f3",
                                                                    Boolean,
                                                                ),
                                                            ],
                                                            field_ids: [
                                                                #4,
                                                                #5,
                                                            ],
                                                        },
                                                    ),
                                                ),
                                            ),
                                        ),
                                    ),
                                ),
                            ],
                            field_ids: [
                                #2,
                                #3,
                            ],
                        },
                    ),
                    column_id: #1,
                    name: "nested",
                    generated_or_default_column: None,
                    description: None,
                    additional_column: AdditionalColumn {
                        column_type: None,
                    },
                    version: Pr13707,
                    system_column: None,
                    nullable: true,
                },
                is_hidden: false,
            }
        "#]]
        .assert_debug_eq(&col);

        // Step 2: alter to `new_type`. Fields still present (`nested`, `map`, `f3`) keep
        // their IDs; the new names (`f4`, `f5`, `f6`) receive #6, #7 and #8.
        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![col],
            version: Some(version),
            ..Default::default()
        });
        let mut new_col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                new_type(),
            )),
            is_hidden: false,
        };
        r#gen.generate(&mut new_col).unwrap();
        let version = r#gen.into_version();

        expect![[r#"
            ColumnCatalog {
                column_desc: ColumnDesc {
                    data_type: Struct(
                        StructType {
                            fields: [
                                (
                                    "f4",
                                    Int32,
                                ),
                                (
                                    "map",
                                    Map(
                                        MapType(
                                            (
                                                Varchar,
                                                List(
                                                    Struct(
                                                        StructType {
                                                            fields: [
                                                                (
                                                                    "f5",
                                                                    Int32,
                                                                ),
                                                                (
                                                                    "f3",
                                                                    Boolean,
                                                                ),
                                                                (
                                                                    "f6",
                                                                    Float32,
                                                                ),
                                                            ],
                                                            field_ids: [
                                                                #7,
                                                                #5,
                                                                #8,
                                                            ],
                                                        },
                                                    ),
                                                ),
                                            ),
                                        ),
                                    ),
                                ),
                            ],
                            field_ids: [
                                #6,
                                #3,
                            ],
                        },
                    ),
                    column_id: #1,
                    name: "nested",
                    generated_or_default_column: None,
                    description: None,
                    additional_column: AdditionalColumn {
                        column_type: None,
                    },
                    version: Pr13707,
                    system_column: None,
                    nullable: true,
                },
                is_hidden: false,
            }
        "#]]
        .assert_debug_eq(&new_col);

        // Step 3: alter again, this time changing the type of the existing field `f6`,
        // which must be rejected with the full path of the offending field.
        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![new_col],
            version: Some(version),
            ..Default::default()
        });

        let mut new_col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                incompatible_new_type(),
            )),
            is_hidden: false,
        };
        let err = r#gen.generate(&mut new_col).unwrap_err();
        expect![[r#"incompatible data type change from Float32 to Float64 at path "nested.map.value.element.f6""#]].assert_eq(&err.to_report_string());
    }
}
637}