1use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, INITIAL_TABLE_VERSION_ID, TableVersionId};
20use risingwave_common::types::{DataType, MapType, StructType, data_types};
21use risingwave_common::util::iter_util::ZipEqFast;
22
23use crate::TableCatalog;
24use crate::catalog::ColumnId;
25use crate::catalog::table_catalog::TableVersion;
26use crate::error::Result;
27
/// One step of a path that locates a (possibly nested) data type inside a table's columns.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum Segment {
    /// A named field: either a top-level column or a field of a struct type.
    Field(String),
    /// The element type of a list.
    ListElement,
    /// The key type of a map.
    MapKey,
    /// The value type of a map.
    MapValue,
}
36
37impl std::fmt::Display for Segment {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 Segment::Field(name) => write!(f, "{}", name),
41 Segment::ListElement => write!(f, "element"),
42 Segment::MapKey => write!(f, "key"),
43 Segment::MapValue => write!(f, "value"),
44 }
45 }
46}
47
/// A sequence of [`Segment`]s locating a data type within a table. In this file every path
/// starts with the [`Segment::Field`] of a top-level column and descends into nested types.
type Path = Vec<Segment>;
50
/// Lookup table from a [`Path`] to the column ID and data type recorded at that path.
type Existing = HashMap<Path, (ColumnId, DataType)>;
52
/// Assigns column IDs to columns (and to the struct fields nested in their data types),
/// reusing the IDs recorded for an original table where the path already exists.
#[derive(Debug)]
pub struct ColumnIdGenerator {
    /// IDs and data types recorded from the original table, keyed by path.
    /// Empty when the generator is created with `new_initial`.
    existing: Existing,

    /// The ID handed out to the next path that has no recorded ID; advanced on every allocation.
    next_column_id: ColumnId,

    /// The version ID reported by `into_version`: the original table's version ID plus one for
    /// `new_alter`, or `INITIAL_TABLE_VERSION_ID` for `new_initial`.
    version_id: TableVersionId,
}
73
74impl ColumnIdGenerator {
75 pub fn new_alter(original: &TableCatalog) -> Self {
77 fn handle(existing: &mut Existing, path: &mut Path, id: ColumnId, data_type: DataType) {
78 macro_rules! with_segment {
79 ($segment:expr, $block:block) => {
80 path.push($segment);
81 $block
82 path.pop();
83 };
84 }
85
86 match &data_type {
87 DataType::Struct(fields) => {
88 for ((field_name, field_data_type), field_id) in
89 fields.iter().zip_eq_fast(fields.ids_or_placeholder())
90 {
91 with_segment!(Segment::Field(field_name.to_owned()), {
92 handle(existing, path, field_id, field_data_type.clone());
93 });
94 }
95 }
96 DataType::List(list) => {
97 with_segment!(Segment::ListElement, {
99 handle(existing, path, ColumnId::placeholder(), list.elem().clone());
100 });
101 }
102 DataType::Map(map) => {
103 with_segment!(Segment::MapKey, {
105 handle(existing, path, ColumnId::placeholder(), map.key().clone());
106 });
107 with_segment!(Segment::MapValue, {
108 handle(existing, path, ColumnId::placeholder(), map.value().clone());
109 });
110 }
111
112 data_types::simple!() => {}
113 }
114
115 existing
116 .try_insert(path.clone(), (id, data_type))
117 .unwrap_or_else(|_| panic!("duplicate path: {:?}", path));
118 }
119
120 let mut existing = Existing::new();
121
122 for col in original.columns() {
124 let mut path = vec![Segment::Field(col.name().to_owned())];
125 handle(
126 &mut existing,
127 &mut path,
128 col.column_id(),
129 col.data_type().clone(),
130 );
131 }
132
133 let version = original.version().expect("version field not set");
134
135 Self {
136 existing,
137 next_column_id: version.next_column_id,
138 version_id: version.version_id + 1,
139 }
140 }
141
142 pub fn new_initial() -> Self {
144 Self {
145 existing: Existing::new(),
146 next_column_id: ColumnId::first_user_column(),
147 version_id: INITIAL_TABLE_VERSION_ID,
148 }
149 }
150
151 pub fn generate(&mut self, col: &mut ColumnCatalog) -> Result<()> {
156 let mut path = vec![Segment::Field(col.name().to_owned())];
157
158 if let Some((original_column_id, original_data_type)) = self.existing.get(&path) {
159 if original_data_type == col.data_type() {
160 col.column_desc.column_id = *original_column_id;
161 col.column_desc.data_type = original_data_type.clone();
163 return Ok(());
164 } else {
165 match original_data_type.can_alter() {
167 Some(true) => { }
168 Some(false) => bail!(
169 "column \"{}\" was persisted with legacy encoding thus cannot be altered, \
170 consider dropping and readding the column",
171 col.name()
172 ),
173 None => bail!(
174 "column \"{}\" cannot be altered; only types containing struct can be altered",
175 col.name()
176 ),
177 }
178 }
179 }
180
181 fn handle(
182 this: &mut ColumnIdGenerator,
183 path: &mut Path,
184 data_type: DataType,
185 ) -> Result<(ColumnId, DataType)> {
186 macro_rules! with_segment {
187 ($segment:expr, $block:block) => {{
188 path.push($segment);
189 let ret = $block;
190 path.pop();
191 ret
192 }};
193 }
194
195 let original_column_id = match this.existing.get(&*path) {
196 Some((original_column_id, original_data_type)) => {
197 let incompatible = original_data_type.type_name() != data_type.type_name()
200 || matches!(
201 (original_data_type, &data_type),
202 (DataType::Vector(old), DataType::Vector(new)) if old != new,
203 );
204 if incompatible {
205 let path = path.iter().join(".");
206 bail!(
207 "incompatible data type change from {:?} to {:?} at path \"{}\"",
208 original_data_type.type_name(),
209 data_type.type_name(),
210 path
211 );
212 }
213 Some(*original_column_id)
214 }
215 None => None,
216 };
217
218 let need_gen_id = matches!(path.last().unwrap(), Segment::Field(_));
220
221 let new_id = if need_gen_id {
222 if let Some(id) = original_column_id {
223 assert!(
224 id != ColumnId::placeholder(),
225 "original column id should not be placeholder"
226 );
227 id
228 } else {
229 let id = this.next_column_id;
230 this.next_column_id = this.next_column_id.next();
231 id
232 }
233 } else {
234 ColumnId::placeholder()
236 };
237
238 let new_data_type = match data_type {
239 DataType::Struct(fields) => {
240 let mut new_fields = Vec::new();
241 let mut ids = Vec::new();
242 for (field_name, field_data_type) in fields.iter() {
243 let (id, new_field_data_type) =
244 with_segment!(Segment::Field(field_name.to_owned()), {
245 handle(this, path, field_data_type.clone())?
246 });
247 new_fields.push((field_name.to_owned(), new_field_data_type));
248 ids.push(id);
249 }
250 DataType::Struct(StructType::new(new_fields).with_ids(ids))
251 }
252 DataType::List(list) => {
253 let (_, new_inner) = with_segment!(Segment::ListElement, {
254 handle(this, path, list.into_elem())?
255 });
256 DataType::list(new_inner)
257 }
258 DataType::Map(map) => {
259 let (key, value) = map.into_kv();
260 let (_, new_key) = with_segment!(Segment::MapKey, { handle(this, path, key)? });
261 let (_, new_value) =
262 with_segment!(Segment::MapValue, { handle(this, path, value)? });
263 DataType::Map(MapType::from_kv(new_key, new_value))
264 }
265
266 data_types::simple!() => data_type,
267 };
268
269 Ok((new_id, new_data_type))
270 }
271
272 let (new_column_id, new_data_type) = handle(self, &mut path, col.data_type().clone())?;
273
274 col.column_desc.column_id = new_column_id;
275 col.column_desc.data_type = new_data_type;
276
277 Ok(())
278 }
279
280 pub fn into_version(self) -> TableVersion {
282 TableVersion {
283 version_id: self.version_id,
284 next_column_id: self.next_column_id,
285 }
286 }
287}
288
#[cfg(test)]
mod tests {
    use expect_test::expect;
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, FieldLike};
    use risingwave_common::types::StructType;
    use thiserror_ext::AsReport;

    use super::*;

    /// A field-like value with the given name whose data type is always `Boolean`.
    struct BrandNewColumn(&'static str);
    use BrandNewColumn as B;

    impl FieldLike for BrandNewColumn {
        fn name(&self) -> &str {
            self.0
        }

        fn data_type(&self) -> &DataType {
            &DataType::Boolean
        }
    }

    #[easy_ext::ext(ColumnIdGeneratorTestExt)]
    impl ColumnIdGenerator {
        /// Runs `generate` on a column built from `field` and returns the resulting column ID.
        ///
        /// Asserts that `generate` left the data type equal to the one passed in.
        fn generate_simple(&mut self, field: impl FieldLike) -> Result<ColumnId> {
            let expected_type = field.data_type().clone();

            let mut col = ColumnCatalog {
                column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                    field.name(),
                    expected_type.clone(),
                )),
                is_hidden: false,
            };
            self.generate(&mut col)?;

            assert_eq!(
                col.column_desc.data_type, expected_type,
                "data type has changed after generating column id, \
                 are you calling this on a composite type?"
            );

            Ok(col.column_desc.column_id)
        }
    }

    /// A fresh generator hands out consecutive IDs starting at 1.
    #[test]
    fn test_col_id_gen_initial() {
        let mut id_gen = ColumnIdGenerator::new_initial();
        for (name, expected) in [("v1", 1), ("v2", 2)] {
            assert_eq!(
                id_gen.generate_simple(B(name)).unwrap(),
                ColumnId::new(expected)
            );
        }
    }

    /// Altering a table reuses recorded IDs, allocates new ones for new columns, and rejects
    /// type changes on columns that cannot be altered.
    #[test]
    fn test_col_id_gen_alter() {
        let f32_col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_with_column_id(
                &Field::with_name(DataType::Float32, "f32"),
                1,
            ),
            is_hidden: false,
        };
        let f64_col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_with_column_id(
                &Field::with_name(DataType::Float64, "f64"),
                2,
            ),
            is_hidden: false,
        };
        let nested_col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_with_column_id(
                &Field::with_name(StructType::new([("f1", DataType::Int32)]).into(), "nested"),
                3,
            ),
            is_hidden: false,
        };

        let original = TableCatalog {
            columns: vec![f32_col, f64_col, nested_col],
            version: Some(TableVersion::new_initial_for_test(ColumnId::new(3))),
            ..Default::default()
        };
        let mut id_gen = ColumnIdGenerator::new_alter(&original);

        // Unknown columns continue after the last ID of the original table.
        assert_eq!(id_gen.generate_simple(B("v1")).unwrap(), ColumnId::new(4));
        assert_eq!(id_gen.generate_simple(B("v2")).unwrap(), ColumnId::new(5));

        // Same name and same type: the recorded ID is reused.
        let reused = id_gen
            .generate_simple(Field::new("f32", DataType::Float32))
            .unwrap();
        assert_eq!(reused, ColumnId::new(1));

        // Same name but a different simple type is rejected.
        let err = id_gen
            .generate_simple(Field::new("f64", DataType::Float32))
            .unwrap_err();
        expect![[r#"column "f64" cannot be altered; only types containing struct can be altered"#]]
            .assert_eq(&err.to_report_string());

        // Adding a field to the recorded struct column is rejected as well.
        let err = id_gen
            .generate_simple(Field::new(
                "nested",
                StructType::new([("f1", DataType::Int32), ("f2", DataType::Int64)]).into(),
            ))
            .unwrap_err();
        expect![[r#"column "nested" was persisted with legacy encoding thus cannot be altered, consider dropping and readding the column"#]]
            .assert_eq(&err.to_report_string());

        // The unchanged struct column keeps its recorded ID.
        let id = id_gen
            .generate_simple(Field::new(
                "nested",
                StructType::new([("f1", DataType::Int32)]).into(),
            ))
            .unwrap();
        assert_eq!(id, ColumnId::new(3));

        // The failed attempts above did not consume any ID.
        assert_eq!(id_gen.generate_simple(B("v3")).unwrap(), ColumnId::new(6));
    }

    /// Generates IDs for a nested composite type, alters it, and finally attempts an
    /// incompatible alteration.
    #[test]
    fn test_col_id_gen_alter_composite_type() {
        // Builds the visible column named "nested" with the given type and no column ID yet.
        let nested_col = |data_type: DataType| ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                data_type,
            )),
            is_hidden: false,
        };

        let ori_type = DataType::from(StructType::new([
            ("f1", DataType::Int32),
            (
                "map",
                MapType::from_kv(
                    DataType::Varchar,
                    DataType::list(
                        StructType::new([("f2", DataType::Int32), ("f3", DataType::Boolean)])
                            .into(),
                    ),
                )
                .into(),
            ),
        ]));

        let new_type = DataType::from(StructType::new([
            ("f4", DataType::Int32),
            (
                "map",
                MapType::from_kv(
                    DataType::Varchar,
                    DataType::list(
                        StructType::new([
                            ("f5", DataType::Int32),
                            ("f3", DataType::Boolean),
                            ("f6", DataType::Float32),
                        ])
                        .into(),
                    ),
                )
                .into(),
            ),
        ]));

        let incompatible_new_type = DataType::from(StructType::new([(
            "map",
            MapType::from_kv(
                DataType::Varchar,
                DataType::list(StructType::new([("f6", DataType::Float64)]).into()),
            )
            .into(),
        )]));

        // Step 1: initial generation assigns IDs 1..=5 to the column and its struct fields.
        let mut id_gen = ColumnIdGenerator::new_initial();
        let mut col = nested_col(ori_type);
        id_gen.generate(&mut col).unwrap();
        let version = id_gen.into_version();

        expect![[r#"
            ColumnCatalog {
                column_desc: ColumnDesc {
                    data_type: Struct(
                        StructType {
                            fields: [
                                (
                                    "f1",
                                    Int32,
                                ),
                                (
                                    "map",
                                    Map(
                                        MapType(
                                            (
                                                Varchar,
                                                List(
                                                    Struct(
                                                        StructType {
                                                            fields: [
                                                                (
                                                                    "f2",
                                                                    Int32,
                                                                ),
                                                                (
                                                                    "f3",
                                                                    Boolean,
                                                                ),
                                                            ],
                                                            field_ids: [
                                                                #4,
                                                                #5,
                                                            ],
                                                        },
                                                    ),
                                                ),
                                            ),
                                        ),
                                    ),
                                ),
                            ],
                            field_ids: [
                                #2,
                                #3,
                            ],
                        },
                    ),
                    column_id: #1,
                    name: "nested",
                    generated_or_default_column: None,
                    description: None,
                    additional_column: AdditionalColumn {
                        column_type: None,
                    },
                    version: Pr13707,
                    system_column: None,
                    nullable: true,
                },
                is_hidden: false,
            }
        "#]]
        .assert_debug_eq(&col);

        // Step 2: alter the type. Paths that still exist ("map", "f3") keep their IDs, while
        // the new field names ("f4", "f5", "f6") receive fresh ones.
        let mut id_gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![col],
            version: Some(version),
            ..Default::default()
        });
        let mut new_col = nested_col(new_type);
        id_gen.generate(&mut new_col).unwrap();
        let version = id_gen.into_version();

        expect![[r#"
            ColumnCatalog {
                column_desc: ColumnDesc {
                    data_type: Struct(
                        StructType {
                            fields: [
                                (
                                    "f4",
                                    Int32,
                                ),
                                (
                                    "map",
                                    Map(
                                        MapType(
                                            (
                                                Varchar,
                                                List(
                                                    Struct(
                                                        StructType {
                                                            fields: [
                                                                (
                                                                    "f5",
                                                                    Int32,
                                                                ),
                                                                (
                                                                    "f3",
                                                                    Boolean,
                                                                ),
                                                                (
                                                                    "f6",
                                                                    Float32,
                                                                ),
                                                            ],
                                                            field_ids: [
                                                                #7,
                                                                #5,
                                                                #8,
                                                            ],
                                                        },
                                                    ),
                                                ),
                                            ),
                                        ),
                                    ),
                                ),
                            ],
                            field_ids: [
                                #6,
                                #3,
                            ],
                        },
                    ),
                    column_id: #1,
                    name: "nested",
                    generated_or_default_column: None,
                    description: None,
                    additional_column: AdditionalColumn {
                        column_type: None,
                    },
                    version: Pr13707,
                    system_column: None,
                    nullable: true,
                },
                is_hidden: false,
            }
        "#]]
        .assert_debug_eq(&new_col);

        // Step 3: changing the type of the existing path "...f6" from Float32 to Float64 fails.
        let mut id_gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![new_col],
            version: Some(version),
            ..Default::default()
        });

        let mut new_col = nested_col(incompatible_new_type);
        let err = id_gen.generate(&mut new_col).unwrap_err();
        expect![[r#"incompatible data type change from Float32 to Float64 at path "nested.map.value.element.f6""#]]
            .assert_eq(&err.to_report_string());
    }
}