risingwave_frontend/catalog/
table_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17
18use fixedbitset::FixedBitSet;
19use itertools::Itertools;
20use risingwave_common::catalog::{
21    ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, Schema,
22    StreamJobStatus, TableDesc, TableId, TableVersionId,
23};
24use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
25use risingwave_common::util::epoch::Epoch;
26use risingwave_common::util::sort_util::ColumnOrder;
27use risingwave_connector::source::cdc::external::ExternalCdcTableType;
28use risingwave_pb::catalog::table::{
29    CdcTableType as PbCdcTableType, OptionalAssociatedSourceId, PbEngine, PbTableType,
30    PbTableVersion,
31};
32use risingwave_pb::catalog::{
33    PbCreateType, PbStreamJobStatus, PbTable, PbVectorIndexInfo, PbWebhookSourceInfo,
34};
35use risingwave_pb::common::PbColumnOrder;
36use risingwave_pb::plan_common::DefaultColumnDesc;
37use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
38use risingwave_sqlparser::ast;
39use risingwave_sqlparser::parser::Parser;
40use thiserror_ext::AsReport as _;
41
42use super::purify::try_purify_table_source_create_sql_ast;
43use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId};
44use crate::error::{ErrorCode, Result, RwError};
45use crate::expr::ExprImpl;
46use crate::optimizer::property::Cardinality;
47use crate::session::current::notice_to_user;
48use crate::user::UserId;
49
50/// `TableCatalog` Includes full information about a table.
51///
52/// Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed.
53/// It is not the same as a user-side table created by `CREATE TABLE`.
54///
55/// Use [`Self::table_type()`] to determine the [`TableType`] of the table.
56///
57/// # Column ID & Column Index
58///
59/// [`ColumnId`](risingwave_common::catalog::ColumnId) (with type `i32`) is the unique identifier of
60/// a column in a table. It is used to access storage.
61///
62/// Column index, or idx, (with type `usize`) is the relative position inside the `Vec` of columns.
63///
64/// A tip to avoid making mistakes is never do casting - i32 as usize or vice versa.
65///
66/// # Keys
67///
68/// All the keys are represented as column indices.
69///
70/// - **Primary Key** (pk): unique identifier of a row.
71///
72/// - **Order Key**: the primary key for storage, used to sort and access data.
73///
74///   For an MV, the columns in `ORDER BY` clause will be put at the beginning of the order key. And
75///   the remaining columns in pk will follow behind.
76///
77///   If there's no `ORDER BY` clause, the order key will be the same as pk.
78///
79/// - **Distribution Key**: the columns used to partition the data. It must be a subset of the order
80///   key.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub struct TableCatalog {
    /// Id of this table.
    pub id: TableId,

    /// Id of the schema recorded for this table.
    pub schema_id: SchemaId,

    /// Id of the database recorded for this table.
    pub database_id: DatabaseId,

    /// Id of the source associated with this table, if any.
    pub associated_source_id: Option<TableId>, // TODO: use SourceId

    /// Name of the table.
    pub name: String,

    /// All columns in this table.
    pub columns: Vec<ColumnCatalog>,

    /// Key used as materialize's storage key prefix, including MV order columns and `stream_key`.
    pub pk: Vec<ColumnOrder>,

    /// `pk_indices` of the corresponding materialize operator's output.
    pub stream_key: Vec<usize>,

    /// Type of the table. Used to distinguish user-created tables, materialized views, index
    /// tables, and internal tables.
    pub table_type: TableType,

    /// Distribution key column indices.
    pub distribution_key: Vec<usize>,

    /// The append-only attribute is derived from `StreamMaterialize` and `StreamTableScan` relies
    /// on this to derive an append-only stream plan.
    pub append_only: bool,

    /// The cardinality of the table.
    pub cardinality: Cardinality,

    /// Owner of the table.
    pub owner: UserId,

    /// TTL of the record in the table. To ensure the consistency with other tables in the
    /// streaming plan, it only applies to append-only tables.
    pub retention_seconds: Option<u32>,

    /// The fragment id of the `Materialize` operator for this table.
    pub fragment_id: FragmentId,

    /// The fragment id of the `DML` operator for this table.
    pub dml_fragment_id: Option<FragmentId>,

    /// An optional column index which is the vnode of each row computed by the table's consistent
    /// hash distribution.
    pub vnode_col_index: Option<usize>,

    /// An optional column index of row id. If the primary key is specified by users, this will be
    /// `None`.
    pub row_id_index: Option<usize>,

    /// The column indices which are stored in the state store's value with row-encoding.
    pub value_indices: Vec<usize>,

    /// The full `CREATE TABLE` or `CREATE MATERIALIZED VIEW` definition of the table.
    pub definition: String,

    /// The behavior of handling incoming pk conflict from source executor, we can overwrite or
    /// ignore conflict pk. For normal materialize executor and other executors, this field will be
    /// `No Check`.
    pub conflict_behavior: ConflictBehavior,

    /// Column indices of the version columns.
    // NOTE(review): presumably consulted together with `conflict_behavior` — confirm.
    pub version_column_indices: Vec<usize>,

    // NOTE(review): semantics of this hint are not visible in this file; it is only copied
    // to/from `PbTable` and into `TableDesc` here.
    pub read_prefix_len_hint: usize,

    /// Per-table catalog version, used by schema change. `None` for internal tables and tests.
    pub version: Option<TableVersion>,

    /// The column indices which could receive watermarks.
    pub watermark_columns: FixedBitSet,

    /// Optional field specifies the distribution key indices in pk.
    /// See <https://github.com/risingwavelabs/risingwave/issues/8377> for more information.
    pub dist_key_in_pk: Vec<usize>,

    /// Epoch at which the table was created, if recorded.
    pub created_at_epoch: Option<Epoch>,

    /// Epoch at which the table was initialized, if recorded.
    pub initialized_at_epoch: Option<Epoch>,

    /// Indicate whether to use watermark cache for state table.
    pub cleaned_by_watermark: bool,

    /// Indicate whether to create table in background or foreground.
    pub create_type: CreateType,

    /// Indicate the stream job status, whether it is created or creating.
    /// If it is creating, we should hide it.
    pub stream_job_status: StreamJobStatus,

    /// description of table, set by `comment on`.
    pub description: Option<String>,

    /// Cluster version recorded when the table was created, if any.
    pub created_at_cluster_version: Option<String>,

    /// Cluster version recorded when the table was initialized, if any.
    pub initialized_at_cluster_version: Option<String>,

    /// Identifier of the CDC table, if any.
    // NOTE(review): exact format of this string is not visible in this file.
    pub cdc_table_id: Option<String>,

    /// Total vnode count of the table.
    ///
    /// Can be [`VnodeCount::Placeholder`] if the catalog is generated by the frontend and the
    /// corresponding job is still in `Creating` status, in which case calling [`Self::vnode_count`]
    /// will panic.
    ///
    /// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog
    /// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build
    pub vnode_count: VnodeCount,

    /// Webhook source information, if any.
    pub webhook_info: Option<PbWebhookSourceInfo>,

    /// Id of the job this table is recorded under, if any.
    pub job_id: Option<TableId>,

    /// Engine of the table. For [`Engine::Iceberg`], [`Self::iceberg_source_name`] and
    /// [`Self::iceberg_sink_name`] return the derived source/sink names.
    pub engine: Engine,

    // NOTE(review): presumably the position inside `pk` of the column used for watermark-based
    // cleaning — confirm against the producer of this field.
    pub clean_watermark_index_in_pk: Option<usize>,

    /// Whether the table supports manual refresh operations
    pub refreshable: bool,

    /// Vector index information, if any.
    pub vector_index_info: Option<PbVectorIndexInfo>,

    /// Type of the external CDC table, if any.
    pub cdc_table_type: Option<ExternalCdcTableType>,
}
210
/// Prefix prepended to a table's name by [`TableCatalog::iceberg_source_name`].
pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
/// Prefix prepended to a table's name by [`TableCatalog::iceberg_sink_name`].
pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_";
213
/// Kind of a [`TableCatalog`]; see each variant for what creates it.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum TableType {
    /// Tables created by `CREATE TABLE`.
    Table,
    /// Tables created by `CREATE MATERIALIZED VIEW`.
    MaterializedView,
    /// Tables serving as index for `TableType::Table` or `TableType::MaterializedView`.
    /// An index has both a `TableCatalog` and an `IndexCatalog`.
    Index,
    /// Tables serving as vector index. Like [`Self::Index`], they are dropped with `DROP INDEX`
    /// (see [`TableCatalog::bad_drop_error`]).
    VectorIndex,
    /// Internal tables for executors.
    Internal,
}
227
#[cfg(test)]
impl Default for TableType {
    /// Test-only default: a user-created table.
    fn default() -> Self {
        TableType::Table
    }
}
234
235impl TableType {
236    fn from_prost(prost: PbTableType) -> Self {
237        match prost {
238            PbTableType::Table => Self::Table,
239            PbTableType::MaterializedView => Self::MaterializedView,
240            PbTableType::Index => Self::Index,
241            PbTableType::Internal => Self::Internal,
242            PbTableType::VectorIndex => Self::VectorIndex,
243            PbTableType::Unspecified => unreachable!(),
244        }
245    }
246
247    pub(crate) fn to_prost(self) -> PbTableType {
248        match self {
249            Self::Table => PbTableType::Table,
250            Self::MaterializedView => PbTableType::MaterializedView,
251            Self::Index => PbTableType::Index,
252            Self::VectorIndex => PbTableType::VectorIndex,
253            Self::Internal => PbTableType::Internal,
254        }
255    }
256}
257
/// The version of a table, used by schema change. See [`PbTableVersion`] for more details.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TableVersion {
    /// Id of this version; maps to `PbTableVersion::version`.
    pub version_id: TableVersionId,
    /// Column id following the current maximum one (the initial test version sets it to
    /// `max_column_id.next()`).
    // NOTE(review): presumably the id handed to the next added column — confirm.
    pub next_column_id: ColumnId,
}
264
265impl TableVersion {
266    /// Create an initial version for a table, with the given max column id.
267    #[cfg(test)]
268    pub fn new_initial_for_test(max_column_id: ColumnId) -> Self {
269        use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;
270
271        Self {
272            version_id: INITIAL_TABLE_VERSION_ID,
273            next_column_id: max_column_id.next(),
274        }
275    }
276
277    pub fn from_prost(prost: PbTableVersion) -> Self {
278        Self {
279            version_id: prost.version,
280            next_column_id: ColumnId::from(prost.next_column_id),
281        }
282    }
283
284    pub fn to_prost(&self) -> PbTableVersion {
285        PbTableVersion {
286            version: self.version_id,
287            next_column_id: self.next_column_id.into(),
288        }
289    }
290}
291
292impl TableCatalog {
293    /// Returns the SQL definition when the table was created, purified with best effort
294    /// if it's a table.
295    ///
296    /// See [`Self::create_sql_ast_purified`] for more details.
297    pub fn create_sql_purified(&self) -> String {
298        self.create_sql_ast_purified()
299            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
300            .unwrap_or_else(|_| self.create_sql())
301    }
302
303    /// Returns the parsed SQL definition when the table was created, purified with best effort
304    /// if it's a table.
305    ///
306    /// Returns error if it's invalid.
307    pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
308        // Purification is only applicable to tables.
309        if let TableType::Table = self.table_type() {
310            let base = if self.definition.is_empty() {
311                // Created by `CREATE TABLE AS`, create a skeleton `CREATE TABLE` statement.
312                let name = ast::ObjectName(vec![self.name.as_str().into()]);
313                ast::Statement::default_create_table(name)
314            } else {
315                self.create_sql_ast_from_persisted()?
316            };
317
318            match try_purify_table_source_create_sql_ast(
319                base,
320                self.columns(),
321                self.row_id_index,
322                &self.pk_column_ids(),
323            ) {
324                Ok(stmt) => return Ok(stmt),
325                Err(e) => notice_to_user(format!(
326                    "error occurred while purifying definition for table \"{}\", \
327                     results may be inaccurate: {}",
328                    self.name,
329                    e.as_report()
330                )),
331            }
332        }
333
334        self.create_sql_ast_from_persisted()
335    }
336}
337
338impl TableCatalog {
    /// Returns the table id of this catalog (by value).
    pub fn id(&self) -> TableId {
        self.id
    }
343
344    pub fn with_id(mut self, id: TableId) -> Self {
345        self.id = id;
346        self
347    }
348
349    pub fn with_cleaned_by_watermark(mut self, cleaned_by_watermark: bool) -> Self {
350        self.cleaned_by_watermark = cleaned_by_watermark;
351        self
352    }
353
    /// Returns the pk-conflict handling behavior stored in the `conflict_behavior` field.
    pub fn conflict_behavior(&self) -> ConflictBehavior {
        self.conflict_behavior
    }
357
    /// Returns the [`TableType`] of this table.
    pub fn table_type(&self) -> TableType {
        self.table_type
    }
361
    /// Returns the [`Engine`] of this table.
    pub fn engine(&self) -> Engine {
        self.engine
    }
365
366    pub fn iceberg_source_name(&self) -> Option<String> {
367        match self.engine {
368            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name)),
369            Engine::Hummock => None,
370        }
371    }
372
373    pub fn iceberg_sink_name(&self) -> Option<String> {
374        match self.engine {
375            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SINK_PREFIX, self.name)),
376            Engine::Hummock => None,
377        }
378    }
379
380    pub fn is_user_table(&self) -> bool {
381        self.table_type == TableType::Table
382    }
383
384    pub fn is_internal_table(&self) -> bool {
385        self.table_type == TableType::Internal
386    }
387
388    pub fn is_mview(&self) -> bool {
389        self.table_type == TableType::MaterializedView
390    }
391
392    pub fn is_index(&self) -> bool {
393        self.table_type == TableType::Index
394    }
395
396    /// Returns an error if `DROP` statements are used on the wrong type of table.
397    #[must_use]
398    pub fn bad_drop_error(&self) -> RwError {
399        let msg = match self.table_type {
400            TableType::MaterializedView => {
401                "Use `DROP MATERIALIZED VIEW` to drop a materialized view."
402            }
403            TableType::Index | TableType::VectorIndex => "Use `DROP INDEX` to drop an index.",
404            TableType::Table => "Use `DROP TABLE` to drop a table.",
405            TableType::Internal => "Internal tables cannot be dropped.",
406        };
407
408        ErrorCode::InvalidInputSyntax(msg.to_owned()).into()
409    }
410
    /// Returns the id of the source associated with this table, or `None` if there is none.
    #[must_use]
    pub fn associated_source_id(&self) -> Option<TableId> {
        self.associated_source_id
    }
416
    /// Whether an associated source id is set for this table.
    pub fn has_associated_source(&self) -> bool {
        self.associated_source_id.is_some()
    }
420
    /// Returns all columns of this table as a slice, in column-index order.
    ///
    /// Use [`Self::columns_without_rw_timestamp`] to leave out the `_rw_timestamp` column.
    pub fn columns(&self) -> &[ColumnCatalog] {
        &self.columns
    }
425
426    pub fn columns_without_rw_timestamp(&self) -> Vec<ColumnCatalog> {
427        self.columns
428            .iter()
429            .filter(|c| !c.is_rw_timestamp_column())
430            .cloned()
431            .collect()
432    }
433
    /// Returns the `pk` column orders of this table as a slice.
    pub fn pk(&self) -> &[ColumnOrder] {
        self.pk.as_ref()
    }
438
439    /// Get the column IDs of the primary key.
440    pub fn pk_column_ids(&self) -> Vec<ColumnId> {
441        self.pk
442            .iter()
443            .map(|x| self.columns[x.column_index].column_id())
444            .collect()
445    }
446
447    /// Get the column names of the primary key.
448    pub fn pk_column_names(&self) -> Vec<&str> {
449        self.pk
450            .iter()
451            .map(|x| self.columns[x.column_index].name())
452            .collect()
453    }
454
455    /// Get a [`TableDesc`] of the table.
456    ///
457    /// Note: this must be called on existing tables, otherwise it will fail to get the vnode count
458    /// (which is determined by the meta service) and panic.
459    pub fn table_desc(&self) -> TableDesc {
460        use risingwave_common::catalog::TableOption;
461
462        let table_options = TableOption::new(self.retention_seconds);
463
464        TableDesc {
465            table_id: self.id,
466            pk: self.pk.clone(),
467            stream_key: self.stream_key.clone(),
468            columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
469            distribution_key: self.distribution_key.clone(),
470            append_only: self.append_only,
471            retention_seconds: table_options.retention_seconds,
472            value_indices: self.value_indices.clone(),
473            read_prefix_len_hint: self.read_prefix_len_hint,
474            watermark_columns: self.watermark_columns.clone(),
475            versioned: self.version.is_some(),
476            vnode_col_index: self.vnode_col_index,
477            vnode_count: self.vnode_count(),
478        }
479    }
480
    /// Returns the name of this table.
    pub fn name(&self) -> &str {
        self.name.as_ref()
    }
485
    /// Returns the distribution key as column indices.
    pub fn distribution_key(&self) -> &[usize] {
        self.distribution_key.as_ref()
    }
489
    /// Serializes this catalog into a [`PbTable`]; currently identical to [`Self::to_prost`].
    pub fn to_internal_table_prost(&self) -> PbTable {
        self.to_prost()
    }
493
494    /// Returns the SQL definition when the table was created.
495    ///
496    /// See [`Self::create_sql_ast`] for more details.
497    pub fn create_sql(&self) -> String {
498        self.create_sql_ast()
499            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
500            .unwrap_or_else(|_| self.definition.clone())
501    }
502
503    /// Returns the parsed SQL definition when the table was created.
504    ///
505    /// Re-create the table with this statement may have different schema if the schema is derived
506    /// from external systems (like schema registry) or it's created by `CREATE TABLE AS`. If this
507    /// is not desired, use [`Self::create_sql_ast_purified`] instead.
508    ///
509    /// Returns error if it's invalid.
510    pub fn create_sql_ast(&self) -> Result<ast::Statement> {
511        if let TableType::Table = self.table_type()
512            && self.definition.is_empty()
513        {
514            // Always fix definition for `CREATE TABLE AS`.
515            self.create_sql_ast_purified()
516        } else {
517            // Directly parse the persisted definition.
518            self.create_sql_ast_from_persisted()
519        }
520    }
521
522    fn create_sql_ast_from_persisted(&self) -> Result<ast::Statement> {
523        Ok(Parser::parse_exactly_one(&self.definition)?)
524    }
525
    /// Returns a reference to the table's [`TableVersion`], or `None` if the table has no
    /// version field.
    pub fn version(&self) -> Option<&TableVersion> {
        self.version.as_ref()
    }
530
531    /// Get the table's version id. Returns `None` if the table has no version field.
532    pub fn version_id(&self) -> Option<TableVersionId> {
533        self.version().map(|v| v.version_id)
534    }
535
    /// Get the total vnode count of the table.
    ///
    /// # Panics
    ///
    /// Panics if it's called on an incomplete (and not yet persisted) table catalog.
    pub fn vnode_count(&self) -> usize {
        self.vnode_count.value()
    }
542
    /// Serializes this catalog into its protobuf representation [`PbTable`].
    ///
    /// The `_rw_timestamp` column is skipped, and the deprecated `incoming_sinks` field is
    /// always emitted empty. Column indices (`usize`) are cast to the integer types used by the
    /// protobuf fields.
    pub fn to_prost(&self) -> PbTable {
        PbTable {
            id: self.id.table_id,
            schema_id: self.schema_id,
            database_id: self.database_id,
            name: self.name.clone(),
            // ignore `_rw_timestamp` when serializing
            columns: self
                .columns_without_rw_timestamp()
                .iter()
                .map(|c| c.to_protobuf())
                .collect(),
            pk: self.pk.iter().map(|o| o.to_protobuf()).collect(),
            stream_key: self.stream_key.iter().map(|x| *x as _).collect(),
            optional_associated_source_id: self
                .associated_source_id
                .map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())),
            table_type: self.table_type.to_prost() as i32,
            distribution_key: self
                .distribution_key
                .iter()
                .map(|k| *k as i32)
                .collect_vec(),
            append_only: self.append_only,
            owner: self.owner,
            fragment_id: self.fragment_id,
            dml_fragment_id: self.dml_fragment_id,
            vnode_col_index: self.vnode_col_index.map(|i| i as _),
            row_id_index: self.row_id_index.map(|i| i as _),
            value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
            definition: self.definition.clone(),
            read_prefix_len_hint: self.read_prefix_len_hint as u32,
            version: self.version.as_ref().map(TableVersion::to_prost),
            // Only the set bits of the bitset are persisted, as a list of indices.
            watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
            dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
            handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
            version_column_indices: self
                .version_column_indices
                .iter()
                .map(|&idx| idx as u32)
                .collect(),
            cardinality: Some(self.cardinality.to_protobuf()),
            initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
            created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
            cleaned_by_watermark: self.cleaned_by_watermark,
            stream_job_status: self.stream_job_status.to_proto().into(),
            create_type: self.create_type.to_proto().into(),
            description: self.description.clone(),
            // Deprecated field: never populated by the frontend.
            #[expect(deprecated)]
            incoming_sinks: vec![],
            created_at_cluster_version: self.created_at_cluster_version.clone(),
            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
            retention_seconds: self.retention_seconds,
            cdc_table_id: self.cdc_table_id.clone(),
            maybe_vnode_count: self.vnode_count.to_protobuf(),
            webhook_info: self.webhook_info.clone(),
            job_id: self.job_id.map(|id| id.table_id),
            engine: Some(self.engine.to_protobuf().into()),
            clean_watermark_index_in_pk: self.clean_watermark_index_in_pk.map(|x| x as i32),
            refreshable: self.refreshable,
            vector_index_info: self.vector_index_info,
            cdc_table_type: self
                .cdc_table_type
                .clone()
                .map(|t| PbCdcTableType::from(t) as i32),
        }
    }
610
611    /// Get columns excluding hidden columns and generated golumns.
612    pub fn columns_to_insert(&self) -> impl Iterator<Item = &ColumnCatalog> {
613        self.columns
614            .iter()
615            .filter(|c| !c.is_hidden() && !c.is_generated())
616    }
617
618    pub fn generated_column_names(&self) -> impl Iterator<Item = &str> {
619        self.columns
620            .iter()
621            .filter(|c| c.is_generated())
622            .map(|c| c.name())
623    }
624
625    pub fn generated_col_idxes(&self) -> impl Iterator<Item = usize> + '_ {
626        self.columns
627            .iter()
628            .enumerate()
629            .filter(|(_, c)| c.is_generated())
630            .map(|(i, _)| i)
631    }
632
633    pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
634        if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) = self
635            .columns[col_idx]
636            .column_desc
637            .generated_or_default_column
638            .as_ref()
639        {
640            ExprImpl::from_expr_proto(expr.as_ref().unwrap())
641                .expect("expr in default columns corrupted")
642        } else {
643            ExprImpl::literal_null(self.columns[col_idx].data_type().clone())
644        }
645    }
646
647    pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
648        columns
649            .iter()
650            .map(|c| {
651                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
652                    expr,
653                    ..
654                })) = c.column_desc.generated_or_default_column.as_ref()
655                {
656                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
657                        .expect("expr in default columns corrupted")
658                } else {
659                    ExprImpl::literal_null(c.data_type().clone())
660                }
661            })
662            .collect()
663    }
664
665    pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
666        self.columns.iter().enumerate().filter_map(|(i, c)| {
667            if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
668                expr, ..
669            })) = c.column_desc.generated_or_default_column.as_ref()
670            {
671                Some((
672                    i,
673                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
674                        .expect("expr in default columns corrupted"),
675                ))
676            } else {
677                None
678            }
679        })
680    }
681
    /// Whether at least one column of this table is a generated column.
    pub fn has_generated_column(&self) -> bool {
        self.columns.iter().any(|c| c.is_generated())
    }
685
    /// Whether `columns` contains the `_rw_timestamp` column.
    pub fn has_rw_timestamp_column(&self) -> bool {
        self.columns.iter().any(|c| c.is_rw_timestamp_column())
    }
689
690    pub fn column_schema(&self) -> Schema {
691        Schema::new(
692            self.columns
693                .iter()
694                .map(|c| Field::from(&c.column_desc))
695                .collect(),
696        )
697    }
698
699    pub fn is_created(&self) -> bool {
700        self.stream_job_status == StreamJobStatus::Created
701    }
702
703    pub fn is_iceberg_engine_table(&self) -> bool {
704        self.engine == Engine::Iceberg
705    }
706
    /// Iterates over the column indices of the `pk` entries, in `pk` order.
    pub fn order_column_indices(&self) -> impl Iterator<Item = usize> + '_ {
        self.pk.iter().map(|col| col.column_index)
    }
710
    /// Delegates to [`ColumnDesc::get_id_to_op_idx_mapping`] over all columns of this table,
    /// passing `None` as the second argument.
    // NOTE(review): presumably maps each column id to its position in `columns` — confirm
    // against the helper's definition.
    pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
        ColumnDesc::get_id_to_op_idx_mapping(&self.columns, None)
    }
714
715    pub fn order_column_ids(&self) -> Vec<ColumnId> {
716        self.pk
717            .iter()
718            .map(|col| self.columns[col.column_index].column_desc.column_id)
719            .collect()
720    }
721
    /// Returns the `pk` column orders converted to their protobuf form, in `pk` order.
    pub fn arrange_key_orders_protobuf(&self) -> Vec<PbColumnOrder> {
        // Set materialize key as arrange key + pk
        self.pk.iter().map(|x| x.to_protobuf()).collect()
    }
726}
727
728impl From<PbTable> for TableCatalog {
729    fn from(tb: PbTable) -> Self {
730        let id = tb.id;
731        let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
732        let tb_engine = tb
733            .get_engine()
734            .map(|engine| PbEngine::try_from(*engine).expect("Invalid engine"))
735            .unwrap_or(PbEngine::Hummock);
736        let table_type = tb.get_table_type().unwrap();
737        let stream_job_status = tb
738            .get_stream_job_status()
739            .unwrap_or(PbStreamJobStatus::Created);
740        let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground);
741        let associated_source_id = tb.optional_associated_source_id.map(|id| match id {
742            OptionalAssociatedSourceId::AssociatedSourceId(id) => id,
743        });
744        let name = tb.name.clone();
745
746        let vnode_count = tb.vnode_count_inner();
747        if let VnodeCount::Placeholder = vnode_count {
748            // Only allow placeholder vnode count for creating tables.
749            // After the table is created, an `Update` notification will be used to update the vnode count field.
750            assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
751        }
752
753        let mut col_names = HashSet::new();
754        let mut col_index: HashMap<i32, usize> = HashMap::new();
755
756        let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
757        let version_column_indices: Vec<usize> = tb
758            .version_column_indices
759            .iter()
760            .map(|&idx| idx as usize)
761            .collect();
762        let mut columns: Vec<ColumnCatalog> =
763            tb.columns.into_iter().map(ColumnCatalog::from).collect();
764        if columns.iter().all(|c| !c.is_rw_timestamp_column()) {
765            // Add system column `_rw_timestamp` to every table, but notice that this column is never persisted.
766            columns.push(ColumnCatalog::rw_timestamp_column());
767        }
768        for (idx, catalog) in columns.clone().into_iter().enumerate() {
769            let col_name = catalog.name();
770            if !col_names.insert(col_name.to_owned()) {
771                panic!("duplicated column name {} in table {} ", col_name, tb.name)
772            }
773
774            let col_id = catalog.column_desc.column_id.get_id();
775            col_index.insert(col_id, idx);
776        }
777
778        let pk = tb.pk.iter().map(ColumnOrder::from_protobuf).collect();
779        let mut watermark_columns = FixedBitSet::with_capacity(columns.len());
780        for idx in &tb.watermark_indices {
781            watermark_columns.insert(*idx as _);
782        }
783        let engine = Engine::from_protobuf(&tb_engine);
784
785        Self {
786            id: id.into(),
787            schema_id: tb.schema_id,
788            database_id: tb.database_id,
789            associated_source_id: associated_source_id.map(Into::into),
790            name,
791            pk,
792            columns,
793            table_type: TableType::from_prost(table_type),
794            distribution_key: tb
795                .distribution_key
796                .iter()
797                .map(|k| *k as usize)
798                .collect_vec(),
799            stream_key: tb.stream_key.iter().map(|x| *x as _).collect(),
800            append_only: tb.append_only,
801            owner: tb.owner,
802            fragment_id: tb.fragment_id,
803            dml_fragment_id: tb.dml_fragment_id,
804            vnode_col_index: tb.vnode_col_index.map(|x| x as usize),
805            row_id_index: tb.row_id_index.map(|x| x as usize),
806            value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
807            definition: tb.definition,
808            conflict_behavior,
809            version_column_indices,
810            read_prefix_len_hint: tb.read_prefix_len_hint as usize,
811            version: tb.version.map(TableVersion::from_prost),
812            watermark_columns,
813            dist_key_in_pk: tb.dist_key_in_pk.iter().map(|x| *x as _).collect(),
814            cardinality: tb
815                .cardinality
816                .map(|c| Cardinality::from_protobuf(&c))
817                .unwrap_or_else(Cardinality::unknown),
818            created_at_epoch: tb.created_at_epoch.map(Epoch::from),
819            initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
820            cleaned_by_watermark: tb.cleaned_by_watermark,
821            create_type: CreateType::from_proto(create_type),
822            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
823            description: tb.description,
824            created_at_cluster_version: tb.created_at_cluster_version.clone(),
825            initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
826            retention_seconds: tb.retention_seconds,
827            cdc_table_id: tb.cdc_table_id,
828            vnode_count,
829            webhook_info: tb.webhook_info,
830            job_id: tb.job_id.map(TableId::from),
831            engine,
832            clean_watermark_index_in_pk: tb.clean_watermark_index_in_pk.map(|x| x as usize),
833
834            refreshable: tb.refreshable,
835            vector_index_info: tb.vector_index_info,
836            cdc_table_type: tb
837                .cdc_table_type
838                .and_then(|t| PbCdcTableType::try_from(t).ok())
839                .map(ExternalCdcTableType::from),
840        }
841    }
842}
843
844impl From<&PbTable> for TableCatalog {
845    fn from(tb: &PbTable) -> Self {
846        tb.clone().into()
847    }
848}
849
850impl OwnedByUserCatalog for TableCatalog {
851    fn owner(&self) -> UserId {
852        self.owner
853    }
854}
855
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::test_prelude::*;
    use risingwave_common::types::*;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_pb::catalog::table::PbEngine;
    use risingwave_pb::plan_common::{
        AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
    };

    use super::*;

    /// Struct type of the user-visible `country` column shared by the protobuf
    /// input and the expected catalog in the test below.
    fn country_struct_type() -> StructType {
        StructType::new([
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
    }

    /// Converts a hand-written `PbTable` into a `TableCatalog`, compares it
    /// field by field against the expected catalog, and finally checks that a
    /// `to_prost` / `from` round trip yields an equal catalog.
    #[test]
    fn test_into_table_catalog() {
        let owner = risingwave_common::catalog::DEFAULT_SUPER_USER_ID;

        // ---- protobuf input ----
        let pb_columns = vec![
            ColumnCatalog::row_id_column().to_protobuf(),
            PbColumnCatalog {
                column_desc: Some(PbColumnDesc::new(
                    DataType::from(country_struct_type()).to_protobuf(),
                    "country",
                    1,
                )),
                is_hidden: false,
            },
        ];

        let pb_table = PbTable {
            id: 0,
            schema_id: 0,
            database_id: 0,
            name: String::from("test"),
            table_type: PbTableType::Table as i32,
            columns: pb_columns,
            pk: vec![ColumnOrder::new(0, OrderType::ascending()).to_protobuf()],
            stream_key: vec![0],
            distribution_key: vec![0],
            optional_associated_source_id: Some(OptionalAssociatedSourceId::AssociatedSourceId(
                233,
            )),
            append_only: false,
            owner,
            retention_seconds: Some(300),
            fragment_id: 0,
            dml_fragment_id: None,
            initialized_at_epoch: None,
            value_indices: vec![0],
            definition: "".into(),
            read_prefix_len_hint: 0,
            vnode_col_index: None,
            row_id_index: None,
            version: Some(PbTableVersion {
                version: 0,
                next_column_id: 2,
            }),
            watermark_indices: vec![],
            // Raw protobuf enum value; the assertion below expects it to be
            // decoded as `ConflictBehavior::NoCheck`.
            handle_pk_conflict_behavior: 3,
            dist_key_in_pk: vec![0],
            cardinality: None,
            created_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created as i32,
            create_type: PbCreateType::Foreground as i32,
            description: Some("description".to_owned()),
            #[expect(deprecated)]
            incoming_sinks: vec![],
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_indices: Vec::new(),
            cdc_table_id: None,
            maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            clean_watermark_index_in_pk: None,

            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
        };

        let table = TableCatalog::from(pb_table);

        // ---- expected catalog ----
        // The expected column list carries a trailing `rw_timestamp` column
        // that is absent from the protobuf input above.
        let expected_columns = vec![
            ColumnCatalog::row_id_column(),
            ColumnCatalog {
                column_desc: ColumnDesc {
                    data_type: country_struct_type().into(),
                    column_id: ColumnId::new(1),
                    name: "country".to_owned(),
                    description: None,
                    generated_or_default_column: None,
                    additional_column: AdditionalColumn { column_type: None },
                    version: ColumnDescVersion::LATEST,
                    system_column: None,
                    nullable: true,
                },
                is_hidden: false,
            },
            ColumnCatalog::rw_timestamp_column(),
        ];

        let expected = TableCatalog {
            id: TableId::new(0),
            schema_id: 0,
            database_id: 0,
            associated_source_id: Some(TableId::new(233)),
            name: String::from("test"),
            table_type: TableType::Table,
            columns: expected_columns,
            stream_key: vec![0],
            pk: vec![ColumnOrder::new(0, OrderType::ascending())],
            distribution_key: vec![0],
            append_only: false,
            owner,
            retention_seconds: Some(300),
            fragment_id: 0,
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index: None,
            value_indices: vec![0],
            definition: "".into(),
            conflict_behavior: ConflictBehavior::NoCheck,
            read_prefix_len_hint: 0,
            version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))),
            // One bit per expected column (three in total), none of them set.
            watermark_columns: FixedBitSet::with_capacity(3),
            dist_key_in_pk: vec![0],
            cardinality: Cardinality::unknown(),
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: StreamJobStatus::Created,
            create_type: CreateType::Foreground,
            description: Some("description".to_owned()),
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_indices: Vec::new(),
            cdc_table_id: None,
            vnode_count: VnodeCount::set(233),
            webhook_info: None,
            job_id: None,
            engine: Engine::Hummock,
            clean_watermark_index_in_pk: None,

            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
        };

        assert_eq!(table, expected);

        // Serialising back to protobuf and converting again must be lossless.
        let round_tripped = TableCatalog::from(table.to_prost());
        assert_eq!(table, round_tripped);
    }
}