risingwave_common/catalog/
column.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Cow;
16
17use itertools::Itertools;
18use risingwave_common::types::Datum;
19use risingwave_pb::expr::ExprNode;
20use risingwave_pb::expr::expr_node::{RexNode, Type as ExprType};
21use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
22use risingwave_pb::plan_common::{
23    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, PbColumnCatalog, PbColumnDesc,
24};
25
26use super::schema::FieldLike;
27use super::{
28    CDC_OFFSET_COLUMN_NAME, CDC_TABLE_NAME_COLUMN_NAME, ICEBERG_FILE_PATH_COLUMN_NAME,
29    ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME, ROW_ID_COLUMN_NAME,
30    RW_TIMESTAMP_COLUMN_ID, RW_TIMESTAMP_COLUMN_NAME, USER_COLUMN_ID_OFFSET,
31};
32use crate::catalog::{Field, ROW_ID_COLUMN_ID};
33use crate::types::DataType;
34use crate::util::value_encoding::DatumToProtoExt;
35
/// Identifier of a column inside a single table.
///
/// Unlike a table ID, a column ID is only unique within its own table; it is not globally
/// unique.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ColumnId(i32);
40
41impl std::fmt::Debug for ColumnId {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        write!(f, "#{}", self.0)
44    }
45}
46
47impl ColumnId {
48    pub const fn new(column_id: i32) -> Self {
49        Self(column_id)
50    }
51
52    /// Sometimes the id field is filled later, we use this value for better debugging.
53    pub const fn placeholder() -> Self {
54        Self(i32::MAX - 1)
55    }
56
57    pub const fn first_user_column() -> Self {
58        Self(USER_COLUMN_ID_OFFSET)
59    }
60}
61
62impl ColumnId {
63    pub const fn get_id(&self) -> i32 {
64        self.0
65    }
66
67    /// Returns the subsequent column id.
68    #[must_use]
69    pub const fn next(self) -> Self {
70        Self(self.0 + 1)
71    }
72
73    pub fn apply_delta_if_not_row_id(&mut self, delta: i32) {
74        if self.0 != ROW_ID_COLUMN_ID.get_id() {
75            self.0 += delta;
76        }
77    }
78}
79
80impl From<i32> for ColumnId {
81    fn from(column_id: i32) -> Self {
82        Self::new(column_id)
83    }
84}
85impl From<&i32> for ColumnId {
86    fn from(column_id: &i32) -> Self {
87        Self::new(*column_id)
88    }
89}
90
91impl From<ColumnId> for i32 {
92    fn from(id: ColumnId) -> i32 {
93        id.0
94    }
95}
96
97impl From<&ColumnId> for i32 {
98    fn from(id: &ColumnId) -> i32 {
99        id.0
100    }
101}
102
103impl std::fmt::Display for ColumnId {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        write!(f, "{}", self.0)
106    }
107}
108
/// Kind of system column a [`ColumnDesc`] may represent.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum SystemColumn {
    /// The `_rw_timestamp` system column.
    RwTimestamp,
}
113
114#[derive(Clone, Debug, PartialEq, Eq, Hash)]
115pub struct ColumnDesc {
116    pub data_type: DataType,
117    pub column_id: ColumnId,
118    pub name: String,
119    pub generated_or_default_column: Option<GeneratedOrDefaultColumn>,
120    pub description: Option<String>,
121    pub additional_column: AdditionalColumn,
122    pub version: ColumnDescVersion,
123    /// Currently the system column is used for `_rw_timestamp` only and is generated at runtime,
124    /// so this field is not persisted.
125    pub system_column: Option<SystemColumn>,
126    /// Whether the column is nullable.
127    /// If a column is not nullable, BatchInsert/BatchUpdate operations will throw an error when NULL is inserted/updated into.
128    /// The row contains NULL for this column will be ignored when streaming data into the table.
129    pub nullable: bool,
130}
131
132impl ColumnDesc {
133    pub fn unnamed(column_id: ColumnId, data_type: DataType) -> ColumnDesc {
134        Self::named("", column_id, data_type)
135    }
136
137    pub fn named(name: impl Into<String>, column_id: ColumnId, data_type: DataType) -> ColumnDesc {
138        ColumnDesc {
139            data_type,
140            column_id,
141            name: name.into(),
142            generated_or_default_column: None,
143            description: None,
144            additional_column: AdditionalColumn { column_type: None },
145            version: ColumnDescVersion::LATEST,
146            system_column: None,
147            nullable: true,
148        }
149    }
150
151    pub fn named_with_default_value(
152        name: impl Into<String>,
153        column_id: ColumnId,
154        data_type: DataType,
155        snapshot_value: Datum,
156    ) -> ColumnDesc {
157        let default_col = DefaultColumnDesc {
158            expr: Some(ExprNode {
159                // equivalent to `Literal::to_expr_proto`
160                function_type: ExprType::Unspecified as i32,
161                return_type: Some(data_type.to_protobuf()),
162                rex_node: Some(RexNode::Constant(snapshot_value.to_protobuf())),
163            }),
164            snapshot_value: Some(snapshot_value.to_protobuf()),
165        };
166        ColumnDesc {
167            generated_or_default_column: Some(GeneratedOrDefaultColumn::DefaultColumn(default_col)),
168            ..Self::named(name, column_id, data_type)
169        }
170    }
171
172    pub fn named_with_additional_column(
173        name: impl Into<String>,
174        column_id: ColumnId,
175        data_type: DataType,
176        additional_column_type: AdditionalColumn,
177    ) -> ColumnDesc {
178        ColumnDesc {
179            additional_column: additional_column_type,
180            ..Self::named(name, column_id, data_type)
181        }
182    }
183
184    pub fn named_with_system_column(
185        name: impl Into<String>,
186        column_id: ColumnId,
187        data_type: DataType,
188        system_column: SystemColumn,
189    ) -> ColumnDesc {
190        ColumnDesc {
191            system_column: Some(system_column),
192            ..Self::named(name, column_id, data_type)
193        }
194    }
195
196    /// Convert to proto
197    pub fn to_protobuf(&self) -> PbColumnDesc {
198        PbColumnDesc {
199            column_type: Some(self.data_type.to_protobuf()),
200            column_id: self.column_id.get_id(),
201            name: self.name.clone(),
202            generated_or_default_column: self.generated_or_default_column.clone(),
203            description: self.description.clone(),
204            additional_column_type: 0, // deprecated
205            additional_column: Some(self.additional_column.clone()),
206            version: self.version as i32,
207            nullable: self.nullable,
208        }
209    }
210
211    pub fn from_field_with_column_id(field: &Field, id: i32) -> Self {
212        Self::named(&field.name, ColumnId::new(id), field.data_type.clone())
213    }
214
215    pub fn from_field_without_column_id(field: &Field) -> Self {
216        Self::from_field_with_column_id(field, ColumnId::placeholder().into())
217    }
218
219    pub fn is_generated(&self) -> bool {
220        matches!(
221            self.generated_or_default_column,
222            Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
223        )
224    }
225}
226
227impl From<PbColumnDesc> for ColumnDesc {
228    fn from(prost: PbColumnDesc) -> Self {
229        let additional_column = prost
230            .get_additional_column()
231            .unwrap_or(&AdditionalColumn { column_type: None })
232            .clone();
233        let version = prost.version();
234
235        Self {
236            data_type: DataType::from(prost.column_type.as_ref().unwrap()),
237            column_id: ColumnId::new(prost.column_id),
238            name: prost.name,
239            generated_or_default_column: prost.generated_or_default_column,
240            description: prost.description.clone(),
241            additional_column,
242            version,
243            system_column: None,
244            nullable: prost.nullable,
245        }
246    }
247}
248
249impl From<&PbColumnDesc> for ColumnDesc {
250    fn from(prost: &PbColumnDesc) -> Self {
251        prost.clone().into()
252    }
253}
254
255impl From<&ColumnDesc> for PbColumnDesc {
256    fn from(c: &ColumnDesc) -> Self {
257        c.to_protobuf()
258    }
259}
260
261#[derive(Debug, Clone, PartialEq, Eq, Hash)]
262pub struct ColumnCatalog {
263    pub column_desc: ColumnDesc,
264    pub is_hidden: bool,
265}
266
267impl ColumnCatalog {
268    pub fn visible(column_desc: ColumnDesc) -> Self {
269        Self {
270            column_desc,
271            is_hidden: false,
272        }
273    }
274
275    pub fn hidden(column_desc: ColumnDesc) -> Self {
276        Self {
277            column_desc,
278            is_hidden: true,
279        }
280    }
281
282    /// Get the column catalog's is hidden.
283    pub fn is_hidden(&self) -> bool {
284        self.is_hidden
285    }
286
287    /// If the column is a generated column
288    pub fn is_generated(&self) -> bool {
289        self.column_desc.is_generated()
290    }
291
292    pub fn can_dml(&self) -> bool {
293        !self.is_generated() && !self.is_rw_timestamp_column()
294    }
295
296    /// Returns whether the column is defined by user within the column definition clause
297    /// in the `CREATE TABLE` statement.
298    pub fn is_user_defined(&self) -> bool {
299        !self.is_hidden() && !self.is_rw_sys_column() && !self.is_connector_additional_column()
300    }
301
302    /// If the column is a generated column
303    pub fn generated_expr(&self) -> Option<&ExprNode> {
304        if let Some(GeneratedOrDefaultColumn::GeneratedColumn(desc)) =
305            &self.column_desc.generated_or_default_column
306        {
307            Some(desc.expr.as_ref().unwrap())
308        } else {
309            None
310        }
311    }
312
313    /// If the columns is an `INCLUDE ... AS ...` connector column.
314    pub fn is_connector_additional_column(&self) -> bool {
315        self.column_desc.additional_column.column_type.is_some()
316    }
317
318    /// Get a reference to the column desc's data type.
319    pub fn data_type(&self) -> &DataType {
320        &self.column_desc.data_type
321    }
322
323    /// Get nullable info of the column.
324    pub fn nullable(&self) -> bool {
325        self.column_desc.nullable
326    }
327
328    /// Get the column desc's column id.
329    pub fn column_id(&self) -> ColumnId {
330        self.column_desc.column_id
331    }
332
333    /// Get a reference to the column desc's name.
334    pub fn name(&self) -> &str {
335        self.column_desc.name.as_ref()
336    }
337
338    /// Convert column catalog to proto
339    pub fn to_protobuf(&self) -> PbColumnCatalog {
340        PbColumnCatalog {
341            column_desc: Some(self.column_desc.to_protobuf()),
342            is_hidden: self.is_hidden,
343        }
344    }
345
346    /// Creates a row ID column (for implicit primary key).
347    /// It'll always have the ID `0`.
348    pub fn row_id_column() -> Self {
349        Self::hidden(ColumnDesc::named(
350            ROW_ID_COLUMN_NAME,
351            ROW_ID_COLUMN_ID,
352            DataType::Serial,
353        ))
354    }
355
356    pub fn is_rw_sys_column(&self) -> bool {
357        self.column_desc.system_column.is_some()
358    }
359
360    pub fn rw_timestamp_column() -> Self {
361        Self::hidden(ColumnDesc::named_with_system_column(
362            RW_TIMESTAMP_COLUMN_NAME,
363            RW_TIMESTAMP_COLUMN_ID,
364            DataType::Timestamptz,
365            SystemColumn::RwTimestamp,
366        ))
367    }
368
369    pub fn is_rw_timestamp_column(&self) -> bool {
370        matches!(
371            self.column_desc.system_column,
372            Some(SystemColumn::RwTimestamp)
373        )
374    }
375
376    // XXX: should we use INCLUDE columns or SYSTEM columns instead of normal hidden columns?
377
378    pub fn iceberg_hidden_cols() -> [Self; 3] {
379        [
380            Self::hidden(ColumnDesc::named(
381                ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
382                ColumnId::placeholder(),
383                DataType::Int64,
384            )),
385            Self::hidden(ColumnDesc::named(
386                ICEBERG_FILE_PATH_COLUMN_NAME,
387                ColumnId::placeholder(),
388                DataType::Varchar,
389            )),
390            Self::hidden(ColumnDesc::named(
391                ICEBERG_FILE_POS_COLUMN_NAME,
392                ColumnId::placeholder(),
393                DataType::Int64,
394            )),
395        ]
396    }
397
398    /// Note: these columns are added in `SourceStreamChunkRowWriter::do_action`.
399    /// May also look for the usage of `SourceColumnType`.
400    pub fn debezium_cdc_source_cols() -> [Self; 3] {
401        [
402            Self::visible(ColumnDesc::named(
403                "payload",
404                ColumnId::placeholder(),
405                DataType::Jsonb,
406            )),
407            // upstream offset
408            Self::hidden(ColumnDesc::named(
409                CDC_OFFSET_COLUMN_NAME,
410                ColumnId::placeholder(),
411                DataType::Varchar,
412            )),
413            // upstream table name of the cdc table
414            Self::hidden(ColumnDesc::named(
415                CDC_TABLE_NAME_COLUMN_NAME,
416                ColumnId::placeholder(),
417                DataType::Varchar,
418            )),
419        ]
420    }
421
422    pub fn is_row_id_column(&self) -> bool {
423        self.column_desc.column_id == ROW_ID_COLUMN_ID
424    }
425}
426
427impl From<PbColumnCatalog> for ColumnCatalog {
428    fn from(prost: PbColumnCatalog) -> Self {
429        Self {
430            column_desc: prost.column_desc.unwrap().into(),
431            is_hidden: prost.is_hidden,
432        }
433    }
434}
435
436impl ColumnCatalog {
437    pub fn name_with_hidden(&self) -> Cow<'_, str> {
438        if self.is_hidden {
439            Cow::Owned(format!("{}(hidden)", self.column_desc.name))
440        } else {
441            Cow::Borrowed(&self.column_desc.name)
442        }
443    }
444}
445
446impl FieldLike for ColumnDesc {
447    fn data_type(&self) -> &DataType {
448        &self.data_type
449    }
450
451    fn name(&self) -> &str {
452        &self.name
453    }
454}
455
456impl FieldLike for ColumnCatalog {
457    fn data_type(&self) -> &DataType {
458        &self.column_desc.data_type
459    }
460
461    fn name(&self) -> &str {
462        &self.column_desc.name
463    }
464}
465
466pub fn columns_extend(preserved_columns: &mut Vec<ColumnCatalog>, columns: Vec<ColumnCatalog>) {
467    debug_assert_eq!(ROW_ID_COLUMN_ID.get_id(), 0);
468    let mut max_incoming_column_id = ROW_ID_COLUMN_ID.get_id();
469    columns.iter().for_each(|column| {
470        let column_id = column.column_id().get_id();
471        if column_id > max_incoming_column_id {
472            max_incoming_column_id = column_id;
473        }
474    });
475    preserved_columns.iter_mut().for_each(|column| {
476        column
477            .column_desc
478            .column_id
479            .apply_delta_if_not_row_id(max_incoming_column_id)
480    });
481
482    preserved_columns.extend(columns);
483}
484
485pub fn debug_assert_column_ids_distinct(columns: &[ColumnCatalog]) {
486    debug_assert!(
487        columns
488            .iter()
489            .map(|c| c.column_id())
490            .duplicates()
491            .next()
492            .is_none(),
493        "duplicate ColumnId found in source catalog. Columns: {columns:#?}"
494    );
495}
496
497/// FIXME: Perhaps we should use sth like `ColumnIdGenerator::new_alter`,
498/// However, the `SourceVersion` is problematic: It doesn't contain `next_col_id`.
499/// (But for now this isn't a large problem, since drop column is not allowed for source yet..)
500///
501/// Besides, the logic of column id handling is a mess.
502/// In some places, we use `ColumnId::placeholder()`, and use `col_id_gen` to fill it at the end;
503/// In other places, we create column id ad-hoc.
504pub fn max_column_id(columns: &[ColumnCatalog]) -> ColumnId {
505    // XXX: should we check the column IDs of struct fields here?
506    columns
507        .iter()
508        .fold(ColumnId::first_user_column(), |a, b| a.max(b.column_id()))
509}
510
#[cfg(test)]
pub mod tests {
    use risingwave_pb::plan_common::PbColumnDesc;

    use crate::catalog::ColumnDesc;
    use crate::test_prelude::*;
    use crate::types::{DataType, StructType};

    /// Nested struct type shared by both fixtures: `country { address, city { address, zipcode } }`.
    fn country_struct() -> StructType {
        let city = StructType::new([
            ("country.city.address", DataType::Varchar),
            ("country.city.zipcode", DataType::Varchar),
        ]);
        StructType::new([
            ("country.address", DataType::Varchar),
            ("country.city", DataType::from(city)),
        ])
    }

    /// Protobuf description of the `country` struct column with id 5.
    pub fn build_prost_desc() -> PbColumnDesc {
        let country = DataType::from(country_struct());
        PbColumnDesc::new(country.to_protobuf(), "country", 5)
    }

    /// The `ColumnDesc` expected from converting [`build_prost_desc`].
    pub fn build_desc() -> ColumnDesc {
        let country = country_struct();
        ColumnDesc::named("country", 5.into(), country.into())
    }

    /// Converting the protobuf fixture must yield the hand-built `ColumnDesc`.
    #[test]
    fn test_into_column_catalog() {
        let converted = ColumnDesc::from(build_prost_desc());
        assert_eq!(converted, build_desc());
    }
}