risingwave_frontend/handler/create_table/
col_id_gen.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, INITIAL_TABLE_VERSION_ID, TableVersionId};
20use risingwave_common::types::{DataType, MapType, StructType, data_types};
21use risingwave_common::util::iter_util::ZipEqFast;
22
23use crate::TableCatalog;
24use crate::catalog::ColumnId;
25use crate::catalog::table_catalog::TableVersion;
26use crate::error::Result;
27
/// One step of a [`Path`] leading into a (possibly nested) column type.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum Segment {
    /// A named step: either a top-level column or a field of a struct.
    Field(String),
    /// The element position of a list.
    ListElement,
    /// The key position of a map.
    MapKey,
    /// The value position of a map.
    MapValue,
}

impl std::fmt::Display for Segment {
    /// Writes the field name for [`Segment::Field`], and a fixed keyword
    /// (`element`, `key` or `value`) for the unnamed positions.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::Field(name) => name.as_str(),
            Self::ListElement => "element",
            Self::MapKey => "key",
            Self::MapValue => "value",
        };
        f.write_str(text)
    }
}
47
/// The path to a nested field in a column with composite data type.
///
/// The first segment is the top-level column name; each further segment steps into a struct
/// field, a list element, or a map key/value.
type Path = Vec<Segment>;

/// Fields recorded from an existing table, keyed by their [`Path`] and mapped to the
/// [`ColumnId`] and [`DataType`] they were recorded with.
type Existing = HashMap<Path, (ColumnId, DataType)>;
52
/// Column ID generator for a new table or a new version of an existing table to alter.
#[derive(Debug)]
pub struct ColumnIdGenerator {
    /// Existing fields and their IDs and data types, keyed by path.
    ///
    /// This is used for aligning field IDs between versions (`ALTER`s). If a field already
    /// exists at the same path with a compatible data type, its ID is reused. A field that
    /// does not exist yet gets a newly generated ID.
    ///
    /// For a new table, this is empty.
    existing: Existing,

    /// The next column ID to generate, used for new columns that do not exist in `existing`.
    /// It is advanced every time such an ID is handed out.
    next_column_id: ColumnId,

    /// The version ID of the table to be created or altered.
    ///
    /// For a new table, this is `INITIAL_TABLE_VERSION_ID`. For altering an existing table,
    /// this is the `version_id` of the original table catalog plus one.
    version_id: TableVersionId,
}
73
74impl ColumnIdGenerator {
75    /// Creates a new [`ColumnIdGenerator`] for altering an existing table.
76    pub fn new_alter(original: &TableCatalog) -> Self {
77        fn handle(existing: &mut Existing, path: &mut Path, id: ColumnId, data_type: DataType) {
78            macro_rules! with_segment {
79                ($segment:expr, $block:block) => {
80                    path.push($segment);
81                    $block
82                    path.pop();
83                };
84            }
85
86            match &data_type {
87                DataType::Struct(fields) => {
88                    for ((field_name, field_data_type), field_id) in
89                        fields.iter().zip_eq_fast(fields.ids_or_placeholder())
90                    {
91                        with_segment!(Segment::Field(field_name.to_owned()), {
92                            handle(existing, path, field_id, field_data_type.clone());
93                        });
94                    }
95                }
96                DataType::List(inner) => {
97                    // There's no id for the element as list's own structure won't change.
98                    with_segment!(Segment::ListElement, {
99                        handle(existing, path, ColumnId::placeholder(), *inner.clone());
100                    });
101                }
102                DataType::Map(map) => {
103                    // There's no id for the key/value as map's own structure won't change.
104                    with_segment!(Segment::MapKey, {
105                        handle(existing, path, ColumnId::placeholder(), map.key().clone());
106                    });
107                    with_segment!(Segment::MapValue, {
108                        handle(existing, path, ColumnId::placeholder(), map.value().clone());
109                    });
110                }
111
112                DataType::Vector(_) => {}
113                data_types::simple!() => {}
114            }
115
116            existing
117                .try_insert(path.clone(), (id, data_type))
118                .unwrap_or_else(|_| panic!("duplicate path: {:?}", path));
119        }
120
121        let mut existing = Existing::new();
122
123        // Collect all existing fields into `existing`.
124        for col in original.columns() {
125            let mut path = vec![Segment::Field(col.name().to_owned())];
126            handle(
127                &mut existing,
128                &mut path,
129                col.column_id(),
130                col.data_type().clone(),
131            );
132        }
133
134        let version = original.version().expect("version field not set");
135
136        Self {
137            existing,
138            next_column_id: version.next_column_id,
139            version_id: version.version_id + 1,
140        }
141    }
142
143    /// Creates a new [`ColumnIdGenerator`] for a new table.
144    pub fn new_initial() -> Self {
145        Self {
146            existing: Existing::new(),
147            next_column_id: ColumnId::first_user_column(),
148            version_id: INITIAL_TABLE_VERSION_ID,
149        }
150    }
151
152    /// Generate [`ColumnId`]s for the given column and its nested fields (if any) recursively.
153    /// The IDs of nested fields will be reflected in the updated [`DataType`] of the column.
154    ///
155    /// Returns an error if there's incompatible data type change.
156    pub fn generate(&mut self, col: &mut ColumnCatalog) -> Result<()> {
157        let mut path = vec![Segment::Field(col.name().to_owned())];
158
159        if let Some((original_column_id, original_data_type)) = self.existing.get(&path) {
160            if original_data_type == col.data_type() {
161                col.column_desc.column_id = *original_column_id;
162                // Equality above ignores nested field IDs. We need to clone them below.
163                col.column_desc.data_type = original_data_type.clone();
164                return Ok(());
165            } else {
166                // Check if the column can be altered.
167                match original_data_type.can_alter() {
168                    Some(true) => { /* pass */ }
169                    Some(false) => bail!(
170                        "column \"{}\" was persisted with legacy encoding thus cannot be altered, \
171                         consider dropping and readding the column",
172                        col.name()
173                    ),
174                    None => bail!(
175                        "column \"{}\" cannot be altered; only types containing struct can be altered",
176                        col.name()
177                    ),
178                }
179            }
180        }
181
182        fn handle(
183            this: &mut ColumnIdGenerator,
184            path: &mut Path,
185            data_type: DataType,
186        ) -> Result<(ColumnId, DataType)> {
187            macro_rules! with_segment {
188                ($segment:expr, $block:block) => {{
189                    path.push($segment);
190                    let ret = $block;
191                    path.pop();
192                    ret
193                }};
194            }
195
196            let original_column_id = match this.existing.get(&*path) {
197                Some((original_column_id, original_data_type)) => {
198                    // Only check the type name (discriminant) for compatibility check here.
199                    // For nested fields, we will check them recursively later.
200                    let incompatible = original_data_type.type_name() != data_type.type_name()
201                        || matches!(
202                            (original_data_type, &data_type),
203                            (DataType::Vector(old), DataType::Vector(new)) if old != new,
204                        );
205                    if incompatible {
206                        let path = path.iter().join(".");
207                        bail!(
208                            "incompatible data type change from {:?} to {:?} at path \"{}\"",
209                            original_data_type.type_name(),
210                            data_type.type_name(),
211                            path
212                        );
213                    }
214                    Some(*original_column_id)
215                }
216                None => None,
217            };
218
219            // Only top-level column and struct fields need an ID.
220            let need_gen_id = matches!(path.last().unwrap(), Segment::Field(_));
221
222            let new_id = if need_gen_id {
223                if let Some(id) = original_column_id {
224                    assert!(
225                        id != ColumnId::placeholder(),
226                        "original column id should not be placeholder"
227                    );
228                    id
229                } else {
230                    let id = this.next_column_id;
231                    this.next_column_id = this.next_column_id.next();
232                    id
233                }
234            } else {
235                // Column ID is actually unused by caller (for list and map types), just use a placeholder.
236                ColumnId::placeholder()
237            };
238
239            let new_data_type = match data_type {
240                DataType::Struct(fields) => {
241                    let mut new_fields = Vec::new();
242                    let mut ids = Vec::new();
243                    for (field_name, field_data_type) in fields.iter() {
244                        let (id, new_field_data_type) =
245                            with_segment!(Segment::Field(field_name.to_owned()), {
246                                handle(this, path, field_data_type.clone())?
247                            });
248                        new_fields.push((field_name.to_owned(), new_field_data_type));
249                        ids.push(id);
250                    }
251                    DataType::Struct(StructType::new(new_fields).with_ids(ids))
252                }
253                DataType::List(inner) => {
254                    let (_, new_inner) =
255                        with_segment!(Segment::ListElement, { handle(this, path, *inner)? });
256                    DataType::List(Box::new(new_inner))
257                }
258                DataType::Map(map) => {
259                    let (_, new_key) =
260                        with_segment!(Segment::MapKey, { handle(this, path, map.key().clone())? });
261                    let (_, new_value) = with_segment!(Segment::MapValue, {
262                        handle(this, path, map.value().clone())?
263                    });
264                    DataType::Map(MapType::from_kv(new_key, new_value))
265                }
266
267                DataType::Vector(_) => data_type,
268                data_types::simple!() => data_type,
269            };
270
271            Ok((new_id, new_data_type))
272        }
273
274        let (new_column_id, new_data_type) = handle(self, &mut path, col.data_type().clone())?;
275
276        col.column_desc.column_id = new_column_id;
277        col.column_desc.data_type = new_data_type;
278
279        Ok(())
280    }
281
282    /// Consume this generator and return a [`TableVersion`] for the table to be created or altered.
283    pub fn into_version(self) -> TableVersion {
284        TableVersion {
285            version_id: self.version_id,
286            next_column_id: self.next_column_id,
287        }
288    }
289}
290
291#[cfg(test)]
292mod tests {
293    use expect_test::expect;
294    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, FieldLike};
295    use risingwave_common::types::StructType;
296    use thiserror_ext::AsReport;
297
298    use super::*;
299
    /// A test-only field that carries just a name; its data type is always
    /// [`DataType::Boolean`].
    struct BrandNewColumn(&'static str);
    // Short alias to keep the call sites in the tests below compact.
    use BrandNewColumn as B;

    impl FieldLike for BrandNewColumn {
        /// Returns the wrapped name.
        fn name(&self) -> &str {
            self.0
        }

        /// Always reports [`DataType::Boolean`].
        fn data_type(&self) -> &DataType {
            &DataType::Boolean
        }
    }
312
313    #[easy_ext::ext(ColumnIdGeneratorTestExt)]
314    impl ColumnIdGenerator {
315        /// Generate a column ID for the given field.
316        ///
317        /// This helper function checks that the data type of the field has not changed after
318        /// generating the column ID, i.e., it is called on simple types or composite types
319        /// with `field_ids` unset and legacy encoding.
320        fn generate_simple(&mut self, field: impl FieldLike) -> Result<ColumnId> {
321            let original_data_type = field.data_type().clone();
322
323            let field = Field::new(field.name(), original_data_type.clone());
324            let mut col = ColumnCatalog {
325                column_desc: ColumnDesc::from_field_without_column_id(&field),
326                is_hidden: false,
327            };
328            self.generate(&mut col)?;
329
330            assert_eq!(
331                col.column_desc.data_type, original_data_type,
332                "data type has changed after generating column id, \
333                 are you calling this on a composite type?"
334            );
335
336            Ok(col.column_desc.column_id)
337        }
338    }
339
340    #[test]
341    fn test_col_id_gen_initial() {
342        let mut r#gen = ColumnIdGenerator::new_initial();
343        assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(1));
344        assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(2));
345    }
346
347    #[test]
348    fn test_col_id_gen_alter() {
349        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
350            columns: vec![
351                ColumnCatalog {
352                    column_desc: ColumnDesc::from_field_with_column_id(
353                        &Field::with_name(DataType::Float32, "f32"),
354                        1,
355                    ),
356                    is_hidden: false,
357                },
358                ColumnCatalog {
359                    column_desc: ColumnDesc::from_field_with_column_id(
360                        &Field::with_name(DataType::Float64, "f64"),
361                        2,
362                    ),
363                    is_hidden: false,
364                },
365                ColumnCatalog {
366                    column_desc: ColumnDesc::from_field_with_column_id(
367                        &Field::with_name(
368                            StructType::new([("f1", DataType::Int32)]).into(),
369                            "nested",
370                        ),
371                        3,
372                    ),
373                    is_hidden: false,
374                },
375            ],
376            version: Some(TableVersion::new_initial_for_test(ColumnId::new(3))),
377            ..Default::default()
378        });
379
380        assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(4));
381        assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(5));
382        assert_eq!(
383            r#gen
384                .generate_simple(Field::new("f32", DataType::Float32))
385                .unwrap(),
386            ColumnId::new(1)
387        );
388
389        // mismatched simple data type
390        let err = r#gen
391            .generate_simple(Field::new("f64", DataType::Float32))
392            .unwrap_err();
393        expect![[r#"column "f64" cannot be altered; only types containing struct can be altered"#]]
394            .assert_eq(&err.to_report_string());
395
396        // mismatched composite data type
397        // we require the nested data type to be exactly the same
398        let err = r#gen
399            .generate_simple(Field::new(
400                "nested",
401                // NOTE: field ids are unset, legacy encoding, thus cannot be altered
402                StructType::new([("f1", DataType::Int32), ("f2", DataType::Int64)]).into(),
403            ))
404            .unwrap_err();
405        expect![[r#"column "nested" was persisted with legacy encoding thus cannot be altered, consider dropping and readding the column"#]].assert_eq(&err.to_report_string());
406
407        // matched composite data type, should work
408        let id = r#gen
409            .generate_simple(Field::new(
410                "nested",
411                StructType::new([("f1", DataType::Int32)]).into(),
412            ))
413            .unwrap();
414        assert_eq!(id, ColumnId::new(3));
415
416        assert_eq!(r#gen.generate_simple(B("v3")).unwrap(), ColumnId::new(6));
417    }
418
419    #[test]
420    fn test_col_id_gen_alter_composite_type() {
421        let ori_type = || {
422            DataType::from(StructType::new([
423                ("f1", DataType::Int32),
424                (
425                    "map",
426                    MapType::from_kv(
427                        DataType::Varchar,
428                        DataType::List(Box::new(
429                            StructType::new([("f2", DataType::Int32), ("f3", DataType::Boolean)])
430                                .into(),
431                        )),
432                    )
433                    .into(),
434                ),
435            ]))
436        };
437
438        let new_type = || {
439            DataType::from(StructType::new([
440                ("f4", DataType::Int32),
441                (
442                    "map",
443                    MapType::from_kv(
444                        DataType::Varchar,
445                        DataType::List(Box::new(
446                            StructType::new([
447                                ("f5", DataType::Int32),
448                                ("f3", DataType::Boolean),
449                                ("f6", DataType::Float32),
450                            ])
451                            .into(),
452                        )),
453                    )
454                    .into(),
455                ),
456            ]))
457        };
458
459        let incompatible_new_type = || {
460            DataType::from(StructType::new([(
461                "map",
462                MapType::from_kv(
463                    DataType::Varchar,
464                    DataType::List(Box::new(
465                        StructType::new([("f6", DataType::Float64)]).into(),
466                    )),
467                )
468                .into(),
469            )]))
470        };
471
472        let mut r#gen = ColumnIdGenerator::new_initial();
473        let mut col = ColumnCatalog {
474            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
475                "nested",
476                ori_type(),
477            )),
478            is_hidden: false,
479        };
480        r#gen.generate(&mut col).unwrap();
481        let version = r#gen.into_version();
482
483        expect![[r#"
484            ColumnCatalog {
485                column_desc: ColumnDesc {
486                    data_type: Struct(
487                        StructType {
488                            fields: [
489                                (
490                                    "f1",
491                                    Int32,
492                                ),
493                                (
494                                    "map",
495                                    Map(
496                                        MapType(
497                                            (
498                                                Varchar,
499                                                List(
500                                                    Struct(
501                                                        StructType {
502                                                            fields: [
503                                                                (
504                                                                    "f2",
505                                                                    Int32,
506                                                                ),
507                                                                (
508                                                                    "f3",
509                                                                    Boolean,
510                                                                ),
511                                                            ],
512                                                            field_ids: [
513                                                                #4,
514                                                                #5,
515                                                            ],
516                                                        },
517                                                    ),
518                                                ),
519                                            ),
520                                        ),
521                                    ),
522                                ),
523                            ],
524                            field_ids: [
525                                #2,
526                                #3,
527                            ],
528                        },
529                    ),
530                    column_id: #1,
531                    name: "nested",
532                    generated_or_default_column: None,
533                    description: None,
534                    additional_column: AdditionalColumn {
535                        column_type: None,
536                    },
537                    version: Pr13707,
538                    system_column: None,
539                    nullable: true,
540                },
541                is_hidden: false,
542            }
543        "#]]
544        .assert_debug_eq(&col);
545
546        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
547            columns: vec![col],
548            version: Some(version),
549            ..Default::default()
550        });
551        let mut new_col = ColumnCatalog {
552            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
553                "nested",
554                new_type(),
555            )),
556            is_hidden: false,
557        };
558        r#gen.generate(&mut new_col).unwrap();
559        let version = r#gen.into_version();
560
561        expect![[r#"
562            ColumnCatalog {
563                column_desc: ColumnDesc {
564                    data_type: Struct(
565                        StructType {
566                            fields: [
567                                (
568                                    "f4",
569                                    Int32,
570                                ),
571                                (
572                                    "map",
573                                    Map(
574                                        MapType(
575                                            (
576                                                Varchar,
577                                                List(
578                                                    Struct(
579                                                        StructType {
580                                                            fields: [
581                                                                (
582                                                                    "f5",
583                                                                    Int32,
584                                                                ),
585                                                                (
586                                                                    "f3",
587                                                                    Boolean,
588                                                                ),
589                                                                (
590                                                                    "f6",
591                                                                    Float32,
592                                                                ),
593                                                            ],
594                                                            field_ids: [
595                                                                #7,
596                                                                #5,
597                                                                #8,
598                                                            ],
599                                                        },
600                                                    ),
601                                                ),
602                                            ),
603                                        ),
604                                    ),
605                                ),
606                            ],
607                            field_ids: [
608                                #6,
609                                #3,
610                            ],
611                        },
612                    ),
613                    column_id: #1,
614                    name: "nested",
615                    generated_or_default_column: None,
616                    description: None,
617                    additional_column: AdditionalColumn {
618                        column_type: None,
619                    },
620                    version: Pr13707,
621                    system_column: None,
622                    nullable: true,
623                },
624                is_hidden: false,
625            }
626        "#]]
627        .assert_debug_eq(&new_col);
628
629        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
630            columns: vec![new_col],
631            version: Some(version),
632            ..Default::default()
633        });
634
635        let mut new_col = ColumnCatalog {
636            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
637                "nested",
638                incompatible_new_type(),
639            )),
640            is_hidden: false,
641        };
642        let err = r#gen.generate(&mut new_col).unwrap_err();
643        expect![[r#"incompatible data type change from Float32 to Float64 at path "nested.map.value.element.f6""#]].assert_eq(&err.to_report_string());
644    }
645}