1use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_common::bail;
19use risingwave_common::catalog::{ColumnCatalog, INITIAL_TABLE_VERSION_ID, TableVersionId};
20use risingwave_common::types::{DataType, MapType, StructType, data_types};
21use risingwave_common::util::iter_util::ZipEqFast;
22
23use crate::TableCatalog;
24use crate::catalog::ColumnId;
25use crate::catalog::table_catalog::TableVersion;
26use crate::error::Result;
27
/// One step in a [`Path`] leading from a top-level column down into its data type.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum Segment {
    /// A named step: either the top-level column itself or a struct field.
    Field(String),
    /// The element type of a list.
    ListElement,
    /// The key type of a map.
    MapKey,
    /// The value type of a map.
    MapValue,
}

impl std::fmt::Display for Segment {
    /// Renders the segment as it appears in dotted paths of error messages: the field name for
    /// [`Segment::Field`], otherwise a fixed word (`element`, `key` or `value`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::Field(name) => name.as_str(),
            Self::ListElement => "element",
            Self::MapKey => "key",
            Self::MapValue => "value",
        };
        f.write_str(text)
    }
}
47
48type Path = Vec<Segment>;
50
51type Existing = HashMap<Path, (ColumnId, DataType)>;
52
53#[derive(Debug)]
55pub struct ColumnIdGenerator {
56 existing: Existing,
63
64 next_column_id: ColumnId,
66
67 version_id: TableVersionId,
72}
73
74impl ColumnIdGenerator {
75 pub fn new_alter(original: &TableCatalog) -> Self {
77 fn handle(existing: &mut Existing, path: &mut Path, id: ColumnId, data_type: DataType) {
78 macro_rules! with_segment {
79 ($segment:expr, $block:block) => {
80 path.push($segment);
81 $block
82 path.pop();
83 };
84 }
85
86 match &data_type {
87 DataType::Struct(fields) => {
88 for ((field_name, field_data_type), field_id) in
89 fields.iter().zip_eq_fast(fields.ids_or_placeholder())
90 {
91 with_segment!(Segment::Field(field_name.to_owned()), {
92 handle(existing, path, field_id, field_data_type.clone());
93 });
94 }
95 }
96 DataType::List(inner) => {
97 with_segment!(Segment::ListElement, {
99 handle(existing, path, ColumnId::placeholder(), *inner.clone());
100 });
101 }
102 DataType::Map(map) => {
103 with_segment!(Segment::MapKey, {
105 handle(existing, path, ColumnId::placeholder(), map.key().clone());
106 });
107 with_segment!(Segment::MapValue, {
108 handle(existing, path, ColumnId::placeholder(), map.value().clone());
109 });
110 }
111
112 DataType::Vector(_) => {}
113 data_types::simple!() => {}
114 }
115
116 existing
117 .try_insert(path.clone(), (id, data_type))
118 .unwrap_or_else(|_| panic!("duplicate path: {:?}", path));
119 }
120
121 let mut existing = Existing::new();
122
123 for col in original.columns() {
125 let mut path = vec![Segment::Field(col.name().to_owned())];
126 handle(
127 &mut existing,
128 &mut path,
129 col.column_id(),
130 col.data_type().clone(),
131 );
132 }
133
134 let version = original.version().expect("version field not set");
135
136 Self {
137 existing,
138 next_column_id: version.next_column_id,
139 version_id: version.version_id + 1,
140 }
141 }
142
143 pub fn new_initial() -> Self {
145 Self {
146 existing: Existing::new(),
147 next_column_id: ColumnId::first_user_column(),
148 version_id: INITIAL_TABLE_VERSION_ID,
149 }
150 }
151
152 pub fn generate(&mut self, col: &mut ColumnCatalog) -> Result<()> {
157 let mut path = vec![Segment::Field(col.name().to_owned())];
158
159 if let Some((original_column_id, original_data_type)) = self.existing.get(&path) {
160 if original_data_type == col.data_type() {
161 col.column_desc.column_id = *original_column_id;
162 col.column_desc.data_type = original_data_type.clone();
164 return Ok(());
165 } else {
166 match original_data_type.can_alter() {
168 Some(true) => { }
169 Some(false) => bail!(
170 "column \"{}\" was persisted with legacy encoding thus cannot be altered, \
171 consider dropping and readding the column",
172 col.name()
173 ),
174 None => bail!(
175 "column \"{}\" cannot be altered; only types containing struct can be altered",
176 col.name()
177 ),
178 }
179 }
180 }
181
182 fn handle(
183 this: &mut ColumnIdGenerator,
184 path: &mut Path,
185 data_type: DataType,
186 ) -> Result<(ColumnId, DataType)> {
187 macro_rules! with_segment {
188 ($segment:expr, $block:block) => {{
189 path.push($segment);
190 let ret = $block;
191 path.pop();
192 ret
193 }};
194 }
195
196 let original_column_id = match this.existing.get(&*path) {
197 Some((original_column_id, original_data_type)) => {
198 if original_data_type.type_name() != data_type.type_name() {
201 let path = path.iter().join(".");
202 bail!(
203 "incompatible data type change from {:?} to {:?} at path \"{}\"",
204 original_data_type.type_name(),
205 data_type.type_name(),
206 path
207 );
208 }
209 Some(*original_column_id)
210 }
211 None => None,
212 };
213
214 let need_gen_id = matches!(path.last().unwrap(), Segment::Field(_));
216
217 let new_id = if need_gen_id {
218 if let Some(id) = original_column_id {
219 assert!(
220 id != ColumnId::placeholder(),
221 "original column id should not be placeholder"
222 );
223 id
224 } else {
225 let id = this.next_column_id;
226 this.next_column_id = this.next_column_id.next();
227 id
228 }
229 } else {
230 ColumnId::placeholder()
232 };
233
234 let new_data_type = match data_type {
235 DataType::Struct(fields) => {
236 let mut new_fields = Vec::new();
237 let mut ids = Vec::new();
238 for (field_name, field_data_type) in fields.iter() {
239 let (id, new_field_data_type) =
240 with_segment!(Segment::Field(field_name.to_owned()), {
241 handle(this, path, field_data_type.clone())?
242 });
243 new_fields.push((field_name.to_owned(), new_field_data_type));
244 ids.push(id);
245 }
246 DataType::Struct(StructType::new(new_fields).with_ids(ids))
247 }
248 DataType::List(inner) => {
249 let (_, new_inner) =
250 with_segment!(Segment::ListElement, { handle(this, path, *inner)? });
251 DataType::List(Box::new(new_inner))
252 }
253 DataType::Map(map) => {
254 let (_, new_key) =
255 with_segment!(Segment::MapKey, { handle(this, path, map.key().clone())? });
256 let (_, new_value) = with_segment!(Segment::MapValue, {
257 handle(this, path, map.value().clone())?
258 });
259 DataType::Map(MapType::from_kv(new_key, new_value))
260 }
261
262 DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
263 data_types::simple!() => data_type,
264 };
265
266 Ok((new_id, new_data_type))
267 }
268
269 let (new_column_id, new_data_type) = handle(self, &mut path, col.data_type().clone())?;
270
271 col.column_desc.column_id = new_column_id;
272 col.column_desc.data_type = new_data_type;
273
274 Ok(())
275 }
276
277 pub fn into_version(self) -> TableVersion {
279 TableVersion {
280 version_id: self.version_id,
281 next_column_id: self.next_column_id,
282 }
283 }
284}
285
#[cfg(test)]
mod tests {
    use expect_test::expect;
    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, FieldLike};
    use risingwave_common::types::StructType;
    use thiserror_ext::AsReport;

    use super::*;

    // A new column identified only by its name; its data type is always `Boolean`.
    struct BrandNewColumn(&'static str);
    use BrandNewColumn as B;

    impl FieldLike for BrandNewColumn {
        fn name(&self) -> &str {
            self.0
        }

        fn data_type(&self) -> &DataType {
            &DataType::Boolean
        }
    }

    #[easy_ext::ext(ColumnIdGeneratorTestExt)]
    impl ColumnIdGenerator {
        /// Builds a column from `field`, runs `generate` on it, and returns the assigned column ID.
        ///
        /// Panics if `generate` rewrote the column's data type. Per the assertion message, that
        /// is expected for composite types; use `generate` directly for those.
        fn generate_simple(&mut self, field: impl FieldLike) -> Result<ColumnId> {
            let original_data_type = field.data_type().clone();

            let field = Field::new(field.name(), original_data_type.clone());
            let mut col = ColumnCatalog {
                column_desc: ColumnDesc::from_field_without_column_id(&field),
                is_hidden: false,
            };
            self.generate(&mut col)?;

            assert_eq!(
                col.column_desc.data_type, original_data_type,
                "data type has changed after generating column id, \
                 are you calling this on a composite type?"
            );

            Ok(col.column_desc.column_id)
        }
    }

    #[test]
    fn test_col_id_gen_initial() {
        // A fresh generator hands out IDs sequentially, starting from #1.
        let mut r#gen = ColumnIdGenerator::new_initial();
        assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(1));
        assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(2));
    }

    #[test]
    fn test_col_id_gen_alter() {
        // Original table: `f32` (#1), `f64` (#2) and a struct column `nested` (#3).
        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field::with_name(DataType::Float32, "f32"),
                        1,
                    ),
                    is_hidden: false,
                },
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field::with_name(DataType::Float64, "f64"),
                        2,
                    ),
                    is_hidden: false,
                },
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field::with_name(
                            StructType::new([("f1", DataType::Int32)]).into(),
                            "nested",
                        ),
                        3,
                    ),
                    is_hidden: false,
                },
            ],
            version: Some(TableVersion::new_initial_for_test(ColumnId::new(3))),
            ..Default::default()
        });

        // New columns get IDs after the original ones.
        assert_eq!(r#gen.generate_simple(B("v1")).unwrap(), ColumnId::new(4));
        assert_eq!(r#gen.generate_simple(B("v2")).unwrap(), ColumnId::new(5));
        // An existing column with an unchanged type keeps its original ID.
        assert_eq!(
            r#gen
                .generate_simple(Field::new("f32", DataType::Float32))
                .unwrap(),
            ColumnId::new(1)
        );

        // Changing the type of a non-struct column is rejected.
        let err = r#gen
            .generate_simple(Field::new("f64", DataType::Float32))
            .unwrap_err();
        expect![[r#"column "f64" cannot be altered; only types containing struct can be altered"#]]
            .assert_eq(&err.to_report_string());

        // Adding a field to `nested` is rejected as legacy encoding (presumably because the
        // original struct was built with `StructType::new`, i.e. without field IDs).
        let err = r#gen
            .generate_simple(Field::new(
                "nested",
                StructType::new([("f1", DataType::Int32), ("f2", DataType::Int64)]).into(),
            ))
            .unwrap_err();
        expect![[r#"column "nested" was persisted with legacy encoding thus cannot be altered, consider dropping and readding the column"#]].assert_eq(&err.to_report_string());

        // Re-declaring `nested` with its original type succeeds and keeps its ID.
        let id = r#gen
            .generate_simple(Field::new(
                "nested",
                StructType::new([("f1", DataType::Int32)]).into(),
            ))
            .unwrap();
        assert_eq!(id, ColumnId::new(3));

        // The failed attempts above did not consume any IDs.
        assert_eq!(r#gen.generate_simple(B("v3")).unwrap(), ColumnId::new(6));
    }

    #[test]
    fn test_col_id_gen_alter_composite_type() {
        // struct<f1 int, map map<varchar, list<struct<f2 int, f3 boolean>>>>
        let ori_type = || {
            DataType::from(StructType::new([
                ("f1", DataType::Int32),
                (
                    "map",
                    MapType::from_kv(
                        DataType::Varchar,
                        DataType::List(Box::new(
                            StructType::new([("f2", DataType::Int32), ("f3", DataType::Boolean)])
                                .into(),
                        )),
                    )
                    .into(),
                ),
            ]))
        };

        // `f1` replaced by `f4`; inside the list element, `f2` replaced by `f5`, `f3` kept and
        // `f6` added.
        let new_type = || {
            DataType::from(StructType::new([
                ("f4", DataType::Int32),
                (
                    "map",
                    MapType::from_kv(
                        DataType::Varchar,
                        DataType::List(Box::new(
                            StructType::new([
                                ("f5", DataType::Int32),
                                ("f3", DataType::Boolean),
                                ("f6", DataType::Float32),
                            ])
                            .into(),
                        )),
                    )
                    .into(),
                ),
            ]))
        };

        // Changes the nested field `f6` from Float32 to Float64, which must be rejected.
        let incompatible_new_type = || {
            DataType::from(StructType::new([(
                "map",
                MapType::from_kv(
                    DataType::Varchar,
                    DataType::List(Box::new(
                        StructType::new([("f6", DataType::Float64)]).into(),
                    )),
                )
                .into(),
            )]))
        };

        // Initial creation: the column gets #1, then struct fields get #2..#5 in pre-order.
        let mut r#gen = ColumnIdGenerator::new_initial();
        let mut col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                ori_type(),
            )),
            is_hidden: false,
        };
        r#gen.generate(&mut col).unwrap();
        let version = r#gen.into_version();

        expect![[r#"
            ColumnCatalog {
                column_desc: ColumnDesc {
                    data_type: Struct(
                        StructType {
                            fields: [
                                (
                                    "f1",
                                    Int32,
                                ),
                                (
                                    "map",
                                    Map(
                                        MapType(
                                            (
                                                Varchar,
                                                List(
                                                    Struct(
                                                        StructType {
                                                            fields: [
                                                                (
                                                                    "f2",
                                                                    Int32,
                                                                ),
                                                                (
                                                                    "f3",
                                                                    Boolean,
                                                                ),
                                                            ],
                                                            field_ids: [
                                                                #4,
                                                                #5,
                                                            ],
                                                        },
                                                    ),
                                                ),
                                            ),
                                        ),
                                    ),
                                ),
                            ],
                            field_ids: [
                                #2,
                                #3,
                            ],
                        },
                    ),
                    column_id: #1,
                    name: "nested",
                    generated_or_default_column: None,
                    description: None,
                    additional_column: AdditionalColumn {
                        column_type: None,
                    },
                    version: Pr13707,
                    system_column: None,
                    nullable: true,
                },
                is_hidden: false,
            }
        "#]]
        .assert_debug_eq(&col);

        // Alter: kept paths (`map` #3, `f3` #5) reuse their IDs; new fields `f4`, `f5` and `f6`
        // get #6, #7 and #8.
        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![col],
            version: Some(version),
            ..Default::default()
        });
        let mut new_col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                new_type(),
            )),
            is_hidden: false,
        };
        r#gen.generate(&mut new_col).unwrap();
        let version = r#gen.into_version();

        expect![[r#"
            ColumnCatalog {
                column_desc: ColumnDesc {
                    data_type: Struct(
                        StructType {
                            fields: [
                                (
                                    "f4",
                                    Int32,
                                ),
                                (
                                    "map",
                                    Map(
                                        MapType(
                                            (
                                                Varchar,
                                                List(
                                                    Struct(
                                                        StructType {
                                                            fields: [
                                                                (
                                                                    "f5",
                                                                    Int32,
                                                                ),
                                                                (
                                                                    "f3",
                                                                    Boolean,
                                                                ),
                                                                (
                                                                    "f6",
                                                                    Float32,
                                                                ),
                                                            ],
                                                            field_ids: [
                                                                #7,
                                                                #5,
                                                                #8,
                                                            ],
                                                        },
                                                    ),
                                                ),
                                            ),
                                        ),
                                    ),
                                ),
                            ],
                            field_ids: [
                                #6,
                                #3,
                            ],
                        },
                    ),
                    column_id: #1,
                    name: "nested",
                    generated_or_default_column: None,
                    description: None,
                    additional_column: AdditionalColumn {
                        column_type: None,
                    },
                    version: Pr13707,
                    system_column: None,
                    nullable: true,
                },
                is_hidden: false,
            }
        "#]]
        .assert_debug_eq(&new_col);

        // A second alter that changes a nested leaf type is rejected, naming the full path.
        let mut r#gen = ColumnIdGenerator::new_alter(&TableCatalog {
            columns: vec![new_col],
            version: Some(version),
            ..Default::default()
        });

        let mut new_col = ColumnCatalog {
            column_desc: ColumnDesc::from_field_without_column_id(&Field::new(
                "nested",
                incompatible_new_type(),
            )),
            is_hidden: false,
        };
        let err = r#gen.generate(&mut new_col).unwrap_err();
        expect![[r#"incompatible data type change from Float32 to Float64 at path "nested.map.value.element.f6""#]].assert_eq(&err.to_report_string());
    }
}