1use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, INITIAL_TABLE_VERSION_ID, TableVersionId};
20use risingwave_common::types::{DataType, MapType, StructType, data_types};
21use risingwave_common::util::iter_util::ZipEqFast;
22
23use crate::TableCatalog;
24use crate::catalog::ColumnId;
25use crate::catalog::table_catalog::TableVersion;
26use crate::error::Result;
27
/// One step of a path that locates a column or a type nested inside a column.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum Segment {
    /// A named step: a top-level column name or a struct field name.
    Field(String),
    /// The element type of a list.
    ListElement,
    /// The key type of a map.
    MapKey,
    /// The value type of a map.
    MapValue,
}
36
37impl std::fmt::Display for Segment {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 Segment::Field(name) => write!(f, "{}", name),
41 Segment::ListElement => write!(f, "element"),
42 Segment::MapKey => write!(f, "key"),
43 Segment::MapValue => write!(f, "value"),
44 }
45 }
46}
47
/// A sequence of [`Segment`]s locating a column or a type nested inside a column.
type Path = Vec<Segment>;
50
/// Maps the path of every already-known column or nested type to its column ID and data type.
type Existing = HashMap<Path, (ColumnId, DataType)>;
52
/// Generates column IDs for the columns of a table, either a brand-new one
/// (`new_initial`) or one that is being altered (`new_alter`).
#[derive(Debug)]
pub struct ColumnIdGenerator {
    /// IDs and data types recorded from the original table, keyed by path.
    /// Empty for a generator created by `new_initial`.
    existing: Existing,

    /// The ID handed out to the next field that has no entry in `existing`.
    next_column_id: ColumnId,

    /// The version ID reported by `into_version`.
    version_id: TableVersionId,
}
73
74impl ColumnIdGenerator {
75 pub fn new_alter(original: &TableCatalog) -> Self {
77 fn handle(existing: &mut Existing, path: &mut Path, id: ColumnId, data_type: DataType) {
78 macro_rules! with_segment {
79 ($segment:expr, $block:block) => {
80 path.push($segment);
81 $block
82 path.pop();
83 };
84 }
85
86 match &data_type {
87 DataType::Struct(fields) => {
88 for ((field_name, field_data_type), field_id) in
89 fields.iter().zip_eq_fast(fields.ids_or_placeholder())
90 {
91 with_segment!(Segment::Field(field_name.to_owned()), {
92 handle(existing, path, field_id, field_data_type.clone());
93 });
94 }
95 }
96 DataType::List(inner) => {
97 with_segment!(Segment::ListElement, {
99 handle(existing, path, ColumnId::placeholder(), *inner.clone());
100 });
101 }
102 DataType::Map(map) => {
103 with_segment!(Segment::MapKey, {
105 handle(existing, path, ColumnId::placeholder(), map.key().clone());
106 });
107 with_segment!(Segment::MapValue, {
108 handle(existing, path, ColumnId::placeholder(), map.value().clone());
109 });
110 }
111
112 DataType::Vector(_) => {}
113 data_types::simple!() => {}
114 }
115
116 existing
117 .try_insert(path.clone(), (id, data_type))
118 .unwrap_or_else(|_| panic!("duplicate path: {:?}", path));
119 }
120
121 let mut existing = Existing::new();
122
123 for col in original.columns() {
125 let mut path = vec![Segment::Field(col.name().to_owned())];
126 handle(
127 &mut existing,
128 &mut path,
129 col.column_id(),
130 col.data_type().clone(),
131 );
132 }
133
134 let version = original.version().expect("version field not set");
135
136 Self {
137 existing,
138 next_column_id: version.next_column_id,
139 version_id: version.version_id + 1,
140 }
141 }
142
143 pub fn new_initial() -> Self {
145 Self {
146 existing: Existing::new(),
147 next_column_id: ColumnId::first_user_column(),
148 version_id: INITIAL_TABLE_VERSION_ID,
149 }
150 }
151
    /// Assigns a column ID to `col` and rewrites its data type in place.
    ///
    /// The column is looked up in `existing` by its name:
    /// - If it is found with an equal data type, the recorded column ID and data type
    ///   are copied onto `col` and no new ID is allocated.
    /// - If it is found with a different data type, the change is only accepted when
    ///   the recorded type reports `can_alter() == Some(true)`. The new type is then
    ///   walked recursively, reusing the ID of every struct field found at the same
    ///   path and taking fresh IDs from `next_column_id` for the others.
    /// - If it is not found, fresh IDs are taken for the column and for every struct
    ///   field nested inside it.
    ///
    /// # Errors
    ///
    /// Returns an error if the recorded type cannot be altered, or if a path that is
    /// already recorded changes to a type with a different type name (or to a
    /// `Vector` whose payload differs from the recorded one).
    ///
    /// # Panics
    ///
    /// Panics if the ID recorded for a `Segment::Field` path is the placeholder ID.
    pub fn generate(&mut self, col: &mut ColumnCatalog) -> Result<()> {
        // The path of a top-level column is just its name.
        let mut path = vec![Segment::Field(col.name().to_owned())];

        if let Some((original_column_id, original_data_type)) = self.existing.get(&path) {
            if original_data_type == col.data_type() {
                // Unchanged column: reuse the recorded ID without walking nested fields.
                col.column_desc.column_id = *original_column_id;
                // NOTE(review): the recorded type is copied back even though it compared
                // equal, presumably because equality does not cover nested field ids
                // and this carries them over. Confirm against `DataType`'s `PartialEq`.
                col.column_desc.data_type = original_data_type.clone();
                return Ok(());
            } else {
                // The type changed: check that the recorded type allows altering.
                match original_data_type.can_alter() {
                    // Alterable: fall through to the recursive walk below.
                    Some(true) => { }
                    Some(false) => bail!(
                        "column \"{}\" was persisted with legacy encoding thus cannot be altered, \
                         consider dropping and readding the column",
                        col.name()
                    ),
                    None => bail!(
                        "column \"{}\" cannot be altered; only types containing struct can be altered",
                        col.name()
                    ),
                }
            }
        }

        /// Returns the ID and the rewritten data type for the node at `path`.
        ///
        /// The ID of the current node is decided before its children are visited, so
        /// fresh IDs are handed out parent-first.
        fn handle(
            this: &mut ColumnIdGenerator,
            path: &mut Path,
            data_type: DataType,
        ) -> Result<(ColumnId, DataType)> {
            // Evaluates `$block` with `$segment` pushed onto `path`, pops it again and
            // yields the block's value. A `?` inside the block returns from `handle`
            // before the pop runs, leaving `path` longer than on entry.
            macro_rules! with_segment {
                ($segment:expr, $block:block) => {{
                    path.push($segment);
                    let ret = $block;
                    path.pop();
                    ret
                }};
            }

            // Look up what was recorded at this path, rejecting incompatible changes.
            let original_column_id = match this.existing.get(&*path) {
                Some((original_column_id, original_data_type)) => {
                    let incompatible = original_data_type.type_name() != data_type.type_name()
                        // Two vectors share a type name, so compare their payloads too.
                        || matches!(
                            (original_data_type, &data_type),
                            (DataType::Vector(old), DataType::Vector(new)) if old != new,
                        );
                    if incompatible {
                        let path = path.iter().join(".");
                        bail!(
                            "incompatible data type change from {:?} to {:?} at path \"{}\"",
                            original_data_type.type_name(),
                            data_type.type_name(),
                            path
                        );
                    }
                    Some(*original_column_id)
                }
                None => None,
            };

            // Only named fields get real IDs; list elements and map keys/values are
            // given the placeholder ID below.
            let need_gen_id = matches!(path.last().unwrap(), Segment::Field(_));

            let new_id = if need_gen_id {
                if let Some(id) = original_column_id {
                    // Reuse the recorded ID; it must be a real one.
                    assert!(
                        id != ColumnId::placeholder(),
                        "original column id should not be placeholder"
                    );
                    id
                } else {
                    // Nothing recorded at this path: take the next fresh ID.
                    let id = this.next_column_id;
                    this.next_column_id = this.next_column_id.next();
                    id
                }
            } else {
                ColumnId::placeholder()
            };

            // Rebuild the type, recursing into nested types so that struct fields
            // receive their IDs.
            let new_data_type = match data_type {
                DataType::Struct(fields) => {
                    let mut new_fields = Vec::new();
                    let mut ids = Vec::new();
                    for (field_name, field_data_type) in fields.iter() {
                        let (id, new_field_data_type) =
                            with_segment!(Segment::Field(field_name.to_owned()), {
                                handle(this, path, field_data_type.clone())?
                            });
                        new_fields.push((field_name.to_owned(), new_field_data_type));
                        ids.push(id);
                    }
                    DataType::Struct(StructType::new(new_fields).with_ids(ids))
                }
                DataType::List(inner) => {
                    // The ID returned for a list element is the placeholder; drop it.
                    let (_, new_inner) =
                        with_segment!(Segment::ListElement, { handle(this, path, *inner)? });
                    DataType::List(Box::new(new_inner))
                }
                DataType::Map(map) => {
                    // Likewise, map keys and values only contribute their rewritten types.
                    let (_, new_key) =
                        with_segment!(Segment::MapKey, { handle(this, path, map.key().clone())? });
                    let (_, new_value) = with_segment!(Segment::MapValue, {
                        handle(this, path, map.value().clone())?
                    });
                    DataType::Map(MapType::from_kv(new_key, new_value))
                }

                // Types without nested fields are returned unchanged.
                DataType::Vector(_) => data_type,
                data_types::simple!() => data_type,
            };

            Ok((new_id, new_data_type))
        }

        let (new_column_id, new_data_type) = handle(self, &mut path, col.data_type().clone())?;

        col.column_desc.column_id = new_column_id;
        col.column_desc.data_type = new_data_type;

        Ok(())
    }
281
282 pub fn into_version(self) -> TableVersion {
284 TableVersion {
285 version_id: self.version_id,
286 next_column_id: self.next_column_id,
287 }
288 }
289}
290
291#[cfg(test)]
292mod tests {
293 use expect_test::expect;
294 use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, FieldLike};
295 use risingwave_common::types::StructType;
296 use thiserror_ext::AsReport;
297
298 use super::*;
299
    /// A [`FieldLike`] test fixture: a column with the given name whose data type is
    /// always boolean.
    struct BrandNewColumn(&'static str);
    // Short alias to keep the test call sites compact.
    use BrandNewColumn as B;
302
303 impl FieldLike for BrandNewColumn {
304 fn name(&self) -> &str {
305 self.0
306 }
307
308 fn data_type(&self) -> &DataType {
309 &DataType::Boolean
310 }
311 }
312
313 #[easy_ext::ext(ColumnIdGeneratorTestExt)]
314 impl ColumnIdGenerator {
315 fn generate_simple(&mut self, field: impl FieldLike) -> Result<ColumnId> {
321 let original_data_type = field.data_type().clone();
322
323 let field = Field::new(field.name(), original_data_type.clone());
324 let mut col = ColumnCatalog {
325 column_desc: ColumnDesc::from_field_without_column_id(&field),
326 is_hidden: false,
327 };
328 self.generate(&mut col)?;
329
330 assert_eq!(
331 col.column_desc.data_type, original_data_type,
332 "data type has changed after generating column id, \
333 are you calling this on a composite type?"
334 );
335
336 Ok(col.column_desc.column_id)
337 }
338 }
339
340 #[test]
341 fn test_col_id_gen_initial() {
342 let mut r#gen = ColumnIdGenerator::new_initial();
343 assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(1));
344 assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(2));
345 }
346
347 #[test]
348 fn test_col_id_gen_alter() {
349 let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
350 columns: vec![
351 ColumnCatalog {
352 column_desc: ColumnDesc::from_field_with_column_id(
353 &Field::with_name(DataType::Float32, "f32"),
354 1,
355 ),
356 is_hidden: false,
357 },
358 ColumnCatalog {
359 column_desc: ColumnDesc::from_field_with_column_id(
360 &Field::with_name(DataType::Float64, "f64"),
361 2,
362 ),
363 is_hidden: false,
364 },
365 ColumnCatalog {
366 column_desc: ColumnDesc::from_field_with_column_id(
367 &Field::with_name(
368 StructType::new([("f1", DataType::Int32)]).into(),
369 "nested",
370 ),
371 3,
372 ),
373 is_hidden: false,
374 },
375 ],
376 version: Some(TableVersion::new_initial_for_test(ColumnId::new(3))),
377 ..Default::default()
378 });
379
380 assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(4));
381 assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(5));
382 assert_eq!(
383 r#gen
384 .generate_simple(Field::new("f32", DataType::Float32))
385 .unwrap(),
386 ColumnId::new(1)
387 );
388
389 let err = r#gen
391 .generate_simple(Field::new("f64", DataType::Float32))
392 .unwrap_err();
393 expect![[r#"column "f64" cannot be altered; only types containing struct can be altered"#]]
394 .assert_eq(&err.to_report_string());
395
396 let err = r#gen
399 .generate_simple(Field::new(
400 "nested",
401 StructType::new([("f1", DataType::Int32), ("f2", DataType::Int64)]).into(),
403 ))
404 .unwrap_err();
405 expect![[r#"column "nested" was persisted with legacy encoding thus cannot be altered, consider dropping and readding the column"#]].assert_eq(&err.to_report_string());
406
407 let id = r#gen
409 .generate_simple(Field::new(
410 "nested",
411 StructType::new([("f1", DataType::Int32)]).into(),
412 ))
413 .unwrap();
414 assert_eq!(id, ColumnId::new(3));
415
416 assert_eq!(r#gen.generate_simple(B("v3")).unwrap(), ColumnId::new(6));
417 }
418
    /// Walks one composite column through creation, a compatible alteration and an
    /// incompatible alteration, checking the IDs given to nested struct fields.
    #[test]
    fn test_col_id_gen_alter_composite_type() {
        // Initial type: struct { f1, map<varchar, list<struct { f2, f3 }>> }.
        let ori_type = || {
            DataType::from(StructType::new([
                ("f1", DataType::Int32),
                (
                    "map",
                    MapType::from_kv(
                        DataType::Varchar,
                        DataType::List(Box::new(
                            StructType::new([("f2", DataType::Int32), ("f3", DataType::Boolean)])
                                .into(),
                        )),
                    )
                    .into(),
                ),
            ]))
        };

        // Altered type: `f1` is replaced by `f4`, `f2` by `f5`, and `f6` is added,
        // while `map` and `f3` stay where they were.
        let new_type = || {
            DataType::from(StructType::new([
                ("f4", DataType::Int32),
                (
                    "map",
                    MapType::from_kv(
                        DataType::Varchar,
                        DataType::List(Box::new(
                            StructType::new([
                                ("f5", DataType::Int32),
                                ("f3", DataType::Boolean),
                                ("f6", DataType::Float32),
                            ])
                            .into(),
                        )),
                    )
                    .into(),
                ),
            ]))
        };

        // Incompatible type: `f6` keeps its path but changes from Float32 to Float64.
        let incompatible_new_type = || {
            DataType::from(StructType::new([(
                "map",
                MapType::from_kv(
                    DataType::Varchar,
                    DataType::List(Box::new(
                        StructType::new([("f6", DataType::Float64)]).into(),
                    )),
                )
                .into(),
            )]))
        };

        // Step 1: create the column in a brand-new table.
        let mut r#gen = ColumnIdGenerator::new_initial();
        let mut col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                ori_type(),
            )),
            is_hidden: false,
        };
        r#gen.generate(&mut col).unwrap();
        let version = r#gen.into_version();

        // IDs are handed out parent-first: nested #1, f1 #2, map #3, f2 #4, f3 #5.
        expect![[r#"
            ColumnCatalog {
                column_desc: ColumnDesc {
                    data_type: Struct(
                        StructType {
                            fields: [
                                (
                                    "f1",
                                    Int32,
                                ),
                                (
                                    "map",
                                    Map(
                                        MapType(
                                            (
                                                Varchar,
                                                List(
                                                    Struct(
                                                        StructType {
                                                            fields: [
                                                                (
                                                                    "f2",
                                                                    Int32,
                                                                ),
                                                                (
                                                                    "f3",
                                                                    Boolean,
                                                                ),
                                                            ],
                                                            field_ids: [
                                                                #4,
                                                                #5,
                                                            ],
                                                        },
                                                    ),
                                                ),
                                            ),
                                        ),
                                    ),
                                ),
                            ],
                            field_ids: [
                                #2,
                                #3,
                            ],
                        },
                    ),
                    column_id: #1,
                    name: "nested",
                    generated_or_default_column: None,
                    description: None,
                    additional_column: AdditionalColumn {
                        column_type: None,
                    },
                    version: Pr13707,
                    system_column: None,
                    nullable: true,
                },
                is_hidden: false,
            }
        "#]]
        .assert_debug_eq(&col);

        // Step 2: alter the column to `new_type`, starting from the state of step 1.
        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![col],
            version: Some(version),
            ..Default::default()
        });
        let mut new_col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                new_type(),
            )),
            is_hidden: false,
        };
        r#gen.generate(&mut new_col).unwrap();
        let version = r#gen.into_version();

        // Kept paths reuse their IDs (nested #1, map #3, f3 #5); the new names get
        // fresh ones (f4 #6, f5 #7, f6 #8).
        expect![[r#"
            ColumnCatalog {
                column_desc: ColumnDesc {
                    data_type: Struct(
                        StructType {
                            fields: [
                                (
                                    "f4",
                                    Int32,
                                ),
                                (
                                    "map",
                                    Map(
                                        MapType(
                                            (
                                                Varchar,
                                                List(
                                                    Struct(
                                                        StructType {
                                                            fields: [
                                                                (
                                                                    "f5",
                                                                    Int32,
                                                                ),
                                                                (
                                                                    "f3",
                                                                    Boolean,
                                                                ),
                                                                (
                                                                    "f6",
                                                                    Float32,
                                                                ),
                                                            ],
                                                            field_ids: [
                                                                #7,
                                                                #5,
                                                                #8,
                                                            ],
                                                        },
                                                    ),
                                                ),
                                            ),
                                        ),
                                    ),
                                ),
                            ],
                            field_ids: [
                                #6,
                                #3,
                            ],
                        },
                    ),
                    column_id: #1,
                    name: "nested",
                    generated_or_default_column: None,
                    description: None,
                    additional_column: AdditionalColumn {
                        column_type: None,
                    },
                    version: Pr13707,
                    system_column: None,
                    nullable: true,
                },
                is_hidden: false,
            }
        "#]]
        .assert_debug_eq(&new_col);

        // Step 3: alter again, this time changing the type of the existing `f6`.
        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![new_col],
            version: Some(version),
            ..Default::default()
        });

        let mut new_col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                incompatible_new_type(),
            )),
            is_hidden: false,
        };
        // The error names the full path of the offending field.
        let err = r#gen.generate(&mut new_col).unwrap_err();
        expect![[r#"incompatible data type change from Float32 to Float64 at path "nested.map.value.element.f6""#]].assert_eq(&err.to_report_string());
    }
645}