risingwave_common/catalog/
column.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Cow;
16use std::collections::HashMap;
17use std::ops::Deref;
18
19use itertools::Itertools;
20use risingwave_common::types::Datum;
21use risingwave_pb::expr::ExprNode;
22use risingwave_pb::expr::expr_node::{RexNode, Type as ExprType};
23use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
24use risingwave_pb::plan_common::{
25    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, PbColumnCatalog, PbColumnDesc,
26};
27
28use super::schema::FieldLike;
29use super::{
30    CDC_OFFSET_COLUMN_NAME, CDC_TABLE_NAME_COLUMN_NAME, ICEBERG_FILE_PATH_COLUMN_NAME,
31    ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME, ROW_ID_COLUMN_NAME,
32    RW_TIMESTAMP_COLUMN_ID, RW_TIMESTAMP_COLUMN_NAME, USER_COLUMN_ID_OFFSET,
33};
34use crate::catalog::{Field, ROW_ID_COLUMN_ID};
35use crate::types::DataType;
36use crate::util::value_encoding::DatumToProtoExt;
37
/// Identifier of a column within a single table.
///
/// Unlike a table ID, a column ID is not globally unique: it only identifies a column inside
/// the table that owns it.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ColumnId(i32);

impl std::fmt::Debug for ColumnId {
    /// Renders the raw id prefixed with `#` (e.g. `#3`) so ids stand out in debug output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let raw = self.0;
        f.write_fmt(format_args!("#{}", raw))
    }
}
48
49impl ColumnId {
50    pub const fn new(column_id: i32) -> Self {
51        Self(column_id)
52    }
53
54    /// Sometimes the id field is filled later, we use this value for better debugging.
55    pub const fn placeholder() -> Self {
56        Self(i32::MAX - 1)
57    }
58
59    pub const fn first_user_column() -> Self {
60        Self(USER_COLUMN_ID_OFFSET)
61    }
62}
63
64impl ColumnId {
65    pub const fn get_id(&self) -> i32 {
66        self.0
67    }
68
69    /// Returns the subsequent column id.
70    #[must_use]
71    pub const fn next(self) -> Self {
72        Self(self.0 + 1)
73    }
74
75    pub fn apply_delta_if_not_row_id(&mut self, delta: i32) {
76        if self.0 != ROW_ID_COLUMN_ID.get_id() {
77            self.0 += delta;
78        }
79    }
80}
81
82impl From<i32> for ColumnId {
83    fn from(column_id: i32) -> Self {
84        Self::new(column_id)
85    }
86}
87impl From<&i32> for ColumnId {
88    fn from(column_id: &i32) -> Self {
89        Self::new(*column_id)
90    }
91}
92
93impl From<ColumnId> for i32 {
94    fn from(id: ColumnId) -> i32 {
95        id.0
96    }
97}
98
99impl From<&ColumnId> for i32 {
100    fn from(id: &ColumnId) -> i32 {
101        id.0
102    }
103}
104
105impl std::fmt::Display for ColumnId {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        write!(f, "{}", self.0)
108    }
109}
110
/// Kind of system column a [`ColumnDesc`] represents (see `ColumnDesc::system_column`).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum SystemColumn {
    /// The `_rw_timestamp` column; see `ColumnCatalog::rw_timestamp_column`.
    RwTimestamp,
}
115
/// Describes a single column: its data type, id and name, plus optional metadata such as a
/// generated/default expression, a description, or connector-specific information.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ColumnDesc {
    /// Data type of the column.
    pub data_type: DataType,
    /// Id of the column; only unique within its table (see [`ColumnId`]).
    pub column_id: ColumnId,
    /// Column name. Empty for descriptors created with `ColumnDesc::unnamed`.
    pub name: String,
    /// Generation expression or default value, if any.
    /// `Some(GeneratedOrDefaultColumn::GeneratedColumn(_))` marks a generated column.
    pub generated_or_default_column: Option<GeneratedOrDefaultColumn>,
    /// Optional free-text description; carried through protobuf conversion unchanged.
    pub description: Option<String>,
    /// Connector `INCLUDE ... AS ...` metadata; `column_type: None` marks an ordinary column.
    pub additional_column: AdditionalColumn,
    /// Descriptor version; `ColumnDesc::named` sets it to `ColumnDescVersion::LATEST`.
    pub version: ColumnDescVersion,
    /// Currently the system column is used for `_rw_timestamp` only and is generated at runtime,
    /// so this field is not persisted.
    pub system_column: Option<SystemColumn>,
    /// Whether the column is nullable (`true` for `ColumnDesc::named` and for protobufs that
    /// omit the field).
    /// If a column is not nullable, BatchInsert/BatchUpdate operations will throw an error when
    /// NULL is inserted into or updated in it.
    /// Rows containing NULL in this column are ignored when streaming data into the table.
    pub nullable: bool,
}
133
134impl AsRef<ColumnDesc> for ColumnDesc {
135    fn as_ref(&self) -> &ColumnDesc {
136        self
137    }
138}
139
140impl ColumnDesc {
141    pub fn unnamed(column_id: ColumnId, data_type: DataType) -> ColumnDesc {
142        Self::named("", column_id, data_type)
143    }
144
145    pub fn named(name: impl Into<String>, column_id: ColumnId, data_type: DataType) -> ColumnDesc {
146        ColumnDesc {
147            data_type,
148            column_id,
149            name: name.into(),
150            generated_or_default_column: None,
151            description: None,
152            additional_column: AdditionalColumn { column_type: None },
153            version: ColumnDescVersion::LATEST,
154            system_column: None,
155            nullable: true,
156        }
157    }
158
159    pub fn named_with_default_value(
160        name: impl Into<String>,
161        column_id: ColumnId,
162        data_type: DataType,
163        snapshot_value: Datum,
164    ) -> ColumnDesc {
165        let default_col = DefaultColumnDesc {
166            expr: Some(ExprNode {
167                // equivalent to `Literal::to_expr_proto`
168                function_type: ExprType::Unspecified as i32,
169                return_type: Some(data_type.to_protobuf()),
170                rex_node: Some(RexNode::Constant(snapshot_value.to_protobuf())),
171            }),
172            snapshot_value: Some(snapshot_value.to_protobuf()),
173        };
174        ColumnDesc {
175            generated_or_default_column: Some(GeneratedOrDefaultColumn::DefaultColumn(default_col)),
176            ..Self::named(name, column_id, data_type)
177        }
178    }
179
180    pub fn named_with_additional_column(
181        name: impl Into<String>,
182        column_id: ColumnId,
183        data_type: DataType,
184        additional_column_type: AdditionalColumn,
185    ) -> ColumnDesc {
186        ColumnDesc {
187            additional_column: additional_column_type,
188            ..Self::named(name, column_id, data_type)
189        }
190    }
191
192    pub fn named_with_system_column(
193        name: impl Into<String>,
194        column_id: ColumnId,
195        data_type: DataType,
196        system_column: SystemColumn,
197    ) -> ColumnDesc {
198        ColumnDesc {
199            system_column: Some(system_column),
200            ..Self::named(name, column_id, data_type)
201        }
202    }
203
204    /// Convert to proto
205    pub fn to_protobuf(&self) -> PbColumnDesc {
206        PbColumnDesc {
207            column_type: Some(self.data_type.to_protobuf()),
208            column_id: self.column_id.get_id(),
209            name: self.name.clone(),
210            generated_or_default_column: self.generated_or_default_column.clone(),
211            description: self.description.clone(),
212            additional_column_type: 0, // deprecated
213            additional_column: Some(self.additional_column.clone()),
214            version: self.version as i32,
215            nullable: Some(self.nullable),
216        }
217    }
218
219    pub fn from_field_with_column_id(field: &Field, id: i32) -> Self {
220        Self::named(&field.name, ColumnId::new(id), field.data_type.clone())
221    }
222
223    pub fn from_field_without_column_id(field: &Field) -> Self {
224        Self::from_field_with_column_id(field, ColumnId::placeholder().into())
225    }
226
227    pub fn is_generated(&self) -> bool {
228        matches!(
229            self.generated_or_default_column,
230            Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
231        )
232    }
233
234    pub fn get_id_to_op_idx_mapping(
235        columns: &[impl AsRef<Self>],
236        output_col_ids: Option<&[usize]>,
237    ) -> HashMap<ColumnId, usize> {
238        if let Some(output_col_ids) = output_col_ids {
239            Self::get_id_to_op_idx_mapping_inner(columns, output_col_ids.iter().cloned())
240        } else {
241            Self::get_id_to_op_idx_mapping_inner(columns, 0..columns.len())
242        }
243    }
244
245    fn get_id_to_op_idx_mapping_inner(
246        columns: &[impl AsRef<Self>],
247        output_col_ids: impl Iterator<Item = usize>,
248    ) -> HashMap<ColumnId, usize> {
249        let mut id_to_idx = HashMap::new();
250        output_col_ids.enumerate().for_each(|(idx, output_idx)| {
251            id_to_idx.insert(columns[output_idx].as_ref().column_id, idx);
252        });
253        id_to_idx
254    }
255}
256
257impl From<PbColumnDesc> for ColumnDesc {
258    fn from(prost: PbColumnDesc) -> Self {
259        let additional_column = prost
260            .get_additional_column()
261            .unwrap_or(&AdditionalColumn { column_type: None })
262            .clone();
263        let version = prost.version();
264
265        Self {
266            data_type: DataType::from(prost.column_type.as_ref().unwrap()),
267            column_id: ColumnId::new(prost.column_id),
268            name: prost.name,
269            generated_or_default_column: prost.generated_or_default_column,
270            description: prost.description.clone(),
271            additional_column,
272            version,
273            system_column: None,
274            nullable: prost.nullable.unwrap_or(true),
275        }
276    }
277}
278
279impl From<&PbColumnDesc> for ColumnDesc {
280    fn from(prost: &PbColumnDesc) -> Self {
281        prost.clone().into()
282    }
283}
284
285impl From<&ColumnDesc> for PbColumnDesc {
286    fn from(c: &ColumnDesc) -> Self {
287        c.to_protobuf()
288    }
289}
290
/// A column entry in a catalog: a [`ColumnDesc`] plus its hidden flag.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnCatalog {
    /// The underlying column descriptor (also reachable through `Deref`).
    pub column_desc: ColumnDesc,
    /// Whether the column is hidden; e.g. `row_id_column` and `rw_timestamp_column` create
    /// hidden columns.
    pub is_hidden: bool,
}
296
297impl Deref for ColumnCatalog {
298    type Target = ColumnDesc;
299
300    fn deref(&self) -> &Self::Target {
301        &self.column_desc
302    }
303}
304
305impl AsRef<ColumnDesc> for ColumnCatalog {
306    fn as_ref(&self) -> &ColumnDesc {
307        &self.column_desc
308    }
309}
310
311impl ColumnCatalog {
312    pub fn visible(column_desc: ColumnDesc) -> Self {
313        Self {
314            column_desc,
315            is_hidden: false,
316        }
317    }
318
319    pub fn hidden(column_desc: ColumnDesc) -> Self {
320        Self {
321            column_desc,
322            is_hidden: true,
323        }
324    }
325
326    /// Get the column catalog's is hidden.
327    pub fn is_hidden(&self) -> bool {
328        self.is_hidden
329    }
330
331    /// If the column is a generated column
332    pub fn is_generated(&self) -> bool {
333        self.column_desc.is_generated()
334    }
335
336    pub fn can_dml(&self) -> bool {
337        !self.is_generated() && !self.is_rw_timestamp_column()
338    }
339
340    /// Returns whether the column is defined by user within the column definition clause
341    /// in the `CREATE TABLE` statement.
342    pub fn is_user_defined(&self) -> bool {
343        !self.is_hidden() && !self.is_rw_sys_column() && !self.is_connector_additional_column()
344    }
345
346    /// If the column is a generated column
347    pub fn generated_expr(&self) -> Option<&ExprNode> {
348        if let Some(GeneratedOrDefaultColumn::GeneratedColumn(desc)) =
349            &self.column_desc.generated_or_default_column
350        {
351            Some(desc.expr.as_ref().unwrap())
352        } else {
353            None
354        }
355    }
356
357    /// If the columns is an `INCLUDE ... AS ...` connector column.
358    pub fn is_connector_additional_column(&self) -> bool {
359        self.column_desc.additional_column.column_type.is_some()
360    }
361
362    /// Get a reference to the column desc's data type.
363    pub fn data_type(&self) -> &DataType {
364        &self.column_desc.data_type
365    }
366
367    /// Get nullable info of the column.
368    pub fn nullable(&self) -> bool {
369        self.column_desc.nullable
370    }
371
372    /// Get the column desc's column id.
373    pub fn column_id(&self) -> ColumnId {
374        self.column_desc.column_id
375    }
376
377    /// Get a reference to the column desc's name.
378    pub fn name(&self) -> &str {
379        self.column_desc.name.as_ref()
380    }
381
382    /// Convert column catalog to proto
383    pub fn to_protobuf(&self) -> PbColumnCatalog {
384        PbColumnCatalog {
385            column_desc: Some(self.column_desc.to_protobuf()),
386            is_hidden: self.is_hidden,
387        }
388    }
389
390    /// Creates a row ID column (for implicit primary key).
391    /// It'll always have the ID `0`.
392    pub fn row_id_column() -> Self {
393        Self::hidden(ColumnDesc::named(
394            ROW_ID_COLUMN_NAME,
395            ROW_ID_COLUMN_ID,
396            DataType::Serial,
397        ))
398    }
399
400    pub fn is_rw_sys_column(&self) -> bool {
401        self.column_desc.system_column.is_some()
402    }
403
404    pub fn rw_timestamp_column() -> Self {
405        Self::hidden(ColumnDesc::named_with_system_column(
406            RW_TIMESTAMP_COLUMN_NAME,
407            RW_TIMESTAMP_COLUMN_ID,
408            DataType::Timestamptz,
409            SystemColumn::RwTimestamp,
410        ))
411    }
412
413    pub fn is_rw_timestamp_column(&self) -> bool {
414        matches!(
415            self.column_desc.system_column,
416            Some(SystemColumn::RwTimestamp)
417        )
418    }
419
420    // XXX: should we use INCLUDE columns or SYSTEM columns instead of normal hidden columns?
421
422    pub fn iceberg_hidden_cols() -> [Self; 3] {
423        [
424            Self::hidden(ColumnDesc::named(
425                ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
426                ColumnId::placeholder(),
427                DataType::Int64,
428            )),
429            Self::hidden(ColumnDesc::named(
430                ICEBERG_FILE_PATH_COLUMN_NAME,
431                ColumnId::placeholder(),
432                DataType::Varchar,
433            )),
434            Self::hidden(ColumnDesc::named(
435                ICEBERG_FILE_POS_COLUMN_NAME,
436                ColumnId::placeholder(),
437                DataType::Int64,
438            )),
439        ]
440    }
441
442    /// Note: these columns are added in `SourceStreamChunkRowWriter::do_action`.
443    /// May also look for the usage of `SourceColumnType`.
444    pub fn debezium_cdc_source_cols() -> [Self; 3] {
445        [
446            Self::visible(ColumnDesc::named(
447                "payload",
448                ColumnId::placeholder(),
449                DataType::Jsonb,
450            )),
451            // upstream offset
452            Self::hidden(ColumnDesc::named(
453                CDC_OFFSET_COLUMN_NAME,
454                ColumnId::placeholder(),
455                DataType::Varchar,
456            )),
457            // upstream table name of the cdc table
458            Self::hidden(ColumnDesc::named(
459                CDC_TABLE_NAME_COLUMN_NAME,
460                ColumnId::placeholder(),
461                DataType::Varchar,
462            )),
463        ]
464    }
465
466    pub fn is_row_id_column(&self) -> bool {
467        self.column_desc.column_id == ROW_ID_COLUMN_ID
468    }
469}
470
471impl From<PbColumnCatalog> for ColumnCatalog {
472    fn from(prost: PbColumnCatalog) -> Self {
473        Self {
474            column_desc: prost.column_desc.unwrap().into(),
475            is_hidden: prost.is_hidden,
476        }
477    }
478}
479
480impl ColumnCatalog {
481    pub fn name_with_hidden(&self) -> Cow<'_, str> {
482        if self.is_hidden {
483            Cow::Owned(format!("{}(hidden)", self.column_desc.name))
484        } else {
485            Cow::Borrowed(&self.column_desc.name)
486        }
487    }
488}
489
490impl FieldLike for ColumnDesc {
491    fn data_type(&self) -> &DataType {
492        &self.data_type
493    }
494
495    fn name(&self) -> &str {
496        &self.name
497    }
498}
499
500impl FieldLike for ColumnCatalog {
501    fn data_type(&self) -> &DataType {
502        &self.column_desc.data_type
503    }
504
505    fn name(&self) -> &str {
506        &self.column_desc.name
507    }
508}
509
510pub fn columns_extend(preserved_columns: &mut Vec<ColumnCatalog>, columns: Vec<ColumnCatalog>) {
511    debug_assert_eq!(ROW_ID_COLUMN_ID.get_id(), 0);
512    let mut max_incoming_column_id = ROW_ID_COLUMN_ID.get_id();
513    columns.iter().for_each(|column| {
514        let column_id = column.column_id().get_id();
515        if column_id > max_incoming_column_id {
516            max_incoming_column_id = column_id;
517        }
518    });
519    preserved_columns.iter_mut().for_each(|column| {
520        column
521            .column_desc
522            .column_id
523            .apply_delta_if_not_row_id(max_incoming_column_id)
524    });
525
526    preserved_columns.extend(columns);
527}
528
529pub fn debug_assert_column_ids_distinct(columns: &[ColumnCatalog]) {
530    debug_assert!(
531        columns
532            .iter()
533            .map(|c| c.column_id())
534            .duplicates()
535            .next()
536            .is_none(),
537        "duplicate ColumnId found in source catalog. Columns: {columns:#?}"
538    );
539}
540
541/// FIXME: Perhaps we should use sth like `ColumnIdGenerator::new_alter`,
542/// However, the `SourceVersion` is problematic: It doesn't contain `next_col_id`.
543/// (But for now this isn't a large problem, since drop column is not allowed for source yet..)
544///
545/// Besides, the logic of column id handling is a mess.
546/// In some places, we use `ColumnId::placeholder()`, and use `col_id_gen` to fill it at the end;
547/// In other places, we create column id ad-hoc.
548pub fn max_column_id(columns: &[ColumnCatalog]) -> ColumnId {
549    // XXX: should we check the column IDs of struct fields here?
550    columns
551        .iter()
552        .fold(ColumnId::first_user_column(), |a, b| a.max(b.column_id()))
553}
554
#[cfg(test)]
mod tests {
    use risingwave_pb::plan_common::PbColumnDesc;

    use crate::catalog::ColumnDesc;
    use crate::test_prelude::*;
    use crate::types::{DataType, StructType};

    /// Builds the nested `country` struct type (containing a `country.city` struct) shared by
    /// both fixtures below.
    fn country_struct() -> StructType {
        let city = StructType::new([
            ("country.city.address", DataType::Varchar),
            ("country.city.zipcode", DataType::Varchar),
        ]);
        StructType::new([
            ("country.address", DataType::Varchar),
            ("country.city", DataType::from(city)),
        ])
    }

    /// Protobuf form of the `country` column fixture (id 5).
    pub fn build_prost_desc() -> PbColumnDesc {
        PbColumnDesc::new(DataType::from(country_struct()).to_protobuf(), "country", 5)
    }

    /// In-memory form of the `country` column fixture (id 5).
    pub fn build_desc() -> ColumnDesc {
        ColumnDesc::named("country", 5.into(), country_struct().into())
    }

    #[test]
    fn test_into_column_catalog() {
        let converted = ColumnDesc::from(build_prost_desc());
        assert_eq!(converted, build_desc());
    }
}