1use std::borrow::Cow;
16
17use itertools::Itertools;
18use risingwave_common::types::Datum;
19use risingwave_pb::expr::ExprNode;
20use risingwave_pb::expr::expr_node::{RexNode, Type as ExprType};
21use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
22use risingwave_pb::plan_common::{
23 AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, PbColumnCatalog, PbColumnDesc,
24};
25
26use super::schema::FieldLike;
27use super::{
28 CDC_OFFSET_COLUMN_NAME, CDC_TABLE_NAME_COLUMN_NAME, ICEBERG_FILE_PATH_COLUMN_NAME,
29 ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME, ROW_ID_COLUMN_NAME,
30 RW_TIMESTAMP_COLUMN_ID, RW_TIMESTAMP_COLUMN_NAME, USER_COLUMN_ID_OFFSET,
31};
32use crate::catalog::{Field, ROW_ID_COLUMN_ID};
33use crate::types::DataType;
34use crate::util::value_encoding::DatumToProtoExt;
35
/// Newtype wrapper around the raw `i32` identifier of a column.
#[derive(Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ColumnId(i32);

/// Debug output is the raw id prefixed with `#`, e.g. `#5`.
impl std::fmt::Debug for ColumnId {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        write!(formatter, "#{}", raw)
    }
}
46
47impl ColumnId {
48 pub const fn new(column_id: i32) -> Self {
49 Self(column_id)
50 }
51
52 pub const fn placeholder() -> Self {
54 Self(i32::MAX - 1)
55 }
56
57 pub const fn first_user_column() -> Self {
58 Self(USER_COLUMN_ID_OFFSET)
59 }
60}
61
62impl ColumnId {
63 pub const fn get_id(&self) -> i32 {
64 self.0
65 }
66
67 #[must_use]
69 pub const fn next(self) -> Self {
70 Self(self.0 + 1)
71 }
72
73 pub fn apply_delta_if_not_row_id(&mut self, delta: i32) {
74 if self.0 != ROW_ID_COLUMN_ID.get_id() {
75 self.0 += delta;
76 }
77 }
78}
79
80impl From<i32> for ColumnId {
81 fn from(column_id: i32) -> Self {
82 Self::new(column_id)
83 }
84}
85impl From<&i32> for ColumnId {
86 fn from(column_id: &i32) -> Self {
87 Self::new(*column_id)
88 }
89}
90
91impl From<ColumnId> for i32 {
92 fn from(id: ColumnId) -> i32 {
93 id.0
94 }
95}
96
97impl From<&ColumnId> for i32 {
98 fn from(id: &ColumnId) -> i32 {
99 id.0
100 }
101}
102
103impl std::fmt::Display for ColumnId {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 write!(f, "{}", self.0)
106 }
107}
108
/// Kinds of system column. `RwTimestamp` is the only variant defined here.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub enum SystemColumn {
    /// Tag attached to the column built by `ColumnCatalog::rw_timestamp_column`.
    RwTimestamp,
}
113
114#[derive(Clone, Debug, PartialEq, Eq, Hash)]
115pub struct ColumnDesc {
116 pub data_type: DataType,
117 pub column_id: ColumnId,
118 pub name: String,
119 pub generated_or_default_column: Option<GeneratedOrDefaultColumn>,
120 pub description: Option<String>,
121 pub additional_column: AdditionalColumn,
122 pub version: ColumnDescVersion,
123 pub system_column: Option<SystemColumn>,
126 pub nullable: bool,
130}
131
132impl ColumnDesc {
133 pub fn unnamed(column_id: ColumnId, data_type: DataType) -> ColumnDesc {
134 Self::named("", column_id, data_type)
135 }
136
137 pub fn named(name: impl Into<String>, column_id: ColumnId, data_type: DataType) -> ColumnDesc {
138 ColumnDesc {
139 data_type,
140 column_id,
141 name: name.into(),
142 generated_or_default_column: None,
143 description: None,
144 additional_column: AdditionalColumn { column_type: None },
145 version: ColumnDescVersion::LATEST,
146 system_column: None,
147 nullable: true,
148 }
149 }
150
151 pub fn named_with_default_value(
152 name: impl Into<String>,
153 column_id: ColumnId,
154 data_type: DataType,
155 snapshot_value: Datum,
156 ) -> ColumnDesc {
157 let default_col = DefaultColumnDesc {
158 expr: Some(ExprNode {
159 function_type: ExprType::Unspecified as i32,
161 return_type: Some(data_type.to_protobuf()),
162 rex_node: Some(RexNode::Constant(snapshot_value.to_protobuf())),
163 }),
164 snapshot_value: Some(snapshot_value.to_protobuf()),
165 };
166 ColumnDesc {
167 generated_or_default_column: Some(GeneratedOrDefaultColumn::DefaultColumn(default_col)),
168 ..Self::named(name, column_id, data_type)
169 }
170 }
171
172 pub fn named_with_additional_column(
173 name: impl Into<String>,
174 column_id: ColumnId,
175 data_type: DataType,
176 additional_column_type: AdditionalColumn,
177 ) -> ColumnDesc {
178 ColumnDesc {
179 additional_column: additional_column_type,
180 ..Self::named(name, column_id, data_type)
181 }
182 }
183
184 pub fn named_with_system_column(
185 name: impl Into<String>,
186 column_id: ColumnId,
187 data_type: DataType,
188 system_column: SystemColumn,
189 ) -> ColumnDesc {
190 ColumnDesc {
191 system_column: Some(system_column),
192 ..Self::named(name, column_id, data_type)
193 }
194 }
195
196 pub fn to_protobuf(&self) -> PbColumnDesc {
198 PbColumnDesc {
199 column_type: Some(self.data_type.to_protobuf()),
200 column_id: self.column_id.get_id(),
201 name: self.name.clone(),
202 generated_or_default_column: self.generated_or_default_column.clone(),
203 description: self.description.clone(),
204 additional_column_type: 0, additional_column: Some(self.additional_column.clone()),
206 version: self.version as i32,
207 nullable: self.nullable,
208 }
209 }
210
211 pub fn from_field_with_column_id(field: &Field, id: i32) -> Self {
212 Self::named(&field.name, ColumnId::new(id), field.data_type.clone())
213 }
214
215 pub fn from_field_without_column_id(field: &Field) -> Self {
216 Self::from_field_with_column_id(field, ColumnId::placeholder().into())
217 }
218
219 pub fn is_generated(&self) -> bool {
220 matches!(
221 self.generated_or_default_column,
222 Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
223 )
224 }
225}
226
227impl From<PbColumnDesc> for ColumnDesc {
228 fn from(prost: PbColumnDesc) -> Self {
229 let additional_column = prost
230 .get_additional_column()
231 .unwrap_or(&AdditionalColumn { column_type: None })
232 .clone();
233 let version = prost.version();
234
235 Self {
236 data_type: DataType::from(prost.column_type.as_ref().unwrap()),
237 column_id: ColumnId::new(prost.column_id),
238 name: prost.name,
239 generated_or_default_column: prost.generated_or_default_column,
240 description: prost.description.clone(),
241 additional_column,
242 version,
243 system_column: None,
244 nullable: prost.nullable,
245 }
246 }
247}
248
249impl From<&PbColumnDesc> for ColumnDesc {
250 fn from(prost: &PbColumnDesc) -> Self {
251 prost.clone().into()
252 }
253}
254
255impl From<&ColumnDesc> for PbColumnDesc {
256 fn from(c: &ColumnDesc) -> Self {
257 c.to_protobuf()
258 }
259}
260
261#[derive(Debug, Clone, PartialEq, Eq, Hash)]
262pub struct ColumnCatalog {
263 pub column_desc: ColumnDesc,
264 pub is_hidden: bool,
265}
266
267impl ColumnCatalog {
268 pub fn visible(column_desc: ColumnDesc) -> Self {
269 Self {
270 column_desc,
271 is_hidden: false,
272 }
273 }
274
275 pub fn hidden(column_desc: ColumnDesc) -> Self {
276 Self {
277 column_desc,
278 is_hidden: true,
279 }
280 }
281
282 pub fn is_hidden(&self) -> bool {
284 self.is_hidden
285 }
286
287 pub fn is_generated(&self) -> bool {
289 self.column_desc.is_generated()
290 }
291
292 pub fn can_dml(&self) -> bool {
293 !self.is_generated() && !self.is_rw_timestamp_column()
294 }
295
296 pub fn is_user_defined(&self) -> bool {
299 !self.is_hidden() && !self.is_rw_sys_column() && !self.is_connector_additional_column()
300 }
301
302 pub fn generated_expr(&self) -> Option<&ExprNode> {
304 if let Some(GeneratedOrDefaultColumn::GeneratedColumn(desc)) =
305 &self.column_desc.generated_or_default_column
306 {
307 Some(desc.expr.as_ref().unwrap())
308 } else {
309 None
310 }
311 }
312
313 pub fn is_connector_additional_column(&self) -> bool {
315 self.column_desc.additional_column.column_type.is_some()
316 }
317
318 pub fn data_type(&self) -> &DataType {
320 &self.column_desc.data_type
321 }
322
323 pub fn nullable(&self) -> bool {
325 self.column_desc.nullable
326 }
327
328 pub fn column_id(&self) -> ColumnId {
330 self.column_desc.column_id
331 }
332
333 pub fn name(&self) -> &str {
335 self.column_desc.name.as_ref()
336 }
337
338 pub fn to_protobuf(&self) -> PbColumnCatalog {
340 PbColumnCatalog {
341 column_desc: Some(self.column_desc.to_protobuf()),
342 is_hidden: self.is_hidden,
343 }
344 }
345
346 pub fn row_id_column() -> Self {
349 Self::hidden(ColumnDesc::named(
350 ROW_ID_COLUMN_NAME,
351 ROW_ID_COLUMN_ID,
352 DataType::Serial,
353 ))
354 }
355
356 pub fn is_rw_sys_column(&self) -> bool {
357 self.column_desc.system_column.is_some()
358 }
359
360 pub fn rw_timestamp_column() -> Self {
361 Self::hidden(ColumnDesc::named_with_system_column(
362 RW_TIMESTAMP_COLUMN_NAME,
363 RW_TIMESTAMP_COLUMN_ID,
364 DataType::Timestamptz,
365 SystemColumn::RwTimestamp,
366 ))
367 }
368
369 pub fn is_rw_timestamp_column(&self) -> bool {
370 matches!(
371 self.column_desc.system_column,
372 Some(SystemColumn::RwTimestamp)
373 )
374 }
375
376 pub fn iceberg_hidden_cols() -> [Self; 3] {
379 [
380 Self::hidden(ColumnDesc::named(
381 ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
382 ColumnId::placeholder(),
383 DataType::Int64,
384 )),
385 Self::hidden(ColumnDesc::named(
386 ICEBERG_FILE_PATH_COLUMN_NAME,
387 ColumnId::placeholder(),
388 DataType::Varchar,
389 )),
390 Self::hidden(ColumnDesc::named(
391 ICEBERG_FILE_POS_COLUMN_NAME,
392 ColumnId::placeholder(),
393 DataType::Int64,
394 )),
395 ]
396 }
397
398 pub fn debezium_cdc_source_cols() -> [Self; 3] {
401 [
402 Self::visible(ColumnDesc::named(
403 "payload",
404 ColumnId::placeholder(),
405 DataType::Jsonb,
406 )),
407 Self::hidden(ColumnDesc::named(
409 CDC_OFFSET_COLUMN_NAME,
410 ColumnId::placeholder(),
411 DataType::Varchar,
412 )),
413 Self::hidden(ColumnDesc::named(
415 CDC_TABLE_NAME_COLUMN_NAME,
416 ColumnId::placeholder(),
417 DataType::Varchar,
418 )),
419 ]
420 }
421
422 pub fn is_row_id_column(&self) -> bool {
423 self.column_desc.column_id == ROW_ID_COLUMN_ID
424 }
425}
426
427impl From<PbColumnCatalog> for ColumnCatalog {
428 fn from(prost: PbColumnCatalog) -> Self {
429 Self {
430 column_desc: prost.column_desc.unwrap().into(),
431 is_hidden: prost.is_hidden,
432 }
433 }
434}
435
436impl ColumnCatalog {
437 pub fn name_with_hidden(&self) -> Cow<'_, str> {
438 if self.is_hidden {
439 Cow::Owned(format!("{}(hidden)", self.column_desc.name))
440 } else {
441 Cow::Borrowed(&self.column_desc.name)
442 }
443 }
444}
445
446impl FieldLike for ColumnDesc {
447 fn data_type(&self) -> &DataType {
448 &self.data_type
449 }
450
451 fn name(&self) -> &str {
452 &self.name
453 }
454}
455
456impl FieldLike for ColumnCatalog {
457 fn data_type(&self) -> &DataType {
458 &self.column_desc.data_type
459 }
460
461 fn name(&self) -> &str {
462 &self.column_desc.name
463 }
464}
465
466pub fn columns_extend(preserved_columns: &mut Vec<ColumnCatalog>, columns: Vec<ColumnCatalog>) {
467 debug_assert_eq!(ROW_ID_COLUMN_ID.get_id(), 0);
468 let mut max_incoming_column_id = ROW_ID_COLUMN_ID.get_id();
469 columns.iter().for_each(|column| {
470 let column_id = column.column_id().get_id();
471 if column_id > max_incoming_column_id {
472 max_incoming_column_id = column_id;
473 }
474 });
475 preserved_columns.iter_mut().for_each(|column| {
476 column
477 .column_desc
478 .column_id
479 .apply_delta_if_not_row_id(max_incoming_column_id)
480 });
481
482 preserved_columns.extend(columns);
483}
484
485pub fn debug_assert_column_ids_distinct(columns: &[ColumnCatalog]) {
486 debug_assert!(
487 columns
488 .iter()
489 .map(|c| c.column_id())
490 .duplicates()
491 .next()
492 .is_none(),
493 "duplicate ColumnId found in source catalog. Columns: {columns:#?}"
494 );
495}
496
497pub fn max_column_id(columns: &[ColumnCatalog]) -> ColumnId {
505 columns
507 .iter()
508 .fold(ColumnId::first_user_column(), |a, b| a.max(b.column_id()))
509}
510
#[cfg(test)]
pub mod tests {
    use risingwave_pb::plan_common::PbColumnDesc;

    use crate::catalog::ColumnDesc;
    use crate::test_prelude::*;
    use crate::types::{DataType, StructType};

    /// Builds the protobuf fixture: a `country` column with id 5 whose type is a struct
    /// containing a nested `country.city` struct.
    pub fn build_prost_desc() -> PbColumnDesc {
        let city_type: DataType = StructType::new([
            ("country.city.address", DataType::Varchar),
            ("country.city.zipcode", DataType::Varchar),
        ])
        .into();
        let country_type: DataType = StructType::new([
            ("country.address", DataType::Varchar),
            ("country.city", city_type),
        ])
        .into();
        PbColumnDesc::new(country_type.to_protobuf(), "country", 5)
    }

    /// Builds the in-memory counterpart of [`build_prost_desc`].
    pub fn build_desc() -> ColumnDesc {
        let city_struct = StructType::new([
            ("country.city.address", DataType::Varchar),
            ("country.city.zipcode", DataType::Varchar),
        ]);
        let country_struct = StructType::new([
            ("country.address", DataType::Varchar),
            ("country.city", DataType::from(city_struct)),
        ]);
        ColumnDesc::named("country", 5.into(), DataType::from(country_struct))
    }

    /// Converting the protobuf fixture must yield the in-memory fixture.
    #[test]
    fn test_into_column_catalog() {
        let converted = ColumnDesc::from(build_prost_desc());
        assert_eq!(converted, build_desc());
    }
}