risingwave_frontend/handler/create_table/
col_id_gen.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, INITIAL_TABLE_VERSION_ID, TableVersionId};
20use risingwave_common::types::{DataType, MapType, StructType, data_types};
21use risingwave_common::util::iter_util::ZipEqFast;
22
23use crate::TableCatalog;
24use crate::catalog::ColumnId;
25use crate::catalog::table_catalog::TableVersion;
26use crate::error::Result;
27
/// A single step in a [`Path`].
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum Segment {
    /// A named component: used for both top-level columns and struct fields.
    Field(String),
    /// The element of a list.
    ListElement,
    /// The key of a map.
    MapKey,
    /// The value of a map.
    MapValue,
}

impl std::fmt::Display for Segment {
    /// Writes the field name for [`Segment::Field`], or a fixed keyword for the other variants.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text: &str = match self {
            Self::Field(name) => name.as_str(),
            Self::ListElement => "element",
            Self::MapKey => "key",
            Self::MapValue => "value",
        };
        f.write_str(text)
    }
}
47
/// The path to a nested field in a column with composite data type.
///
/// In this file a path always starts with a [`Segment::Field`] holding the name of the top-level
/// column, followed by one segment per level of nesting.
type Path = Vec<Segment>;

/// Fields of the original table keyed by their [`Path`], with the column ID and data type
/// recorded for each of them.
type Existing = HashMap<Path, (ColumnId, DataType)>;
52
/// Column ID generator for a new table or a new version of an existing table to alter.
///
/// Build it with [`ColumnIdGenerator::new_initial`] or [`ColumnIdGenerator::new_alter`], call
/// [`ColumnIdGenerator::generate`] once per column, then turn it into the resulting
/// [`TableVersion`] with [`ColumnIdGenerator::into_version`].
#[derive(Debug)]
pub struct ColumnIdGenerator {
    /// Existing fields and their IDs and data types.
    ///
    /// This is used for aligning field IDs between versions (`ALTER`s). If a field already
    /// exists and the data type matches, its ID is reused. Otherwise, a new ID is generated.
    ///
    /// For a new table, this is empty.
    existing: Existing,

    /// The next column ID to generate, used for new columns that do not exist in `existing`.
    ///
    /// It is advanced every time a fresh ID is handed out.
    next_column_id: ColumnId,

    /// The version ID of the table to be created or altered.
    ///
    /// For a new table, this is [`INITIAL_TABLE_VERSION_ID`]. For altering an existing table,
    /// this is one more than the `version_id` recorded in the original table catalog.
    version_id: TableVersionId,
}
73
74impl ColumnIdGenerator {
75    /// Creates a new [`ColumnIdGenerator`] for altering an existing table.
76    pub fn new_alter(original: &TableCatalog) -> Self {
77        fn handle(existing: &mut Existing, path: &mut Path, id: ColumnId, data_type: DataType) {
78            macro_rules! with_segment {
79                ($segment:expr, $block:block) => {
80                    path.push($segment);
81                    $block
82                    path.pop();
83                };
84            }
85
86            match &data_type {
87                DataType::Struct(fields) => {
88                    for ((field_name, field_data_type), field_id) in
89                        fields.iter().zip_eq_fast(fields.ids_or_placeholder())
90                    {
91                        with_segment!(Segment::Field(field_name.to_owned()), {
92                            handle(existing, path, field_id, field_data_type.clone());
93                        });
94                    }
95                }
96                DataType::List(inner) => {
97                    // There's no id for the element as list's own structure won't change.
98                    with_segment!(Segment::ListElement, {
99                        handle(existing, path, ColumnId::placeholder(), *inner.clone());
100                    });
101                }
102                DataType::Map(map) => {
103                    // There's no id for the key/value as map's own structure won't change.
104                    with_segment!(Segment::MapKey, {
105                        handle(existing, path, ColumnId::placeholder(), map.key().clone());
106                    });
107                    with_segment!(Segment::MapValue, {
108                        handle(existing, path, ColumnId::placeholder(), map.value().clone());
109                    });
110                }
111
112                DataType::Vector(_) => {}
113                data_types::simple!() => {}
114            }
115
116            existing
117                .try_insert(path.clone(), (id, data_type))
118                .unwrap_or_else(|_| panic!("duplicate path: {:?}", path));
119        }
120
121        let mut existing = Existing::new();
122
123        // Collect all existing fields into `existing`.
124        for col in original.columns() {
125            let mut path = vec![Segment::Field(col.name().to_owned())];
126            handle(
127                &mut existing,
128                &mut path,
129                col.column_id(),
130                col.data_type().clone(),
131            );
132        }
133
134        let version = original.version().expect("version field not set");
135
136        Self {
137            existing,
138            next_column_id: version.next_column_id,
139            version_id: version.version_id + 1,
140        }
141    }
142
143    /// Creates a new [`ColumnIdGenerator`] for a new table.
144    pub fn new_initial() -> Self {
145        Self {
146            existing: Existing::new(),
147            next_column_id: ColumnId::first_user_column(),
148            version_id: INITIAL_TABLE_VERSION_ID,
149        }
150    }
151
152    /// Generate [`ColumnId`]s for the given column and its nested fields (if any) recursively.
153    /// The IDs of nested fields will be reflected in the updated [`DataType`] of the column.
154    ///
155    /// Returns an error if there's incompatible data type change.
156    pub fn generate(&mut self, col: &mut ColumnCatalog) -> Result<()> {
157        let mut path = vec![Segment::Field(col.name().to_owned())];
158
159        if let Some((original_column_id, original_data_type)) = self.existing.get(&path) {
160            if original_data_type == col.data_type() {
161                col.column_desc.column_id = *original_column_id;
162                // Equality above ignores nested field IDs. We need to clone them below.
163                col.column_desc.data_type = original_data_type.clone();
164                return Ok(());
165            } else {
166                // Check if the column can be altered.
167                match original_data_type.can_alter() {
168                    Some(true) => { /* pass */ }
169                    Some(false) => bail!(
170                        "column \"{}\" was persisted with legacy encoding thus cannot be altered, \
171                         consider dropping and readding the column",
172                        col.name()
173                    ),
174                    None => bail!(
175                        "column \"{}\" cannot be altered; only types containing struct can be altered",
176                        col.name()
177                    ),
178                }
179            }
180        }
181
182        fn handle(
183            this: &mut ColumnIdGenerator,
184            path: &mut Path,
185            data_type: DataType,
186        ) -> Result<(ColumnId, DataType)> {
187            macro_rules! with_segment {
188                ($segment:expr, $block:block) => {{
189                    path.push($segment);
190                    let ret = $block;
191                    path.pop();
192                    ret
193                }};
194            }
195
196            let original_column_id = match this.existing.get(&*path) {
197                Some((original_column_id, original_data_type)) => {
198                    // Only check the type name (discriminant) for compatibility check here.
199                    // For nested fields, we will check them recursively later.
200                    if original_data_type.type_name() != data_type.type_name() {
201                        let path = path.iter().join(".");
202                        bail!(
203                            "incompatible data type change from {:?} to {:?} at path \"{}\"",
204                            original_data_type.type_name(),
205                            data_type.type_name(),
206                            path
207                        );
208                    }
209                    Some(*original_column_id)
210                }
211                None => None,
212            };
213
214            // Only top-level column and struct fields need an ID.
215            let need_gen_id = matches!(path.last().unwrap(), Segment::Field(_));
216
217            let new_id = if need_gen_id {
218                if let Some(id) = original_column_id {
219                    assert!(
220                        id != ColumnId::placeholder(),
221                        "original column id should not be placeholder"
222                    );
223                    id
224                } else {
225                    let id = this.next_column_id;
226                    this.next_column_id = this.next_column_id.next();
227                    id
228                }
229            } else {
230                // Column ID is actually unused by caller (for list and map types), just use a placeholder.
231                ColumnId::placeholder()
232            };
233
234            let new_data_type = match data_type {
235                DataType::Struct(fields) => {
236                    let mut new_fields = Vec::new();
237                    let mut ids = Vec::new();
238                    for (field_name, field_data_type) in fields.iter() {
239                        let (id, new_field_data_type) =
240                            with_segment!(Segment::Field(field_name.to_owned()), {
241                                handle(this, path, field_data_type.clone())?
242                            });
243                        new_fields.push((field_name.to_owned(), new_field_data_type));
244                        ids.push(id);
245                    }
246                    DataType::Struct(StructType::new(new_fields).with_ids(ids))
247                }
248                DataType::List(inner) => {
249                    let (_, new_inner) =
250                        with_segment!(Segment::ListElement, { handle(this, path, *inner)? });
251                    DataType::List(Box::new(new_inner))
252                }
253                DataType::Map(map) => {
254                    let (_, new_key) =
255                        with_segment!(Segment::MapKey, { handle(this, path, map.key().clone())? });
256                    let (_, new_value) = with_segment!(Segment::MapValue, {
257                        handle(this, path, map.value().clone())?
258                    });
259                    DataType::Map(MapType::from_kv(new_key, new_value))
260                }
261
262                DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
263                data_types::simple!() => data_type,
264            };
265
266            Ok((new_id, new_data_type))
267        }
268
269        let (new_column_id, new_data_type) = handle(self, &mut path, col.data_type().clone())?;
270
271        col.column_desc.column_id = new_column_id;
272        col.column_desc.data_type = new_data_type;
273
274        Ok(())
275    }
276
277    /// Consume this generator and return a [`TableVersion`] for the table to be created or altered.
278    pub fn into_version(self) -> TableVersion {
279        TableVersion {
280            version_id: self.version_id,
281            next_column_id: self.next_column_id,
282        }
283    }
284}
285
286#[cfg(test)]
287mod tests {
288    use expect_test::expect;
289    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, FieldLike};
290    use risingwave_common::types::StructType;
291    use thiserror_ext::AsReport;
292
293    use super::*;
294
295    struct BrandNewColumn(&'static str);
296    use BrandNewColumn as B;
297
298    impl FieldLike for BrandNewColumn {
299        fn name(&self) -> &str {
300            self.0
301        }
302
303        fn data_type(&self) -> &DataType {
304            &DataType::Boolean
305        }
306    }
307
308    #[easy_ext::ext(ColumnIdGeneratorTestExt)]
309    impl ColumnIdGenerator {
310        /// Generate a column ID for the given field.
311        ///
312        /// This helper function checks that the data type of the field has not changed after
313        /// generating the column ID, i.e., it is called on simple types or composite types
314        /// with `field_ids` unset and legacy encoding.
315        fn generate_simple(&mut self, field: impl FieldLike) -> Result<ColumnId> {
316            let original_data_type = field.data_type().clone();
317
318            let field = Field::new(field.name(), original_data_type.clone());
319            let mut col = ColumnCatalog {
320                column_desc: ColumnDesc::from_field_without_column_id(&field),
321                is_hidden: false,
322            };
323            self.generate(&mut col)?;
324
325            assert_eq!(
326                col.column_desc.data_type, original_data_type,
327                "data type has changed after generating column id, \
328                 are you calling this on a composite type?"
329            );
330
331            Ok(col.column_desc.column_id)
332        }
333    }
334
335    #[test]
336    fn test_col_id_gen_initial() {
337        let mut r#gen = ColumnIdGenerator::new_initial();
338        assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(1));
339        assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(2));
340    }
341
342    #[test]
343    fn test_col_id_gen_alter() {
344        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
345            columns: vec![
346                ColumnCatalog {
347                    column_desc: ColumnDesc::from_field_with_column_id(
348                        &Field::with_name(DataType::Float32, "f32"),
349                        1,
350                    ),
351                    is_hidden: false,
352                },
353                ColumnCatalog {
354                    column_desc: ColumnDesc::from_field_with_column_id(
355                        &Field::with_name(DataType::Float64, "f64"),
356                        2,
357                    ),
358                    is_hidden: false,
359                },
360                ColumnCatalog {
361                    column_desc: ColumnDesc::from_field_with_column_id(
362                        &Field::with_name(
363                            StructType::new([("f1", DataType::Int32)]).into(),
364                            "nested",
365                        ),
366                        3,
367                    ),
368                    is_hidden: false,
369                },
370            ],
371            version: Some(TableVersion::new_initial_for_test(ColumnId::new(3))),
372            ..Default::default()
373        });
374
375        assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(4));
376        assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(5));
377        assert_eq!(
378            r#gen
379                .generate_simple(Field::new("f32", DataType::Float32))
380                .unwrap(),
381            ColumnId::new(1)
382        );
383
384        // mismatched simple data type
385        let err = r#gen
386            .generate_simple(Field::new("f64", DataType::Float32))
387            .unwrap_err();
388        expect![[r#"column "f64" cannot be altered; only types containing struct can be altered"#]]
389            .assert_eq(&err.to_report_string());
390
391        // mismatched composite data type
392        // we require the nested data type to be exactly the same
393        let err = r#gen
394            .generate_simple(Field::new(
395                "nested",
396                // NOTE: field ids are unset, legacy encoding, thus cannot be altered
397                StructType::new([("f1", DataType::Int32), ("f2", DataType::Int64)]).into(),
398            ))
399            .unwrap_err();
400        expect![[r#"column "nested" was persisted with legacy encoding thus cannot be altered, consider dropping and readding the column"#]].assert_eq(&err.to_report_string());
401
402        // matched composite data type, should work
403        let id = r#gen
404            .generate_simple(Field::new(
405                "nested",
406                StructType::new([("f1", DataType::Int32)]).into(),
407            ))
408            .unwrap();
409        assert_eq!(id, ColumnId::new(3));
410
411        assert_eq!(r#gen.generate_simple(B("v3")).unwrap(), ColumnId::new(6));
412    }
413
414    #[test]
415    fn test_col_id_gen_alter_composite_type() {
416        let ori_type = || {
417            DataType::from(StructType::new([
418                ("f1", DataType::Int32),
419                (
420                    "map",
421                    MapType::from_kv(
422                        DataType::Varchar,
423                        DataType::List(Box::new(
424                            StructType::new([("f2", DataType::Int32), ("f3", DataType::Boolean)])
425                                .into(),
426                        )),
427                    )
428                    .into(),
429                ),
430            ]))
431        };
432
433        let new_type = || {
434            DataType::from(StructType::new([
435                ("f4", DataType::Int32),
436                (
437                    "map",
438                    MapType::from_kv(
439                        DataType::Varchar,
440                        DataType::List(Box::new(
441                            StructType::new([
442                                ("f5", DataType::Int32),
443                                ("f3", DataType::Boolean),
444                                ("f6", DataType::Float32),
445                            ])
446                            .into(),
447                        )),
448                    )
449                    .into(),
450                ),
451            ]))
452        };
453
454        let incompatible_new_type = || {
455            DataType::from(StructType::new([(
456                "map",
457                MapType::from_kv(
458                    DataType::Varchar,
459                    DataType::List(Box::new(
460                        StructType::new([("f6", DataType::Float64)]).into(),
461                    )),
462                )
463                .into(),
464            )]))
465        };
466
467        let mut r#gen = ColumnIdGenerator::new_initial();
468        let mut col = ColumnCatalog {
469            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
470                "nested",
471                ori_type(),
472            )),
473            is_hidden: false,
474        };
475        r#gen.generate(&mut col).unwrap();
476        let version = r#gen.into_version();
477
478        expect![[r#"
479            ColumnCatalog {
480                column_desc: ColumnDesc {
481                    data_type: Struct(
482                        StructType {
483                            fields: [
484                                (
485                                    "f1",
486                                    Int32,
487                                ),
488                                (
489                                    "map",
490                                    Map(
491                                        MapType(
492                                            (
493                                                Varchar,
494                                                List(
495                                                    Struct(
496                                                        StructType {
497                                                            fields: [
498                                                                (
499                                                                    "f2",
500                                                                    Int32,
501                                                                ),
502                                                                (
503                                                                    "f3",
504                                                                    Boolean,
505                                                                ),
506                                                            ],
507                                                            field_ids: [
508                                                                #4,
509                                                                #5,
510                                                            ],
511                                                        },
512                                                    ),
513                                                ),
514                                            ),
515                                        ),
516                                    ),
517                                ),
518                            ],
519                            field_ids: [
520                                #2,
521                                #3,
522                            ],
523                        },
524                    ),
525                    column_id: #1,
526                    name: "nested",
527                    generated_or_default_column: None,
528                    description: None,
529                    additional_column: AdditionalColumn {
530                        column_type: None,
531                    },
532                    version: Pr13707,
533                    system_column: None,
534                    nullable: true,
535                },
536                is_hidden: false,
537            }
538        "#]]
539        .assert_debug_eq(&col);
540
541        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
542            columns: vec![col],
543            version: Some(version),
544            ..Default::default()
545        });
546        let mut new_col = ColumnCatalog {
547            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
548                "nested",
549                new_type(),
550            )),
551            is_hidden: false,
552        };
553        r#gen.generate(&mut new_col).unwrap();
554        let version = r#gen.into_version();
555
556        expect![[r#"
557            ColumnCatalog {
558                column_desc: ColumnDesc {
559                    data_type: Struct(
560                        StructType {
561                            fields: [
562                                (
563                                    "f4",
564                                    Int32,
565                                ),
566                                (
567                                    "map",
568                                    Map(
569                                        MapType(
570                                            (
571                                                Varchar,
572                                                List(
573                                                    Struct(
574                                                        StructType {
575                                                            fields: [
576                                                                (
577                                                                    "f5",
578                                                                    Int32,
579                                                                ),
580                                                                (
581                                                                    "f3",
582                                                                    Boolean,
583                                                                ),
584                                                                (
585                                                                    "f6",
586                                                                    Float32,
587                                                                ),
588                                                            ],
589                                                            field_ids: [
590                                                                #7,
591                                                                #5,
592                                                                #8,
593                                                            ],
594                                                        },
595                                                    ),
596                                                ),
597                                            ),
598                                        ),
599                                    ),
600                                ),
601                            ],
602                            field_ids: [
603                                #6,
604                                #3,
605                            ],
606                        },
607                    ),
608                    column_id: #1,
609                    name: "nested",
610                    generated_or_default_column: None,
611                    description: None,
612                    additional_column: AdditionalColumn {
613                        column_type: None,
614                    },
615                    version: Pr13707,
616                    system_column: None,
617                    nullable: true,
618                },
619                is_hidden: false,
620            }
621        "#]]
622        .assert_debug_eq(&new_col);
623
624        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
625            columns: vec![new_col],
626            version: Some(version),
627            ..Default::default()
628        });
629
630        let mut new_col = ColumnCatalog {
631            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
632                "nested",
633                incompatible_new_type(),
634            )),
635            is_hidden: false,
636        };
637        let err = r#gen.generate(&mut new_col).unwrap_err();
638        expect![[r#"incompatible data type change from Float32 to Float64 at path "nested.map.value.element.f6""#]].assert_eq(&err.to_report_string());
639    }
640}