1use std::borrow::Cow;
16use std::collections::HashMap;
17use std::ops::Deref;
18
19use itertools::Itertools;
20use risingwave_common::types::Datum;
21use risingwave_pb::expr::ExprNode;
22use risingwave_pb::expr::expr_node::{RexNode, Type as ExprType};
23use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
24use risingwave_pb::plan_common::{
25 AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, PbColumnCatalog, PbColumnDesc,
26};
27
28use super::schema::FieldLike;
29use super::{
30 CDC_OFFSET_COLUMN_NAME, CDC_TABLE_NAME_COLUMN_NAME, ICEBERG_FILE_PATH_COLUMN_NAME,
31 ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME, ROW_ID_COLUMN_NAME,
32 RW_TIMESTAMP_COLUMN_ID, RW_TIMESTAMP_COLUMN_NAME, USER_COLUMN_ID_OFFSET,
33};
34use crate::catalog::{Field, ROW_ID_COLUMN_ID};
35use crate::types::DataType;
36use crate::util::value_encoding::DatumToProtoExt;
37
/// Newtype wrapper around the raw `i32` identifier of a column.
///
/// The type is `Copy`, totally ordered and hashable, so it can be used directly as a map key
/// or compared with `max`.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ColumnId(i32);
42
43impl std::fmt::Debug for ColumnId {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 write!(f, "#{}", self.0)
46 }
47}
48
49impl ColumnId {
50 pub const fn new(column_id: i32) -> Self {
51 Self(column_id)
52 }
53
54 pub const fn placeholder() -> Self {
56 Self(i32::MAX - 1)
57 }
58
59 pub const fn first_user_column() -> Self {
60 Self(USER_COLUMN_ID_OFFSET)
61 }
62}
63
64impl ColumnId {
65 pub const fn get_id(&self) -> i32 {
66 self.0
67 }
68
69 #[must_use]
71 pub const fn next(self) -> Self {
72 Self(self.0 + 1)
73 }
74
75 pub fn apply_delta_if_not_row_id(&mut self, delta: i32) {
76 if self.0 != ROW_ID_COLUMN_ID.get_id() {
77 self.0 += delta;
78 }
79 }
80}
81
82impl From<i32> for ColumnId {
83 fn from(column_id: i32) -> Self {
84 Self::new(column_id)
85 }
86}
87impl From<&i32> for ColumnId {
88 fn from(column_id: &i32) -> Self {
89 Self::new(*column_id)
90 }
91}
92
93impl From<ColumnId> for i32 {
94 fn from(id: ColumnId) -> i32 {
95 id.0
96 }
97}
98
99impl From<&ColumnId> for i32 {
100 fn from(id: &ColumnId) -> i32 {
101 id.0
102 }
103}
104
105impl std::fmt::Display for ColumnId {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 write!(f, "{}", self.0)
108 }
109}
110
/// Kinds of system columns a [`ColumnDesc`] can be tagged with via its `system_column` field.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum SystemColumn {
    /// Marker used by `ColumnCatalog::rw_timestamp_column` and checked by
    /// `ColumnCatalog::is_rw_timestamp_column`.
    RwTimestamp,
}
115
/// In-memory description of a single column; convertible to and from `PbColumnDesc`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ColumnDesc {
    /// Data type of the column.
    pub data_type: DataType,
    /// Identifier of the column.
    pub column_id: ColumnId,
    /// Column name; `ColumnDesc::unnamed` uses the empty string.
    pub name: String,
    /// Optional generated-column or default-value specification.
    pub generated_or_default_column: Option<GeneratedOrDefaultColumn>,
    /// Optional free-form description, carried through the protobuf conversions unchanged.
    pub description: Option<String>,
    /// Additional-column information; `column_type` is `None` for columns built with
    /// `ColumnDesc::named`.
    pub additional_column: AdditionalColumn,
    /// Version tag of this descriptor; `ColumnDesc::named` sets it to `ColumnDescVersion::LATEST`.
    pub version: ColumnDescVersion,
    /// System-column marker. It is not written by `to_protobuf`, and decoding from
    /// `PbColumnDesc` always yields `None`.
    pub system_column: Option<SystemColumn>,
    /// Whether the column is nullable; defaults to `true` in `ColumnDesc::named` and when the
    /// protobuf field is absent.
    pub nullable: bool,
}
133
134impl AsRef<ColumnDesc> for ColumnDesc {
135 fn as_ref(&self) -> &ColumnDesc {
136 self
137 }
138}
139
140impl ColumnDesc {
141 pub fn unnamed(column_id: ColumnId, data_type: DataType) -> ColumnDesc {
142 Self::named("", column_id, data_type)
143 }
144
145 pub fn named(name: impl Into<String>, column_id: ColumnId, data_type: DataType) -> ColumnDesc {
146 ColumnDesc {
147 data_type,
148 column_id,
149 name: name.into(),
150 generated_or_default_column: None,
151 description: None,
152 additional_column: AdditionalColumn { column_type: None },
153 version: ColumnDescVersion::LATEST,
154 system_column: None,
155 nullable: true,
156 }
157 }
158
159 pub fn named_with_default_value(
160 name: impl Into<String>,
161 column_id: ColumnId,
162 data_type: DataType,
163 snapshot_value: Datum,
164 ) -> ColumnDesc {
165 let default_col = DefaultColumnDesc {
166 expr: Some(ExprNode {
167 function_type: ExprType::Unspecified as i32,
169 return_type: Some(data_type.to_protobuf()),
170 rex_node: Some(RexNode::Constant(snapshot_value.to_protobuf())),
171 }),
172 snapshot_value: Some(snapshot_value.to_protobuf()),
173 };
174 ColumnDesc {
175 generated_or_default_column: Some(GeneratedOrDefaultColumn::DefaultColumn(default_col)),
176 ..Self::named(name, column_id, data_type)
177 }
178 }
179
180 pub fn named_with_additional_column(
181 name: impl Into<String>,
182 column_id: ColumnId,
183 data_type: DataType,
184 additional_column_type: AdditionalColumn,
185 ) -> ColumnDesc {
186 ColumnDesc {
187 additional_column: additional_column_type,
188 ..Self::named(name, column_id, data_type)
189 }
190 }
191
192 pub fn named_with_system_column(
193 name: impl Into<String>,
194 column_id: ColumnId,
195 data_type: DataType,
196 system_column: SystemColumn,
197 ) -> ColumnDesc {
198 ColumnDesc {
199 system_column: Some(system_column),
200 ..Self::named(name, column_id, data_type)
201 }
202 }
203
204 pub fn to_protobuf(&self) -> PbColumnDesc {
206 PbColumnDesc {
207 column_type: Some(self.data_type.to_protobuf()),
208 column_id: self.column_id.get_id(),
209 name: self.name.clone(),
210 generated_or_default_column: self.generated_or_default_column.clone(),
211 description: self.description.clone(),
212 additional_column_type: 0, additional_column: Some(self.additional_column.clone()),
214 version: self.version as i32,
215 nullable: Some(self.nullable),
216 }
217 }
218
219 pub fn from_field_with_column_id(field: &Field, id: i32) -> Self {
220 Self::named(&field.name, ColumnId::new(id), field.data_type.clone())
221 }
222
223 pub fn from_field_without_column_id(field: &Field) -> Self {
224 Self::from_field_with_column_id(field, ColumnId::placeholder().into())
225 }
226
227 pub fn is_generated(&self) -> bool {
228 matches!(
229 self.generated_or_default_column,
230 Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
231 )
232 }
233
234 pub fn get_id_to_op_idx_mapping(
235 columns: &[impl AsRef<Self>],
236 output_col_ids: Option<&[usize]>,
237 ) -> HashMap<ColumnId, usize> {
238 if let Some(output_col_ids) = output_col_ids {
239 Self::get_id_to_op_idx_mapping_inner(columns, output_col_ids.iter().cloned())
240 } else {
241 Self::get_id_to_op_idx_mapping_inner(columns, 0..columns.len())
242 }
243 }
244
245 fn get_id_to_op_idx_mapping_inner(
246 columns: &[impl AsRef<Self>],
247 output_col_ids: impl Iterator<Item = usize>,
248 ) -> HashMap<ColumnId, usize> {
249 let mut id_to_idx = HashMap::new();
250 output_col_ids.enumerate().for_each(|(idx, output_idx)| {
251 id_to_idx.insert(columns[output_idx].as_ref().column_id, idx);
252 });
253 id_to_idx
254 }
255}
256
257impl From<PbColumnDesc> for ColumnDesc {
258 fn from(prost: PbColumnDesc) -> Self {
259 let additional_column = prost
260 .get_additional_column()
261 .unwrap_or(&AdditionalColumn { column_type: None })
262 .clone();
263 let version = prost.version();
264
265 Self {
266 data_type: DataType::from(prost.column_type.as_ref().unwrap()),
267 column_id: ColumnId::new(prost.column_id),
268 name: prost.name,
269 generated_or_default_column: prost.generated_or_default_column,
270 description: prost.description.clone(),
271 additional_column,
272 version,
273 system_column: None,
274 nullable: prost.nullable.unwrap_or(true),
275 }
276 }
277}
278
279impl From<&PbColumnDesc> for ColumnDesc {
280 fn from(prost: &PbColumnDesc) -> Self {
281 prost.clone().into()
282 }
283}
284
285impl From<&ColumnDesc> for PbColumnDesc {
286 fn from(c: &ColumnDesc) -> Self {
287 c.to_protobuf()
288 }
289}
290
/// A [`ColumnDesc`] together with a visibility flag.
///
/// Dereferences to the inner `ColumnDesc`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnCatalog {
    /// The described column.
    pub column_desc: ColumnDesc,
    /// Whether the column is hidden; set by `ColumnCatalog::hidden`, cleared by
    /// `ColumnCatalog::visible`.
    pub is_hidden: bool,
}
296
297impl Deref for ColumnCatalog {
298 type Target = ColumnDesc;
299
300 fn deref(&self) -> &Self::Target {
301 &self.column_desc
302 }
303}
304
305impl AsRef<ColumnDesc> for ColumnCatalog {
306 fn as_ref(&self) -> &ColumnDesc {
307 &self.column_desc
308 }
309}
310
311impl ColumnCatalog {
312 pub fn visible(column_desc: ColumnDesc) -> Self {
313 Self {
314 column_desc,
315 is_hidden: false,
316 }
317 }
318
319 pub fn hidden(column_desc: ColumnDesc) -> Self {
320 Self {
321 column_desc,
322 is_hidden: true,
323 }
324 }
325
326 pub fn is_hidden(&self) -> bool {
328 self.is_hidden
329 }
330
331 pub fn is_generated(&self) -> bool {
333 self.column_desc.is_generated()
334 }
335
336 pub fn can_dml(&self) -> bool {
337 !self.is_generated() && !self.is_rw_timestamp_column()
338 }
339
340 pub fn is_user_defined(&self) -> bool {
343 !self.is_hidden() && !self.is_rw_sys_column() && !self.is_connector_additional_column()
344 }
345
346 pub fn generated_expr(&self) -> Option<&ExprNode> {
348 if let Some(GeneratedOrDefaultColumn::GeneratedColumn(desc)) =
349 &self.column_desc.generated_or_default_column
350 {
351 Some(desc.expr.as_ref().unwrap())
352 } else {
353 None
354 }
355 }
356
357 pub fn is_connector_additional_column(&self) -> bool {
359 self.column_desc.additional_column.column_type.is_some()
360 }
361
362 pub fn data_type(&self) -> &DataType {
364 &self.column_desc.data_type
365 }
366
367 pub fn nullable(&self) -> bool {
369 self.column_desc.nullable
370 }
371
372 pub fn column_id(&self) -> ColumnId {
374 self.column_desc.column_id
375 }
376
377 pub fn name(&self) -> &str {
379 self.column_desc.name.as_ref()
380 }
381
382 pub fn to_protobuf(&self) -> PbColumnCatalog {
384 PbColumnCatalog {
385 column_desc: Some(self.column_desc.to_protobuf()),
386 is_hidden: self.is_hidden,
387 }
388 }
389
390 pub fn row_id_column() -> Self {
393 Self::hidden(ColumnDesc::named(
394 ROW_ID_COLUMN_NAME,
395 ROW_ID_COLUMN_ID,
396 DataType::Serial,
397 ))
398 }
399
400 pub fn is_rw_sys_column(&self) -> bool {
401 self.column_desc.system_column.is_some()
402 }
403
404 pub fn rw_timestamp_column() -> Self {
405 Self::hidden(ColumnDesc::named_with_system_column(
406 RW_TIMESTAMP_COLUMN_NAME,
407 RW_TIMESTAMP_COLUMN_ID,
408 DataType::Timestamptz,
409 SystemColumn::RwTimestamp,
410 ))
411 }
412
413 pub fn is_rw_timestamp_column(&self) -> bool {
414 matches!(
415 self.column_desc.system_column,
416 Some(SystemColumn::RwTimestamp)
417 )
418 }
419
420 pub fn iceberg_hidden_cols() -> [Self; 3] {
423 [
424 Self::hidden(ColumnDesc::named(
425 ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
426 ColumnId::placeholder(),
427 DataType::Int64,
428 )),
429 Self::hidden(ColumnDesc::named(
430 ICEBERG_FILE_PATH_COLUMN_NAME,
431 ColumnId::placeholder(),
432 DataType::Varchar,
433 )),
434 Self::hidden(ColumnDesc::named(
435 ICEBERG_FILE_POS_COLUMN_NAME,
436 ColumnId::placeholder(),
437 DataType::Int64,
438 )),
439 ]
440 }
441
442 pub fn debezium_cdc_source_cols() -> [Self; 3] {
445 [
446 Self::visible(ColumnDesc::named(
447 "payload",
448 ColumnId::placeholder(),
449 DataType::Jsonb,
450 )),
451 Self::hidden(ColumnDesc::named(
453 CDC_OFFSET_COLUMN_NAME,
454 ColumnId::placeholder(),
455 DataType::Varchar,
456 )),
457 Self::hidden(ColumnDesc::named(
459 CDC_TABLE_NAME_COLUMN_NAME,
460 ColumnId::placeholder(),
461 DataType::Varchar,
462 )),
463 ]
464 }
465
466 pub fn is_row_id_column(&self) -> bool {
467 self.column_desc.column_id == ROW_ID_COLUMN_ID
468 }
469}
470
471impl From<PbColumnCatalog> for ColumnCatalog {
472 fn from(prost: PbColumnCatalog) -> Self {
473 Self {
474 column_desc: prost.column_desc.unwrap().into(),
475 is_hidden: prost.is_hidden,
476 }
477 }
478}
479
480impl ColumnCatalog {
481 pub fn name_with_hidden(&self) -> Cow<'_, str> {
482 if self.is_hidden {
483 Cow::Owned(format!("{}(hidden)", self.column_desc.name))
484 } else {
485 Cow::Borrowed(&self.column_desc.name)
486 }
487 }
488}
489
490impl FieldLike for ColumnDesc {
491 fn data_type(&self) -> &DataType {
492 &self.data_type
493 }
494
495 fn name(&self) -> &str {
496 &self.name
497 }
498}
499
500impl FieldLike for ColumnCatalog {
501 fn data_type(&self) -> &DataType {
502 &self.column_desc.data_type
503 }
504
505 fn name(&self) -> &str {
506 &self.column_desc.name
507 }
508}
509
510pub fn columns_extend(preserved_columns: &mut Vec<ColumnCatalog>, columns: Vec<ColumnCatalog>) {
511 debug_assert_eq!(ROW_ID_COLUMN_ID.get_id(), 0);
512 let mut max_incoming_column_id = ROW_ID_COLUMN_ID.get_id();
513 columns.iter().for_each(|column| {
514 let column_id = column.column_id().get_id();
515 if column_id > max_incoming_column_id {
516 max_incoming_column_id = column_id;
517 }
518 });
519 preserved_columns.iter_mut().for_each(|column| {
520 column
521 .column_desc
522 .column_id
523 .apply_delta_if_not_row_id(max_incoming_column_id)
524 });
525
526 preserved_columns.extend(columns);
527}
528
529pub fn debug_assert_column_ids_distinct(columns: &[ColumnCatalog]) {
530 debug_assert!(
531 columns
532 .iter()
533 .map(|c| c.column_id())
534 .duplicates()
535 .next()
536 .is_none(),
537 "duplicate ColumnId found in source catalog. Columns: {columns:#?}"
538 );
539}
540
541pub fn max_column_id(columns: &[ColumnCatalog]) -> ColumnId {
549 columns
551 .iter()
552 .fold(ColumnId::first_user_column(), |a, b| a.max(b.column_id()))
553}
554
#[cfg(test)]
mod tests {
    use risingwave_pb::plan_common::PbColumnDesc;

    use crate::catalog::ColumnDesc;
    use crate::test_prelude::*;
    use crate::types::{DataType, StructType};

    /// Builds the nested struct type shared by both fixtures:
    /// `country { address, city { address, zipcode } }`, all leaves being `Varchar`.
    fn country_type() -> DataType {
        let city: DataType = StructType::new([
            ("country.city.address", DataType::Varchar),
            ("country.city.zipcode", DataType::Varchar),
        ])
        .into();
        StructType::new([
            ("country.address", DataType::Varchar),
            ("country.city", city),
        ])
        .into()
    }

    /// Protobuf fixture: a column named `country` with id 5 and the nested struct type.
    pub fn build_prost_desc() -> PbColumnDesc {
        let country = country_type();
        PbColumnDesc::new(country.to_protobuf(), "country", 5)
    }

    /// In-memory fixture expected to equal the decoded protobuf fixture.
    pub fn build_desc() -> ColumnDesc {
        ColumnDesc::named("country", 5.into(), country_type())
    }

    /// Decoding the protobuf fixture must yield the in-memory fixture.
    #[test]
    fn test_into_column_catalog() {
        let decoded: ColumnDesc = build_prost_desc().into();
        let expected = build_desc();
        assert_eq!(decoded, expected);
    }
}