1use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, INITIAL_TABLE_VERSION_ID, TableVersionId};
20use risingwave_common::types::{DataType, MapType, StructType, data_types};
21use risingwave_common::util::iter_util::ZipEqFast;
22
23use crate::TableCatalog;
24use crate::catalog::ColumnId;
25use crate::catalog::table_catalog::TableVersion;
26use crate::error::Result;
27
/// One step on the way from a top-level column down to a component nested in its type.
///
/// A sequence of segments (see `Path`) always starts with the `Field` naming the
/// top-level column itself.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum Segment {
    /// A named field: either the top-level column or a field of a struct.
    Field(String),
    /// The element type of a list.
    ListElement,
    /// The key type of a map.
    MapKey,
    /// The value type of a map.
    MapValue,
}
36
37impl std::fmt::Display for Segment {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 Segment::Field(name) => write!(f, "{}", name),
41 Segment::ListElement => write!(f, "element"),
42 Segment::MapKey => write!(f, "key"),
43 Segment::MapValue => write!(f, "value"),
44 }
45 }
46}
47
48type Path = Vec<Segment>;
50
51type Existing = HashMap<Path, (ColumnId, DataType)>;
52
53#[derive(Debug)]
55pub struct ColumnIdGenerator {
56 existing: Existing,
63
64 next_column_id: ColumnId,
66
67 version_id: TableVersionId,
72}
73
74impl ColumnIdGenerator {
75 pub fn new_alter(original: &TableCatalog) -> Self {
77 fn handle(existing: &mut Existing, path: &mut Path, id: ColumnId, data_type: DataType) {
78 macro_rules! with_segment {
79 ($segment:expr, $block:block) => {
80 path.push($segment);
81 $block
82 path.pop();
83 };
84 }
85
86 match &data_type {
87 DataType::Struct(fields) => {
88 for ((field_name, field_data_type), field_id) in
89 fields.iter().zip_eq_fast(fields.ids_or_placeholder())
90 {
91 with_segment!(Segment::Field(field_name.to_owned()), {
92 handle(existing, path, field_id, field_data_type.clone());
93 });
94 }
95 }
96 DataType::List(inner) => {
97 with_segment!(Segment::ListElement, {
99 handle(existing, path, ColumnId::placeholder(), *inner.clone());
100 });
101 }
102 DataType::Map(map) => {
103 with_segment!(Segment::MapKey, {
105 handle(existing, path, ColumnId::placeholder(), map.key().clone());
106 });
107 with_segment!(Segment::MapValue, {
108 handle(existing, path, ColumnId::placeholder(), map.value().clone());
109 });
110 }
111
112 data_types::simple!() => {}
113 }
114
115 existing
116 .try_insert(path.clone(), (id, data_type))
117 .unwrap_or_else(|_| panic!("duplicate path: {:?}", path));
118 }
119
120 let mut existing = Existing::new();
121
122 for col in original.columns() {
124 let mut path = vec![Segment::Field(col.name().to_owned())];
125 handle(
126 &mut existing,
127 &mut path,
128 col.column_id(),
129 col.data_type().clone(),
130 );
131 }
132
133 let version = original.version().expect("version field not set");
134
135 Self {
136 existing,
137 next_column_id: version.next_column_id,
138 version_id: version.version_id + 1,
139 }
140 }
141
142 pub fn new_initial() -> Self {
144 Self {
145 existing: Existing::new(),
146 next_column_id: ColumnId::first_user_column(),
147 version_id: INITIAL_TABLE_VERSION_ID,
148 }
149 }
150
151 pub fn generate(&mut self, col: &mut ColumnCatalog) -> Result<()> {
156 let mut path = vec![Segment::Field(col.name().to_owned())];
157
158 if let Some((original_column_id, original_data_type)) = self.existing.get(&path) {
159 if original_data_type == col.data_type() {
160 col.column_desc.column_id = *original_column_id;
162 return Ok(());
163 } else {
164 match original_data_type.can_alter() {
166 Some(true) => { }
167 Some(false) => bail!(
168 "column \"{}\" was persisted with legacy encoding thus cannot be altered, \
169 consider dropping and readding the column",
170 col.name()
171 ),
172 None => bail!(
173 "column \"{}\" cannot be altered; only types containing struct can be altered",
174 col.name()
175 ),
176 }
177 }
178 }
179
180 fn handle(
181 this: &mut ColumnIdGenerator,
182 path: &mut Path,
183 data_type: DataType,
184 ) -> Result<(ColumnId, DataType)> {
185 macro_rules! with_segment {
186 ($segment:expr, $block:block) => {{
187 path.push($segment);
188 let ret = $block;
189 path.pop();
190 ret
191 }};
192 }
193
194 let original_column_id = match this.existing.get(&*path) {
195 Some((original_column_id, original_data_type)) => {
196 if original_data_type.type_name() != data_type.type_name() {
199 let path = path.iter().join(".");
200 bail!(
201 "incompatible data type change from {:?} to {:?} at path \"{}\"",
202 original_data_type.type_name(),
203 data_type.type_name(),
204 path
205 );
206 }
207 Some(*original_column_id)
208 }
209 None => None,
210 };
211
212 let need_gen_id = matches!(path.last().unwrap(), Segment::Field(_));
214
215 let new_id = if need_gen_id {
216 if let Some(id) = original_column_id {
217 assert!(
218 id != ColumnId::placeholder(),
219 "original column id should not be placeholder"
220 );
221 id
222 } else {
223 let id = this.next_column_id;
224 this.next_column_id = this.next_column_id.next();
225 id
226 }
227 } else {
228 ColumnId::placeholder()
230 };
231
232 let new_data_type = match data_type {
233 DataType::Struct(fields) => {
234 let mut new_fields = Vec::new();
235 let mut ids = Vec::new();
236 for (field_name, field_data_type) in fields.iter() {
237 let (id, new_field_data_type) =
238 with_segment!(Segment::Field(field_name.to_owned()), {
239 handle(this, path, field_data_type.clone())?
240 });
241 new_fields.push((field_name.to_owned(), new_field_data_type));
242 ids.push(id);
243 }
244 DataType::Struct(StructType::new(new_fields).with_ids(ids))
245 }
246 DataType::List(inner) => {
247 let (_, new_inner) =
248 with_segment!(Segment::ListElement, { handle(this, path, *inner)? });
249 DataType::List(Box::new(new_inner))
250 }
251 DataType::Map(map) => {
252 let (_, new_key) =
253 with_segment!(Segment::MapKey, { handle(this, path, map.key().clone())? });
254 let (_, new_value) = with_segment!(Segment::MapValue, {
255 handle(this, path, map.value().clone())?
256 });
257 DataType::Map(MapType::from_kv(new_key, new_value))
258 }
259
260 data_types::simple!() => data_type,
261 };
262
263 Ok((new_id, new_data_type))
264 }
265
266 let (new_column_id, new_data_type) = handle(self, &mut path, col.data_type().clone())?;
267
268 col.column_desc.column_id = new_column_id;
269 col.column_desc.data_type = new_data_type;
270
271 Ok(())
272 }
273
274 pub fn into_version(self) -> TableVersion {
276 TableVersion {
277 version_id: self.version_id,
278 next_column_id: self.next_column_id,
279 }
280 }
281}
282
#[cfg(test)]
mod tests {
    use expect_test::expect;
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, FieldLike};
    use risingwave_common::types::StructType;
    use thiserror_ext::AsReport;

    use super::*;

    /// A column that is not present in the original table. Only its name matters to these
    /// tests; its data type is always `Boolean`.
    struct BrandNewColumn(&'static str);
    use BrandNewColumn as B;

    impl FieldLike for BrandNewColumn {
        fn name(&self) -> &str {
            self.0
        }

        fn data_type(&self) -> &DataType {
            &DataType::Boolean
        }
    }

    #[easy_ext::ext(ColumnIdGeneratorTestExt)]
    impl ColumnIdGenerator {
        /// Builds a column without an id from `field`, runs `generate` on it, and returns
        /// the column id it was assigned.
        ///
        /// Panics if `generate` changed the column's data type. `generate` may rebuild a
        /// type (e.g. to fill in struct field ids); for such types, call `generate`
        /// directly and inspect the whole column instead.
        fn generate_simple(&mut self, field: impl FieldLike) -> Result<ColumnId> {
            let original_data_type = field.data_type().clone();

            let field = Field::new(field.name(), original_data_type.clone());
            let mut col = ColumnCatalog {
                column_desc: ColumnDesc::from_field_without_column_id(&field),
                is_hidden: false,
            };
            self.generate(&mut col)?;

            assert_eq!(
                col.column_desc.data_type, original_data_type,
                "data type has changed after generating column id, \
                 are you calling this on a composite type?"
            );

            Ok(col.column_desc.column_id)
        }
    }

    /// A fresh table hands out column ids sequentially, starting from 1.
    #[test]
    fn test_col_id_gen_initial() {
        let mut r#gen = ColumnIdGenerator::new_initial();
        assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(1));
        assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(2));
    }

    /// Altering a table with two simple columns and one struct column whose type was built
    /// with `StructType::new` and no field ids.
    #[test]
    fn test_col_id_gen_alter() {
        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field::with_name(DataType::Float32, "f32"),
                        1,
                    ),
                    is_hidden: false,
                },
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field::with_name(DataType::Float64, "f64"),
                        2,
                    ),
                    is_hidden: false,
                },
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field::with_name(
                            StructType::new([("f1", DataType::Int32)]).into(),
                            "nested",
                        ),
                        3,
                    ),
                    is_hidden: false,
                },
            ],
            version: Some(TableVersion::new_initial_for_test(ColumnId::new(3))),
            ..Default::default()
        });

        // New columns are numbered after the original ones.
        assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(4));
        assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(5));
        // An existing column re-declared with the same type keeps its id.
        assert_eq!(
            r#gen
                .generate_simple(Field::new("f32", DataType::Float32))
                .unwrap(),
            ColumnId::new(1)
        );

        // Changing the type of a simple column is rejected.
        let err = r#gen
            .generate_simple(Field::new("f64", DataType::Float32))
            .unwrap_err();
        expect![[r#"column "f64" cannot be altered; only types containing struct can be altered"#]]
            .assert_eq(&err.to_report_string());

        // Adding a field to the struct column is rejected: the error reports the original
        // type (built without field ids) as legacy encoding.
        let err = r#gen
            .generate_simple(Field::new(
                "nested",
                StructType::new([("f1", DataType::Int32), ("f2", DataType::Int64)]).into(),
            ))
            .unwrap_err();
        expect![[r#"column "nested" was persisted with legacy encoding thus cannot be altered, consider dropping and readding the column"#]].assert_eq(&err.to_report_string());

        // Re-declaring the struct column unchanged keeps its id.
        let id = r#gen
            .generate_simple(Field::new(
                "nested",
                StructType::new([("f1", DataType::Int32)]).into(),
            ))
            .unwrap();
        assert_eq!(id, ColumnId::new(3));

        // The rejected alterations above did not consume any ids.
        assert_eq!(r#gen.generate_simple(B("v3")).unwrap(), ColumnId::new(6));
    }

    /// Altering a struct column that nests a map from varchar to a list of structs.
    /// Fields that are kept reuse their ids, new fields get fresh ids, and changing the
    /// type of a kept nested field is rejected.
    #[test]
    fn test_col_id_gen_alter_composite_type() {
        let ori_type = || {
            DataType::from(StructType::new([
                ("f1", DataType::Int32),
                (
                    "map",
                    MapType::from_kv(
                        DataType::Varchar,
                        DataType::List(Box::new(
                            StructType::new([("f2", DataType::Int32), ("f3", DataType::Boolean)])
                                .into(),
                        )),
                    )
                    .into(),
                ),
            ]))
        };

        // Compared with `ori_type`: `f1` is gone and `f4` is new; inside the list's struct,
        // `f2` is gone, `f5` and `f6` are new, and `f3` is kept.
        let new_type = || {
            DataType::from(StructType::new([
                ("f4", DataType::Int32),
                (
                    "map",
                    MapType::from_kv(
                        DataType::Varchar,
                        DataType::List(Box::new(
                            StructType::new([
                                ("f5", DataType::Int32),
                                ("f3", DataType::Boolean),
                                ("f6", DataType::Float32),
                            ])
                            .into(),
                        )),
                    )
                    .into(),
                ),
            ]))
        };

        // Compared with `new_type`: `f6` changes from Float32 to Float64.
        let incompatible_new_type = || {
            DataType::from(StructType::new([(
                "map",
                MapType::from_kv(
                    DataType::Varchar,
                    DataType::List(Box::new(
                        StructType::new([("f6", DataType::Float64)]).into(),
                    )),
                )
                .into(),
            )]))
        };

        // Create: the column gets #1, then its fields are numbered in pre-order
        // (f1 #2, map #3, f2 #4, f3 #5).
        let mut r#gen = ColumnIdGenerator::new_initial();
        let mut col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                ori_type(),
            )),
            is_hidden: false,
        };
        r#gen.generate(&mut col).unwrap();
        let version = r#gen.into_version();

        expect![[r#"
            ColumnCatalog {
                column_desc: ColumnDesc {
                    data_type: Struct(
                        StructType {
                            fields: [
                                (
                                    "f1",
                                    Int32,
                                ),
                                (
                                    "map",
                                    Map(
                                        MapType(
                                            (
                                                Varchar,
                                                List(
                                                    Struct(
                                                        StructType {
                                                            fields: [
                                                                (
                                                                    "f2",
                                                                    Int32,
                                                                ),
                                                                (
                                                                    "f3",
                                                                    Boolean,
                                                                ),
                                                            ],
                                                            field_ids: [
                                                                #4,
                                                                #5,
                                                            ],
                                                        },
                                                    ),
                                                ),
                                            ),
                                        ),
                                    ),
                                ),
                            ],
                            field_ids: [
                                #2,
                                #3,
                            ],
                        },
                    ),
                    column_id: #1,
                    name: "nested",
                    generated_or_default_column: None,
                    description: None,
                    additional_column: AdditionalColumn {
                        column_type: None,
                    },
                    version: Pr13707,
                    system_column: None,
                    nullable: true,
                },
                is_hidden: false,
            }
        "#]]
        .assert_debug_eq(&col);

        // Alter to `new_type`: the column keeps #1, map keeps #3 and f3 keeps #5, while the
        // new fields f4, f5 and f6 get #6, #7 and #8 in pre-order.
        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![col],
            version: Some(version),
            ..Default::default()
        });
        let mut new_col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                new_type(),
            )),
            is_hidden: false,
        };
        r#gen.generate(&mut new_col).unwrap();
        let version = r#gen.into_version();

        expect![[r#"
            ColumnCatalog {
                column_desc: ColumnDesc {
                    data_type: Struct(
                        StructType {
                            fields: [
                                (
                                    "f4",
                                    Int32,
                                ),
                                (
                                    "map",
                                    Map(
                                        MapType(
                                            (
                                                Varchar,
                                                List(
                                                    Struct(
                                                        StructType {
                                                            fields: [
                                                                (
                                                                    "f5",
                                                                    Int32,
                                                                ),
                                                                (
                                                                    "f3",
                                                                    Boolean,
                                                                ),
                                                                (
                                                                    "f6",
                                                                    Float32,
                                                                ),
                                                            ],
                                                            field_ids: [
                                                                #7,
                                                                #5,
                                                                #8,
                                                            ],
                                                        },
                                                    ),
                                                ),
                                            ),
                                        ),
                                    ),
                                ),
                            ],
                            field_ids: [
                                #6,
                                #3,
                            ],
                        },
                    ),
                    column_id: #1,
                    name: "nested",
                    generated_or_default_column: None,
                    description: None,
                    additional_column: AdditionalColumn {
                        column_type: None,
                    },
                    version: Pr13707,
                    system_column: None,
                    nullable: true,
                },
                is_hidden: false,
            }
        "#]]
        .assert_debug_eq(&new_col);

        // Alter again, changing the type of the kept nested field `f6`: rejected, and the
        // error names the full dotted path.
        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![new_col],
            version: Some(version),
            ..Default::default()
        });

        let mut new_col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                incompatible_new_type(),
            )),
            is_hidden: false,
        };
        let err = r#gen.generate(&mut new_col).unwrap_err();
        expect![[r#"incompatible data type change from Float32 to Float64 at path "nested.map.value.element.f6""#]].assert_eq(&err.to_report_string());
    }
}