risingwave_frontend/handler/create_table/
col_id_gen.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, INITIAL_TABLE_VERSION_ID, TableVersionId};
20use risingwave_common::types::{DataType, MapType, StructType, data_types};
21use risingwave_common::util::iter_util::ZipEqFast;
22
23use crate::TableCatalog;
24use crate::catalog::ColumnId;
25use crate::catalog::table_catalog::TableVersion;
26use crate::error::Result;
27
/// One step along a [`Path`] into a column's (possibly nested) data type.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum Segment {
    /// A named step: either a top-level column or a field of a struct.
    Field(String),
    /// The element position of a list.
    ListElement,
    /// The key position of a map.
    MapKey,
    /// The value position of a map.
    MapValue,
}
36
37impl std::fmt::Display for Segment {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            Segment::Field(name) => write!(f, "{}", name),
41            Segment::ListElement => write!(f, "element"),
42            Segment::MapKey => write!(f, "key"),
43            Segment::MapValue => write!(f, "value"),
44        }
45    }
46}
47
48/// The path to a nested field in a column with composite data type.
49type Path = Vec<Segment>;
50
51type Existing = HashMap<Path, (ColumnId, DataType)>;
52
53/// Column ID generator for a new table or a new version of an existing table to alter.
54#[derive(Debug)]
55pub struct ColumnIdGenerator {
56    /// Existing fields and their IDs and data types.
57    ///
58    /// This is used for aligning field IDs between versions (`ALTER`s). If a field already
59    /// exists and the data type matches, its ID is reused. Otherwise, a new ID is generated.
60    ///
61    /// For a new table, this is empty.
62    existing: Existing,
63
64    /// The next column ID to generate, used for new columns that do not exist in `existing`.
65    next_column_id: ColumnId,
66
67    /// The version ID of the table to be created or altered.
68    ///
69    /// For a new table, this is 0. For altering an existing table, this is the **next** version ID
70    /// of the `version_id` field in the original table catalog.
71    version_id: TableVersionId,
72}
73
74impl ColumnIdGenerator {
75    /// Creates a new [`ColumnIdGenerator`] for altering an existing table.
76    pub fn new_alter(original: &TableCatalog) -> Self {
77        fn handle(existing: &mut Existing, path: &mut Path, id: ColumnId, data_type: DataType) {
78            macro_rules! with_segment {
79                ($segment:expr, $block:block) => {
80                    path.push($segment);
81                    $block
82                    path.pop();
83                };
84            }
85
86            match &data_type {
87                DataType::Struct(fields) => {
88                    for ((field_name, field_data_type), field_id) in
89                        fields.iter().zip_eq_fast(fields.ids_or_placeholder())
90                    {
91                        with_segment!(Segment::Field(field_name.to_owned()), {
92                            handle(existing, path, field_id, field_data_type.clone());
93                        });
94                    }
95                }
96                DataType::List(inner) => {
97                    // There's no id for the element as list's own structure won't change.
98                    with_segment!(Segment::ListElement, {
99                        handle(existing, path, ColumnId::placeholder(), *inner.clone());
100                    });
101                }
102                DataType::Map(map) => {
103                    // There's no id for the key/value as map's own structure won't change.
104                    with_segment!(Segment::MapKey, {
105                        handle(existing, path, ColumnId::placeholder(), map.key().clone());
106                    });
107                    with_segment!(Segment::MapValue, {
108                        handle(existing, path, ColumnId::placeholder(), map.value().clone());
109                    });
110                }
111
112                data_types::simple!() => {}
113            }
114
115            existing
116                .try_insert(path.clone(), (id, data_type))
117                .unwrap_or_else(|_| panic!("duplicate path: {:?}", path));
118        }
119
120        let mut existing = Existing::new();
121
122        // Collect all existing fields into `existing`.
123        for col in original.columns() {
124            let mut path = vec![Segment::Field(col.name().to_owned())];
125            handle(
126                &mut existing,
127                &mut path,
128                col.column_id(),
129                col.data_type().clone(),
130            );
131        }
132
133        let version = original.version().expect("version field not set");
134
135        Self {
136            existing,
137            next_column_id: version.next_column_id,
138            version_id: version.version_id + 1,
139        }
140    }
141
142    /// Creates a new [`ColumnIdGenerator`] for a new table.
143    pub fn new_initial() -> Self {
144        Self {
145            existing: Existing::new(),
146            next_column_id: ColumnId::first_user_column(),
147            version_id: INITIAL_TABLE_VERSION_ID,
148        }
149    }
150
151    /// Generate [`ColumnId`]s for the given column and its nested fields (if any) recursively.
152    /// The IDs of nested fields will be reflected in the updated [`DataType`] of the column.
153    ///
154    /// Returns an error if there's incompatible data type change.
155    pub fn generate(&mut self, col: &mut ColumnCatalog) -> Result<()> {
156        let mut path = vec![Segment::Field(col.name().to_owned())];
157
158        if let Some((original_column_id, original_data_type)) = self.existing.get(&path) {
159            if original_data_type == col.data_type() {
160                col.column_desc.column_id = *original_column_id;
161                // Equality above ignores nested field IDs. We need to clone them below.
162                col.column_desc.data_type = original_data_type.clone();
163                return Ok(());
164            } else {
165                // Check if the column can be altered.
166                match original_data_type.can_alter() {
167                    Some(true) => { /* pass */ }
168                    Some(false) => bail!(
169                        "column \"{}\" was persisted with legacy encoding thus cannot be altered, \
170                         consider dropping and readding the column",
171                        col.name()
172                    ),
173                    None => bail!(
174                        "column \"{}\" cannot be altered; only types containing struct can be altered",
175                        col.name()
176                    ),
177                }
178            }
179        }
180
181        fn handle(
182            this: &mut ColumnIdGenerator,
183            path: &mut Path,
184            data_type: DataType,
185        ) -> Result<(ColumnId, DataType)> {
186            macro_rules! with_segment {
187                ($segment:expr, $block:block) => {{
188                    path.push($segment);
189                    let ret = $block;
190                    path.pop();
191                    ret
192                }};
193            }
194
195            let original_column_id = match this.existing.get(&*path) {
196                Some((original_column_id, original_data_type)) => {
197                    // Only check the type name (discriminant) for compatibility check here.
198                    // For nested fields, we will check them recursively later.
199                    let incompatible = original_data_type.type_name() != data_type.type_name()
200                        || matches!(
201                            (original_data_type, &data_type),
202                            (DataType::Vector(old), DataType::Vector(new)) if old != new,
203                        );
204                    if incompatible {
205                        let path = path.iter().join(".");
206                        bail!(
207                            "incompatible data type change from {:?} to {:?} at path \"{}\"",
208                            original_data_type.type_name(),
209                            data_type.type_name(),
210                            path
211                        );
212                    }
213                    Some(*original_column_id)
214                }
215                None => None,
216            };
217
218            // Only top-level column and struct fields need an ID.
219            let need_gen_id = matches!(path.last().unwrap(), Segment::Field(_));
220
221            let new_id = if need_gen_id {
222                if let Some(id) = original_column_id {
223                    assert!(
224                        id != ColumnId::placeholder(),
225                        "original column id should not be placeholder"
226                    );
227                    id
228                } else {
229                    let id = this.next_column_id;
230                    this.next_column_id = this.next_column_id.next();
231                    id
232                }
233            } else {
234                // Column ID is actually unused by caller (for list and map types), just use a placeholder.
235                ColumnId::placeholder()
236            };
237
238            let new_data_type = match data_type {
239                DataType::Struct(fields) => {
240                    let mut new_fields = Vec::new();
241                    let mut ids = Vec::new();
242                    for (field_name, field_data_type) in fields.iter() {
243                        let (id, new_field_data_type) =
244                            with_segment!(Segment::Field(field_name.to_owned()), {
245                                handle(this, path, field_data_type.clone())?
246                            });
247                        new_fields.push((field_name.to_owned(), new_field_data_type));
248                        ids.push(id);
249                    }
250                    DataType::Struct(StructType::new(new_fields).with_ids(ids))
251                }
252                DataType::List(inner) => {
253                    let (_, new_inner) =
254                        with_segment!(Segment::ListElement, { handle(this, path, *inner)? });
255                    DataType::List(Box::new(new_inner))
256                }
257                DataType::Map(map) => {
258                    let (_, new_key) =
259                        with_segment!(Segment::MapKey, { handle(this, path, map.key().clone())? });
260                    let (_, new_value) = with_segment!(Segment::MapValue, {
261                        handle(this, path, map.value().clone())?
262                    });
263                    DataType::Map(MapType::from_kv(new_key, new_value))
264                }
265
266                data_types::simple!() => data_type,
267            };
268
269            Ok((new_id, new_data_type))
270        }
271
272        let (new_column_id, new_data_type) = handle(self, &mut path, col.data_type().clone())?;
273
274        col.column_desc.column_id = new_column_id;
275        col.column_desc.data_type = new_data_type;
276
277        Ok(())
278    }
279
280    /// Consume this generator and return a [`TableVersion`] for the table to be created or altered.
281    pub fn into_version(self) -> TableVersion {
282        TableVersion {
283            version_id: self.version_id,
284            next_column_id: self.next_column_id,
285        }
286    }
287}
288
289#[cfg(test)]
290mod tests {
291    use expect_test::expect;
292    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, FieldLike};
293    use risingwave_common::types::StructType;
294    use thiserror_ext::AsReport;
295
296    use super::*;
297
298    struct BrandNewColumn(&'static str);
299    use BrandNewColumn as B;
300
301    impl FieldLike for BrandNewColumn {
302        fn name(&self) -> &str {
303            self.0
304        }
305
306        fn data_type(&self) -> &DataType {
307            &DataType::Boolean
308        }
309    }
310
311    #[easy_ext::ext(ColumnIdGeneratorTestExt)]
312    impl ColumnIdGenerator {
313        /// Generate a column ID for the given field.
314        ///
315        /// This helper function checks that the data type of the field has not changed after
316        /// generating the column ID, i.e., it is called on simple types or composite types
317        /// with `field_ids` unset and legacy encoding.
318        fn generate_simple(&mut self, field: impl FieldLike) -> Result<ColumnId> {
319            let original_data_type = field.data_type().clone();
320
321            let field = Field::new(field.name(), original_data_type.clone());
322            let mut col = ColumnCatalog {
323                column_desc: ColumnDesc::from_field_without_column_id(&field),
324                is_hidden: false,
325            };
326            self.generate(&mut col)?;
327
328            assert_eq!(
329                col.column_desc.data_type, original_data_type,
330                "data type has changed after generating column id, \
331                 are you calling this on a composite type?"
332            );
333
334            Ok(col.column_desc.column_id)
335        }
336    }
337
338    #[test]
339    fn test_col_id_gen_initial() {
340        let mut r#gen = ColumnIdGenerator::new_initial();
341        assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(1));
342        assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(2));
343    }
344
345    #[test]
346    fn test_col_id_gen_alter() {
347        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
348            columns: vec![
349                ColumnCatalog {
350                    column_desc: ColumnDesc::from_field_with_column_id(
351                        &Field::with_name(DataType::Float32, "f32"),
352                        1,
353                    ),
354                    is_hidden: false,
355                },
356                ColumnCatalog {
357                    column_desc: ColumnDesc::from_field_with_column_id(
358                        &Field::with_name(DataType::Float64, "f64"),
359                        2,
360                    ),
361                    is_hidden: false,
362                },
363                ColumnCatalog {
364                    column_desc: ColumnDesc::from_field_with_column_id(
365                        &Field::with_name(
366                            StructType::new([("f1", DataType::Int32)]).into(),
367                            "nested",
368                        ),
369                        3,
370                    ),
371                    is_hidden: false,
372                },
373            ],
374            version: Some(TableVersion::new_initial_for_test(ColumnId::new(3))),
375            ..Default::default()
376        });
377
378        assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(4));
379        assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(5));
380        assert_eq!(
381            r#gen
382                .generate_simple(Field::new("f32", DataType::Float32))
383                .unwrap(),
384            ColumnId::new(1)
385        );
386
387        // mismatched simple data type
388        let err = r#gen
389            .generate_simple(Field::new("f64", DataType::Float32))
390            .unwrap_err();
391        expect![[r#"column "f64" cannot be altered; only types containing struct can be altered"#]]
392            .assert_eq(&err.to_report_string());
393
394        // mismatched composite data type
395        // we require the nested data type to be exactly the same
396        let err = r#gen
397            .generate_simple(Field::new(
398                "nested",
399                // NOTE: field ids are unset, legacy encoding, thus cannot be altered
400                StructType::new([("f1", DataType::Int32), ("f2", DataType::Int64)]).into(),
401            ))
402            .unwrap_err();
403        expect![[r#"column "nested" was persisted with legacy encoding thus cannot be altered, consider dropping and readding the column"#]].assert_eq(&err.to_report_string());
404
405        // matched composite data type, should work
406        let id = r#gen
407            .generate_simple(Field::new(
408                "nested",
409                StructType::new([("f1", DataType::Int32)]).into(),
410            ))
411            .unwrap();
412        assert_eq!(id, ColumnId::new(3));
413
414        assert_eq!(r#gen.generate_simple(B("v3")).unwrap(), ColumnId::new(6));
415    }
416
417    #[test]
418    fn test_col_id_gen_alter_composite_type() {
419        let ori_type = || {
420            DataType::from(StructType::new([
421                ("f1", DataType::Int32),
422                (
423                    "map",
424                    MapType::from_kv(
425                        DataType::Varchar,
426                        DataType::List(Box::new(
427                            StructType::new([("f2", DataType::Int32), ("f3", DataType::Boolean)])
428                                .into(),
429                        )),
430                    )
431                    .into(),
432                ),
433            ]))
434        };
435
436        let new_type = || {
437            DataType::from(StructType::new([
438                ("f4", DataType::Int32),
439                (
440                    "map",
441                    MapType::from_kv(
442                        DataType::Varchar,
443                        DataType::List(Box::new(
444                            StructType::new([
445                                ("f5", DataType::Int32),
446                                ("f3", DataType::Boolean),
447                                ("f6", DataType::Float32),
448                            ])
449                            .into(),
450                        )),
451                    )
452                    .into(),
453                ),
454            ]))
455        };
456
457        let incompatible_new_type = || {
458            DataType::from(StructType::new([(
459                "map",
460                MapType::from_kv(
461                    DataType::Varchar,
462                    DataType::List(Box::new(
463                        StructType::new([("f6", DataType::Float64)]).into(),
464                    )),
465                )
466                .into(),
467            )]))
468        };
469
470        let mut r#gen = ColumnIdGenerator::new_initial();
471        let mut col = ColumnCatalog {
472            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
473                "nested",
474                ori_type(),
475            )),
476            is_hidden: false,
477        };
478        r#gen.generate(&mut col).unwrap();
479        let version = r#gen.into_version();
480
481        expect![[r#"
482            ColumnCatalog {
483                column_desc: ColumnDesc {
484                    data_type: Struct(
485                        StructType {
486                            fields: [
487                                (
488                                    "f1",
489                                    Int32,
490                                ),
491                                (
492                                    "map",
493                                    Map(
494                                        MapType(
495                                            (
496                                                Varchar,
497                                                List(
498                                                    Struct(
499                                                        StructType {
500                                                            fields: [
501                                                                (
502                                                                    "f2",
503                                                                    Int32,
504                                                                ),
505                                                                (
506                                                                    "f3",
507                                                                    Boolean,
508                                                                ),
509                                                            ],
510                                                            field_ids: [
511                                                                #4,
512                                                                #5,
513                                                            ],
514                                                        },
515                                                    ),
516                                                ),
517                                            ),
518                                        ),
519                                    ),
520                                ),
521                            ],
522                            field_ids: [
523                                #2,
524                                #3,
525                            ],
526                        },
527                    ),
528                    column_id: #1,
529                    name: "nested",
530                    generated_or_default_column: None,
531                    description: None,
532                    additional_column: AdditionalColumn {
533                        column_type: None,
534                    },
535                    version: Pr13707,
536                    system_column: None,
537                    nullable: true,
538                },
539                is_hidden: false,
540            }
541        "#]]
542        .assert_debug_eq(&col);
543
544        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
545            columns: vec![col],
546            version: Some(version),
547            ..Default::default()
548        });
549        let mut new_col = ColumnCatalog {
550            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
551                "nested",
552                new_type(),
553            )),
554            is_hidden: false,
555        };
556        r#gen.generate(&mut new_col).unwrap();
557        let version = r#gen.into_version();
558
559        expect![[r#"
560            ColumnCatalog {
561                column_desc: ColumnDesc {
562                    data_type: Struct(
563                        StructType {
564                            fields: [
565                                (
566                                    "f4",
567                                    Int32,
568                                ),
569                                (
570                                    "map",
571                                    Map(
572                                        MapType(
573                                            (
574                                                Varchar,
575                                                List(
576                                                    Struct(
577                                                        StructType {
578                                                            fields: [
579                                                                (
580                                                                    "f5",
581                                                                    Int32,
582                                                                ),
583                                                                (
584                                                                    "f3",
585                                                                    Boolean,
586                                                                ),
587                                                                (
588                                                                    "f6",
589                                                                    Float32,
590                                                                ),
591                                                            ],
592                                                            field_ids: [
593                                                                #7,
594                                                                #5,
595                                                                #8,
596                                                            ],
597                                                        },
598                                                    ),
599                                                ),
600                                            ),
601                                        ),
602                                    ),
603                                ),
604                            ],
605                            field_ids: [
606                                #6,
607                                #3,
608                            ],
609                        },
610                    ),
611                    column_id: #1,
612                    name: "nested",
613                    generated_or_default_column: None,
614                    description: None,
615                    additional_column: AdditionalColumn {
616                        column_type: None,
617                    },
618                    version: Pr13707,
619                    system_column: None,
620                    nullable: true,
621                },
622                is_hidden: false,
623            }
624        "#]]
625        .assert_debug_eq(&new_col);
626
627        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
628            columns: vec![new_col],
629            version: Some(version),
630            ..Default::default()
631        });
632
633        let mut new_col = ColumnCatalog {
634            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
635                "nested",
636                incompatible_new_type(),
637            )),
638            is_hidden: false,
639        };
640        let err = r#gen.generate(&mut new_col).unwrap_err();
641        expect![[r#"incompatible data type change from Float32 to Float64 at path "nested.map.value.element.f6""#]].assert_eq(&err.to_report_string());
642    }
643}