use std::collections::HashMap;

use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, INITIAL_TABLE_VERSION_ID, TableVersionId};
use risingwave_common::types::{DataType, MapType, StructType, data_types};
use risingwave_common::util::iter_util::ZipEqFast;

use crate::TableCatalog;
use crate::catalog::ColumnId;
use crate::catalog::table_catalog::TableVersion;
use crate::error::Result;

/// One step of a [`Path`] that locates a (possibly nested) data type inside a column.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum Segment {
    /// A named field: either the top-level column itself or a field of a struct.
    Field(String),
    /// The element type of a list.
    ListElement,
    /// The key type of a map.
    MapKey,
    /// The value type of a map.
    MapValue,
}

37impl std::fmt::Display for Segment {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 Segment::Field(name) => write!(f, "{}", name),
41 Segment::ListElement => write!(f, "element"),
42 Segment::MapKey => write!(f, "key"),
43 Segment::MapValue => write!(f, "value"),
44 }
45 }
46}
47
48type Path = Vec<Segment>;
50
51type Existing = HashMap<Path, (ColumnId, DataType)>;
52
53#[derive(Debug)]
55pub struct ColumnIdGenerator {
56 existing: Existing,
63
64 next_column_id: ColumnId,
66
67 version_id: TableVersionId,
72}
73
74impl ColumnIdGenerator {
75 pub fn new_alter(original: &TableCatalog) -> Self {
77 fn handle(existing: &mut Existing, path: &mut Path, id: ColumnId, data_type: DataType) {
78 macro_rules! with_segment {
79 ($segment:expr, $block:block) => {
80 path.push($segment);
81 $block
82 path.pop();
83 };
84 }
85
86 match &data_type {
87 DataType::Struct(fields) => {
88 for ((field_name, field_data_type), field_id) in
89 fields.iter().zip_eq_fast(fields.ids_or_placeholder())
90 {
91 with_segment!(Segment::Field(field_name.to_owned()), {
92 handle(existing, path, field_id, field_data_type.clone());
93 });
94 }
95 }
96 DataType::List(inner) => {
97 with_segment!(Segment::ListElement, {
99 handle(existing, path, ColumnId::placeholder(), *inner.clone());
100 });
101 }
102 DataType::Map(map) => {
103 with_segment!(Segment::MapKey, {
105 handle(existing, path, ColumnId::placeholder(), map.key().clone());
106 });
107 with_segment!(Segment::MapValue, {
108 handle(existing, path, ColumnId::placeholder(), map.value().clone());
109 });
110 }
111
112 data_types::simple!() => {}
113 }
114
115 existing
116 .try_insert(path.clone(), (id, data_type))
117 .unwrap_or_else(|_| panic!("duplicate path: {:?}", path));
118 }
119
120 let mut existing = Existing::new();
121
122 for col in original.columns() {
124 let mut path = vec![Segment::Field(col.name().to_owned())];
125 handle(
126 &mut existing,
127 &mut path,
128 col.column_id(),
129 col.data_type().clone(),
130 );
131 }
132
133 let version = original.version().expect("version field not set");
134
135 Self {
136 existing,
137 next_column_id: version.next_column_id,
138 version_id: version.version_id + 1,
139 }
140 }
141
142 pub fn new_initial() -> Self {
144 Self {
145 existing: Existing::new(),
146 next_column_id: ColumnId::first_user_column(),
147 version_id: INITIAL_TABLE_VERSION_ID,
148 }
149 }
150
151 pub fn generate(&mut self, col: &mut ColumnCatalog) -> Result<()> {
156 let mut path = vec![Segment::Field(col.name().to_owned())];
157
158 if let Some((original_column_id, original_data_type)) = self.existing.get(&path) {
159 if original_data_type == col.data_type() {
160 col.column_desc.column_id = *original_column_id;
161 col.column_desc.data_type = original_data_type.clone();
163 return Ok(());
164 } else {
165 match original_data_type.can_alter() {
167 Some(true) => { }
168 Some(false) => bail!(
169 "column \"{}\" was persisted with legacy encoding thus cannot be altered, \
170 consider dropping and readding the column",
171 col.name()
172 ),
173 None => bail!(
174 "column \"{}\" cannot be altered; only types containing struct can be altered",
175 col.name()
176 ),
177 }
178 }
179 }
180
181 fn handle(
182 this: &mut ColumnIdGenerator,
183 path: &mut Path,
184 data_type: DataType,
185 ) -> Result<(ColumnId, DataType)> {
186 macro_rules! with_segment {
187 ($segment:expr, $block:block) => {{
188 path.push($segment);
189 let ret = $block;
190 path.pop();
191 ret
192 }};
193 }
194
195 let original_column_id = match this.existing.get(&*path) {
196 Some((original_column_id, original_data_type)) => {
197 let incompatible = original_data_type.type_name() != data_type.type_name()
200 || matches!(
201 (original_data_type, &data_type),
202 (DataType::Vector(old), DataType::Vector(new)) if old != new,
203 );
204 if incompatible {
205 let path = path.iter().join(".");
206 bail!(
207 "incompatible data type change from {:?} to {:?} at path \"{}\"",
208 original_data_type.type_name(),
209 data_type.type_name(),
210 path
211 );
212 }
213 Some(*original_column_id)
214 }
215 None => None,
216 };
217
218 let need_gen_id = matches!(path.last().unwrap(), Segment::Field(_));
220
221 let new_id = if need_gen_id {
222 if let Some(id) = original_column_id {
223 assert!(
224 id != ColumnId::placeholder(),
225 "original column id should not be placeholder"
226 );
227 id
228 } else {
229 let id = this.next_column_id;
230 this.next_column_id = this.next_column_id.next();
231 id
232 }
233 } else {
234 ColumnId::placeholder()
236 };
237
238 let new_data_type = match data_type {
239 DataType::Struct(fields) => {
240 let mut new_fields = Vec::new();
241 let mut ids = Vec::new();
242 for (field_name, field_data_type) in fields.iter() {
243 let (id, new_field_data_type) =
244 with_segment!(Segment::Field(field_name.to_owned()), {
245 handle(this, path, field_data_type.clone())?
246 });
247 new_fields.push((field_name.to_owned(), new_field_data_type));
248 ids.push(id);
249 }
250 DataType::Struct(StructType::new(new_fields).with_ids(ids))
251 }
252 DataType::List(inner) => {
253 let (_, new_inner) =
254 with_segment!(Segment::ListElement, { handle(this, path, *inner)? });
255 DataType::List(Box::new(new_inner))
256 }
257 DataType::Map(map) => {
258 let (_, new_key) =
259 with_segment!(Segment::MapKey, { handle(this, path, map.key().clone())? });
260 let (_, new_value) = with_segment!(Segment::MapValue, {
261 handle(this, path, map.value().clone())?
262 });
263 DataType::Map(MapType::from_kv(new_key, new_value))
264 }
265
266 data_types::simple!() => data_type,
267 };
268
269 Ok((new_id, new_data_type))
270 }
271
272 let (new_column_id, new_data_type) = handle(self, &mut path, col.data_type().clone())?;
273
274 col.column_desc.column_id = new_column_id;
275 col.column_desc.data_type = new_data_type;
276
277 Ok(())
278 }
279
280 pub fn into_version(self) -> TableVersion {
282 TableVersion {
283 version_id: self.version_id,
284 next_column_id: self.next_column_id,
285 }
286 }
287}
288
#[cfg(test)]
mod tests {
    use expect_test::expect;
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, FieldLike};
    use risingwave_common::types::StructType;
    use thiserror_ext::AsReport;

    use super::*;

    /// A field with the given name and a fixed `Boolean` type, used for columns that are
    /// not part of the original table.
    struct BrandNewColumn(&'static str);
    use BrandNewColumn as B;

    impl FieldLike for BrandNewColumn {
        fn name(&self) -> &str {
            self.0
        }

        fn data_type(&self) -> &DataType {
            &DataType::Boolean
        }
    }

    #[easy_ext::ext(ColumnIdGeneratorTestExt)]
    impl ColumnIdGenerator {
        /// Runs `generate` on a visible column built from `field` and returns the
        /// assigned column ID.
        ///
        /// Asserts that `generate` left the data type untouched, so this helper is not
        /// suitable for composite types whose nested field IDs get filled in.
        fn generate_simple(&mut self, field: impl FieldLike) -> Result<ColumnId> {
            let original_data_type = field.data_type().clone();

            let field = Field::new(field.name(), original_data_type.clone());
            let mut col = ColumnCatalog {
                column_desc: ColumnDesc::from_field_without_column_id(&field),
                is_hidden: false,
            };
            self.generate(&mut col)?;

            assert_eq!(
                col.column_desc.data_type, original_data_type,
                "data type has changed after generating column id, \
                 are you calling this on a composite type?"
            );

            Ok(col.column_desc.column_id)
        }
    }

    /// A fresh generator hands out consecutive IDs starting from 1.
    #[test]
    fn test_col_id_gen_initial() {
        let mut r#gen = ColumnIdGenerator::new_initial();
        assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(1));
        assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(2));
    }

    /// Altering a table reuses IDs of unchanged columns, allocates new IDs for new
    /// columns, and rejects type changes that are not allowed.
    #[test]
    fn test_col_id_gen_alter() {
        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field::with_name(DataType::Float32, "f32"),
                        1,
                    ),
                    is_hidden: false,
                },
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field::with_name(DataType::Float64, "f64"),
                        2,
                    ),
                    is_hidden: false,
                },
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field::with_name(
                            StructType::new([("f1", DataType::Int32)]).into(),
                            "nested",
                        ),
                        3,
                    ),
                    is_hidden: false,
                },
            ],
            version: Some(TableVersion::new_initial_for_test(ColumnId::new(3))),
            ..Default::default()
        });

        // Columns unknown to the original table get the IDs following the last used one.
        assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(4));
        assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(5));
        // Same name and same type: the original ID is reused.
        assert_eq!(
            r#gen
                .generate_simple(Field::new("f32", DataType::Float32))
                .unwrap(),
            ColumnId::new(1)
        );

        // Changing the type of a simple column is rejected.
        let err = r#gen
            .generate_simple(Field::new("f64", DataType::Float32))
            .unwrap_err();
        expect![[r#"column "f64" cannot be altered; only types containing struct can be altered"#]]
            .assert_eq(&err.to_report_string());

        // Adding a field to the struct column of this catalog is rejected as well; the
        // struct here was built without field IDs.
        let err = r#gen
            .generate_simple(Field::new(
                "nested",
                StructType::new([("f1", DataType::Int32), ("f2", DataType::Int64)]).into(),
            ))
            .unwrap_err();
        expect![[r#"column "nested" was persisted with legacy encoding thus cannot be altered, consider dropping and readding the column"#]].assert_eq(&err.to_report_string());

        // Requesting the struct column with its original type is fine and reuses its ID.
        let id = r#gen
            .generate_simple(Field::new(
                "nested",
                StructType::new([("f1", DataType::Int32)]).into(),
            ))
            .unwrap();
        assert_eq!(id, ColumnId::new(3));

        // The rejected requests above did not consume any ID.
        assert_eq!(r#gen.generate_simple(B("v3")).unwrap(), ColumnId::new(6));
    }

    /// Nested struct fields get their own IDs, which survive an alter as long as the
    /// path and type name stay the same.
    #[test]
    fn test_col_id_gen_alter_composite_type() {
        let ori_type = || {
            DataType::from(StructType::new([
                ("f1", DataType::Int32),
                (
                    "map",
                    MapType::from_kv(
                        DataType::Varchar,
                        DataType::List(Box::new(
                            StructType::new([("f2", DataType::Int32), ("f3", DataType::Boolean)])
                                .into(),
                        )),
                    )
                    .into(),
                ),
            ]))
        };

        // Compared to `ori_type`: `f1` is replaced by `f4`, `f2` by `f5`, and `f6` is added.
        let new_type = || {
            DataType::from(StructType::new([
                ("f4", DataType::Int32),
                (
                    "map",
                    MapType::from_kv(
                        DataType::Varchar,
                        DataType::List(Box::new(
                            StructType::new([
                                ("f5", DataType::Int32),
                                ("f3", DataType::Boolean),
                                ("f6", DataType::Float32),
                            ])
                            .into(),
                        )),
                    )
                    .into(),
                ),
            ]))
        };

        // Compared to `new_type`: the type of `f6` changes from `Float32` to `Float64`.
        let incompatible_new_type = || {
            DataType::from(StructType::new([(
                "map",
                MapType::from_kv(
                    DataType::Varchar,
                    DataType::List(Box::new(
                        StructType::new([("f6", DataType::Float64)]).into(),
                    )),
                )
                .into(),
            )]))
        };

        let mut r#gen = ColumnIdGenerator::new_initial();
        let mut col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                ori_type(),
            )),
            is_hidden: false,
        };
        r#gen.generate(&mut col).unwrap();
        let version = r#gen.into_version();

        // IDs are handed out parent-first: nested=#1, f1=#2, map=#3, f2=#4, f3=#5.
        expect![[r#"
            ColumnCatalog {
                column_desc: ColumnDesc {
                    data_type: Struct(
                        StructType {
                            fields: [
                                (
                                    "f1",
                                    Int32,
                                ),
                                (
                                    "map",
                                    Map(
                                        MapType(
                                            (
                                                Varchar,
                                                List(
                                                    Struct(
                                                        StructType {
                                                            fields: [
                                                                (
                                                                    "f2",
                                                                    Int32,
                                                                ),
                                                                (
                                                                    "f3",
                                                                    Boolean,
                                                                ),
                                                            ],
                                                            field_ids: [
                                                                #4,
                                                                #5,
                                                            ],
                                                        },
                                                    ),
                                                ),
                                            ),
                                        ),
                                    ),
                                ),
                            ],
                            field_ids: [
                                #2,
                                #3,
                            ],
                        },
                    ),
                    column_id: #1,
                    name: "nested",
                    generated_or_default_column: None,
                    description: None,
                    additional_column: AdditionalColumn {
                        column_type: None,
                    },
                    version: Pr13707,
                    system_column: None,
                    nullable: true,
                },
                is_hidden: false,
            }
        "#]]
        .assert_debug_eq(&col);

        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![col],
            version: Some(version),
            ..Default::default()
        });
        let mut new_col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                new_type(),
            )),
            is_hidden: false,
        };
        r#gen.generate(&mut new_col).unwrap();
        let version = r#gen.into_version();

        // `nested`, `map` and `f3` keep #1, #3 and #5; `f4`, `f5` and `f6` get #6, #7 and #8.
        expect![[r#"
            ColumnCatalog {
                column_desc: ColumnDesc {
                    data_type: Struct(
                        StructType {
                            fields: [
                                (
                                    "f4",
                                    Int32,
                                ),
                                (
                                    "map",
                                    Map(
                                        MapType(
                                            (
                                                Varchar,
                                                List(
                                                    Struct(
                                                        StructType {
                                                            fields: [
                                                                (
                                                                    "f5",
                                                                    Int32,
                                                                ),
                                                                (
                                                                    "f3",
                                                                    Boolean,
                                                                ),
                                                                (
                                                                    "f6",
                                                                    Float32,
                                                                ),
                                                            ],
                                                            field_ids: [
                                                                #7,
                                                                #5,
                                                                #8,
                                                            ],
                                                        },
                                                    ),
                                                ),
                                            ),
                                        ),
                                    ),
                                ),
                            ],
                            field_ids: [
                                #6,
                                #3,
                            ],
                        },
                    ),
                    column_id: #1,
                    name: "nested",
                    generated_or_default_column: None,
                    description: None,
                    additional_column: AdditionalColumn {
                        column_type: None,
                    },
                    version: Pr13707,
                    system_column: None,
                    nullable: true,
                },
                is_hidden: false,
            }
        "#]]
        .assert_debug_eq(&new_col);

        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![new_col],
            version: Some(version),
            ..Default::default()
        });

        // Changing the type name of an existing nested field is rejected, and the error
        // names the full path of the offending field.
        let mut new_col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                incompatible_new_type(),
            )),
            is_hidden: false,
        };
        let err = r#gen.generate(&mut new_col).unwrap_err();
        expect![[r#"incompatible data type change from Float32 to Float64 at path "nested.map.value.element.f6""#]].assert_eq(&err.to_report_string());
    }
}