risingwave_frontend/catalog/
table_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17
18use fixedbitset::FixedBitSet;
19use itertools::Itertools;
20use risingwave_common::catalog::{
21    ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, ICEBERG_SINK_PREFIX,
22    ICEBERG_SOURCE_PREFIX, Schema, StreamJobStatus, TableDesc, TableId, TableVersionId,
23};
24use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
25use risingwave_common::id::{JobId, SourceId};
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::sort_util::ColumnOrder;
28use risingwave_connector::source::cdc::external::ExternalCdcTableType;
29use risingwave_pb::catalog::table::{
30    CdcTableType as PbCdcTableType, PbEngine, PbTableType, PbTableVersion,
31};
32use risingwave_pb::catalog::{
33    PbCreateType, PbStreamJobStatus, PbTable, PbVectorIndexInfo, PbWebhookSourceInfo,
34};
35use risingwave_pb::common::PbColumnOrder;
36use risingwave_pb::plan_common::DefaultColumnDesc;
37use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
38use risingwave_sqlparser::ast;
39use risingwave_sqlparser::parser::Parser;
40use thiserror_ext::AsReport as _;
41
42use super::purify::try_purify_table_source_create_sql_ast;
43use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId};
44use crate::error::{ErrorCode, Result, RwError};
45use crate::expr::ExprImpl;
46use crate::optimizer::property::Cardinality;
47use crate::session::current::notice_to_user;
48use crate::user::UserId;
49
/// `TableCatalog` includes full information about a table.
///
/// Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed.
/// It is not the same as a user-side table created by `CREATE TABLE`.
///
/// Use [`Self::table_type()`] to determine the [`TableType`] of the table.
///
/// # Column ID & Column Index
///
/// [`ColumnId`](risingwave_common::catalog::ColumnId) (with type `i32`) is the unique identifier of
/// a column in a table. It is used to access storage.
///
/// Column index, or idx, (with type `usize`) is the relative position inside the `Vec` of columns.
///
/// A tip to avoid mistakes: never cast between the two (`i32 as usize` or vice versa).
///
/// # Keys
///
/// All the keys are represented as column indices.
///
/// - **Primary Key** (pk): unique identifier of a row.
///
/// - **Order Key**: the primary key for storage, used to sort and access data.
///
///   For an MV, the columns in `ORDER BY` clause will be put at the beginning of the order key. And
///   the remaining columns in pk will follow behind.
///
///   If there's no `ORDER BY` clause, the order key will be the same as pk.
///
/// - **Distribution Key**: the columns used to partition the data. It must be a subset of the order
///   key.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub struct TableCatalog {
    /// Id of the table. See [`Self::id`].
    pub id: TableId,

    /// Schema id recorded for this table.
    pub schema_id: SchemaId,

    /// Database id recorded for this table.
    pub database_id: DatabaseId,

    /// Id of the source associated with this table, if any. See
    /// [`Self::has_associated_source`].
    pub associated_source_id: Option<SourceId>,

    /// Name of the table.
    pub name: String,

    /// All columns in this table.
    pub columns: Vec<ColumnCatalog>,

    /// Key used as materialize's storage key prefix, including MV order columns and `stream_key`.
    pub pk: Vec<ColumnOrder>,

    /// `pk_indices` of the corresponding materialize operator's output.
    ///
    /// For backward compatibility, always use the [`Self::stream_key`] method to get the stream
    /// key; never read this field directly.
    pub stream_key: Vec<usize>,

    /// Type of the table. Used to distinguish user-created tables, materialized views, index
    /// tables, and internal tables.
    pub table_type: TableType,

    /// Distribution key column indices.
    pub distribution_key: Vec<usize>,

    /// The append-only attribute is derived from `StreamMaterialize` and `StreamTableScan` relies
    /// on this to derive an append-only stream plan.
    pub append_only: bool,

    /// The cardinality of the table.
    pub cardinality: Cardinality,

    /// Owner of the table.
    pub owner: UserId,

    /// TTL of the records in the table. To ensure consistency with other tables in the
    /// streaming plan, it only applies to append-only tables.
    pub retention_seconds: Option<u32>,

    /// The fragment id of the `Materialize` operator for this table.
    pub fragment_id: FragmentId,

    /// The fragment id of the `DML` operator for this table.
    pub dml_fragment_id: Option<FragmentId>,

    /// An optional column index which is the vnode of each row computed by the table's consistent
    /// hash distribution.
    pub vnode_col_index: Option<usize>,

    /// An optional column index of row id. If the primary key is specified by users, this will be
    /// `None`.
    pub row_id_index: Option<usize>,

    /// The column indices which are stored in the state store's value with row-encoding.
    pub value_indices: Vec<usize>,

    /// The full `CREATE TABLE` or `CREATE MATERIALIZED VIEW` definition of the table.
    ///
    /// May be empty; the SQL-definition helpers treat an empty definition on a
    /// [`TableType::Table`] as a table created by `CREATE TABLE AS`.
    pub definition: String,

    /// The behavior of handling incoming pk conflict from source executor, we can overwrite or
    /// ignore conflict pk. For normal materialize executor and other executors, this field will be
    /// `No Check`.
    pub conflict_behavior: ConflictBehavior,

    // NOTE(review): presumably the column indices consulted when resolving pk conflicts by
    // version — semantics are not visible in this file; confirm before relying on it.
    pub version_column_indices: Vec<usize>,

    pub read_prefix_len_hint: usize,

    /// Per-table catalog version, used by schema change. `None` for internal tables and tests.
    pub version: Option<TableVersion>,

    /// The column indices which could receive watermarks.
    pub watermark_columns: FixedBitSet,

    /// Optional field specifies the distribution key indices in pk.
    /// See <https://github.com/risingwavelabs/risingwave/issues/8377> for more information.
    pub dist_key_in_pk: Vec<usize>,

    pub created_at_epoch: Option<Epoch>,

    pub initialized_at_epoch: Option<Epoch>,

    /// Indicate whether to use watermark cache for state table.
    pub cleaned_by_watermark: bool,

    /// Indicate whether to create table in background or foreground.
    pub create_type: CreateType,

    /// Indicate the stream job status, whether it is created or creating.
    /// If it is creating, we should hide it.
    pub stream_job_status: StreamJobStatus,

    /// Description of the table, set by `comment on`.
    pub description: Option<String>,

    pub created_at_cluster_version: Option<String>,

    pub initialized_at_cluster_version: Option<String>,

    pub cdc_table_id: Option<String>,

    /// Total vnode count of the table.
    ///
    /// Can be [`VnodeCount::Placeholder`] if the catalog is generated by the frontend and the
    /// corresponding job is still in `Creating` status, in which case calling [`Self::vnode_count`]
    /// will panic.
    ///
    /// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog
    /// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build
    pub vnode_count: VnodeCount,

    pub webhook_info: Option<PbWebhookSourceInfo>,

    pub job_id: Option<JobId>,

    /// Storage engine of the table. For [`Engine::Iceberg`] tables, the names of the companion
    /// source and sink are derived by [`Self::iceberg_source_name`] and
    /// [`Self::iceberg_sink_name`].
    pub engine: Engine,

    pub clean_watermark_index_in_pk: Option<usize>,

    /// Whether the table supports manual refresh operations.
    pub refreshable: bool,

    pub vector_index_info: Option<PbVectorIndexInfo>,

    pub cdc_table_type: Option<ExternalCdcTableType>,
}
212
/// The kind of a [`TableCatalog`]: user table, materialized view, (vector) index, or internal
/// table.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub enum TableType {
    /// Tables created by `CREATE TABLE`.
    #[cfg_attr(test, default)]
    Table,
    /// Tables created by `CREATE MATERIALIZED VIEW`.
    MaterializedView,
    /// Tables serving as index for `TableType::Table` or `TableType::MaterializedView`.
    /// An index has both a `TableCatalog` and an `IndexCatalog`.
    Index,
    /// Vector indexes. Like [`TableType::Index`], they are dropped with `DROP INDEX`.
    VectorIndex,
    /// Internal tables for executors.
    Internal,
}
228
229impl TableType {
230    fn from_prost(prost: PbTableType) -> Self {
231        match prost {
232            PbTableType::Table => Self::Table,
233            PbTableType::MaterializedView => Self::MaterializedView,
234            PbTableType::Index => Self::Index,
235            PbTableType::Internal => Self::Internal,
236            PbTableType::VectorIndex => Self::VectorIndex,
237            PbTableType::Unspecified => unreachable!(),
238        }
239    }
240
241    pub(crate) fn to_prost(self) -> PbTableType {
242        match self {
243            Self::Table => PbTableType::Table,
244            Self::MaterializedView => PbTableType::MaterializedView,
245            Self::Index => PbTableType::Index,
246            Self::VectorIndex => PbTableType::VectorIndex,
247            Self::Internal => PbTableType::Internal,
248        }
249    }
250}
251
/// The version of a table, used by schema change. See [`PbTableVersion`] for more details.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TableVersion {
    /// Id of this version; stored as the `version` field of [`PbTableVersion`].
    pub version_id: TableVersionId,
    /// The next column id recorded with this version.
    // NOTE(review): presumably the id handed to the next newly added column — confirm.
    pub next_column_id: ColumnId,
}
258
259impl TableVersion {
260    /// Create an initial version for a table, with the given max column id.
261    #[cfg(test)]
262    pub fn new_initial_for_test(max_column_id: ColumnId) -> Self {
263        use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;
264
265        Self {
266            version_id: INITIAL_TABLE_VERSION_ID,
267            next_column_id: max_column_id.next(),
268        }
269    }
270
271    pub fn from_prost(prost: PbTableVersion) -> Self {
272        Self {
273            version_id: prost.version,
274            next_column_id: ColumnId::from(prost.next_column_id),
275        }
276    }
277
278    pub fn to_prost(&self) -> PbTableVersion {
279        PbTableVersion {
280            version: self.version_id,
281            next_column_id: self.next_column_id.into(),
282        }
283    }
284}
285
286impl TableCatalog {
287    /// Returns the SQL definition when the table was created, purified with best effort
288    /// if it's a table.
289    ///
290    /// See [`Self::create_sql_ast_purified`] for more details.
291    pub fn create_sql_purified(&self) -> String {
292        self.create_sql_ast_purified()
293            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
294            .unwrap_or_else(|_| self.create_sql())
295    }
296
297    /// Returns the parsed SQL definition when the table was created, purified with best effort
298    /// if it's a table.
299    ///
300    /// Returns error if it's invalid.
301    pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
302        // Purification is only applicable to tables.
303        if let TableType::Table = self.table_type() {
304            let base = if self.definition.is_empty() {
305                // Created by `CREATE TABLE AS`, create a skeleton `CREATE TABLE` statement.
306                let name = ast::ObjectName(vec![self.name.as_str().into()]);
307                ast::Statement::default_create_table(name)
308            } else {
309                self.create_sql_ast_from_persisted()?
310            };
311
312            match try_purify_table_source_create_sql_ast(
313                base,
314                self.columns(),
315                self.row_id_index,
316                &self.pk_column_ids(),
317            ) {
318                Ok(stmt) => return Ok(stmt),
319                Err(e) => notice_to_user(format!(
320                    "error occurred while purifying definition for table \"{}\", \
321                     results may be inaccurate: {}",
322                    self.name,
323                    e.as_report()
324                )),
325            }
326        }
327
328        self.create_sql_ast_from_persisted()
329    }
330}
331
332impl TableCatalog {
333    /// Get a reference to the table catalog's table id.
334    pub fn id(&self) -> TableId {
335        self.id
336    }
337
338    pub fn with_id(mut self, id: TableId) -> Self {
339        self.id = id;
340        self
341    }
342
343    pub fn with_cleaned_by_watermark(mut self, cleaned_by_watermark: bool) -> Self {
344        self.cleaned_by_watermark = cleaned_by_watermark;
345        self
346    }
347
348    pub fn conflict_behavior(&self) -> ConflictBehavior {
349        self.conflict_behavior
350    }
351
352    pub fn table_type(&self) -> TableType {
353        self.table_type
354    }
355
356    pub fn engine(&self) -> Engine {
357        self.engine
358    }
359
360    pub fn iceberg_source_name(&self) -> Option<String> {
361        match self.engine {
362            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name)),
363            Engine::Hummock => None,
364        }
365    }
366
367    pub fn iceberg_sink_name(&self) -> Option<String> {
368        match self.engine {
369            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SINK_PREFIX, self.name)),
370            Engine::Hummock => None,
371        }
372    }
373
374    pub fn is_user_table(&self) -> bool {
375        self.table_type == TableType::Table
376    }
377
378    pub fn is_internal_table(&self) -> bool {
379        self.table_type == TableType::Internal
380    }
381
382    pub fn is_mview(&self) -> bool {
383        self.table_type == TableType::MaterializedView
384    }
385
386    pub fn is_index(&self) -> bool {
387        self.table_type == TableType::Index
388    }
389
390    /// Returns an error if `DROP` statements are used on the wrong type of table.
391    #[must_use]
392    pub fn bad_drop_error(&self) -> RwError {
393        let msg = match self.table_type {
394            TableType::MaterializedView => {
395                "Use `DROP MATERIALIZED VIEW` to drop a materialized view."
396            }
397            TableType::Index | TableType::VectorIndex => "Use `DROP INDEX` to drop an index.",
398            TableType::Table => "Use `DROP TABLE` to drop a table.",
399            TableType::Internal => "Internal tables cannot be dropped.",
400        };
401
402        ErrorCode::InvalidInputSyntax(msg.to_owned()).into()
403    }
404
405    /// Get the table catalog's associated source id.
406    #[must_use]
407    pub fn associated_source_id(&self) -> Option<SourceId> {
408        self.associated_source_id
409    }
410
411    pub fn has_associated_source(&self) -> bool {
412        self.associated_source_id.is_some()
413    }
414
415    /// Get a reference to the table catalog's columns.
416    pub fn columns(&self) -> &[ColumnCatalog] {
417        &self.columns
418    }
419
420    pub fn columns_without_rw_timestamp(&self) -> Vec<ColumnCatalog> {
421        self.columns
422            .iter()
423            .filter(|c| !c.is_rw_timestamp_column())
424            .cloned()
425            .collect()
426    }
427
428    /// Get a reference to the table catalog's pk desc.
429    pub fn pk(&self) -> &[ColumnOrder] {
430        self.pk.as_ref()
431    }
432
433    /// Get the column IDs of the primary key.
434    pub fn pk_column_ids(&self) -> Vec<ColumnId> {
435        self.pk
436            .iter()
437            .map(|x| self.columns[x.column_index].column_id())
438            .collect()
439    }
440
441    /// Get the column names of the primary key.
442    pub fn pk_column_names(&self) -> Vec<&str> {
443        self.pk
444            .iter()
445            .map(|x| self.columns[x.column_index].name())
446            .collect()
447    }
448
449    /// Derive the stream key, which must include all distribution keys.
450    /// For backward compatibility, if distribution key is not in stream key(e.g. indexes created in old versions),
451    /// we need to add them to stream key.
452    pub fn stream_key(&self) -> Vec<usize> {
453        // if distribution key is not in stream key, we need to add them to stream key
454        if self
455            .distribution_key
456            .iter()
457            .any(|dist_key| !self.stream_key.contains(dist_key))
458        {
459            let mut new_stream_key = self.distribution_key.clone();
460            let mut seen: HashSet<usize> = self.distribution_key.iter().copied().collect();
461            for &key in &self.stream_key {
462                if seen.insert(key) {
463                    new_stream_key.push(key);
464                }
465            }
466            new_stream_key
467        } else {
468            self.stream_key.clone()
469        }
470    }
471
472    /// Get a [`TableDesc`] of the table.
473    ///
474    /// Note: this must be called on existing tables, otherwise it will fail to get the vnode count
475    /// (which is determined by the meta service) and panic.
476    pub fn table_desc(&self) -> TableDesc {
477        use risingwave_common::catalog::TableOption;
478
479        let table_options = TableOption::new(self.retention_seconds);
480
481        TableDesc {
482            table_id: self.id,
483            pk: self.pk.clone(),
484            stream_key: self.stream_key(),
485            columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
486            distribution_key: self.distribution_key.clone(),
487            append_only: self.append_only,
488            retention_seconds: table_options.retention_seconds,
489            value_indices: self.value_indices.clone(),
490            read_prefix_len_hint: self.read_prefix_len_hint,
491            watermark_columns: self.watermark_columns.clone(),
492            versioned: self.version.is_some(),
493            vnode_col_index: self.vnode_col_index,
494            vnode_count: self.vnode_count(),
495        }
496    }
497
498    /// Get a reference to the table catalog's name.
499    pub fn name(&self) -> &str {
500        self.name.as_ref()
501    }
502
503    pub fn distribution_key(&self) -> &[usize] {
504        self.distribution_key.as_ref()
505    }
506
507    pub fn to_internal_table_prost(&self) -> PbTable {
508        self.to_prost()
509    }
510
511    /// Returns the SQL definition when the table was created.
512    ///
513    /// See [`Self::create_sql_ast`] for more details.
514    pub fn create_sql(&self) -> String {
515        self.create_sql_ast()
516            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
517            .unwrap_or_else(|_| self.definition.clone())
518    }
519
520    /// Returns the parsed SQL definition when the table was created.
521    ///
522    /// Re-create the table with this statement may have different schema if the schema is derived
523    /// from external systems (like schema registry) or it's created by `CREATE TABLE AS`. If this
524    /// is not desired, use [`Self::create_sql_ast_purified`] instead.
525    ///
526    /// Returns error if it's invalid.
527    pub fn create_sql_ast(&self) -> Result<ast::Statement> {
528        if let TableType::Table = self.table_type()
529            && self.definition.is_empty()
530        {
531            // Always fix definition for `CREATE TABLE AS`.
532            self.create_sql_ast_purified()
533        } else {
534            // Directly parse the persisted definition.
535            self.create_sql_ast_from_persisted()
536        }
537    }
538
539    fn create_sql_ast_from_persisted(&self) -> Result<ast::Statement> {
540        Ok(Parser::parse_exactly_one(&self.definition)?)
541    }
542
543    /// Get a reference to the table catalog's version.
544    pub fn version(&self) -> Option<&TableVersion> {
545        self.version.as_ref()
546    }
547
548    /// Get the table's version id. Returns `None` if the table has no version field.
549    pub fn version_id(&self) -> Option<TableVersionId> {
550        self.version().map(|v| v.version_id)
551    }
552
553    /// Get the total vnode count of the table.
554    pub fn vnode_count(&self) -> usize {
555        if self.id().is_placeholder() {
556            0
557        } else {
558            // Panics if it's called on an incomplete (and not yet persisted) table catalog.
559            self.vnode_count.value()
560        }
561    }
562
563    pub fn to_prost(&self) -> PbTable {
564        PbTable {
565            id: self.id,
566            schema_id: self.schema_id,
567            database_id: self.database_id,
568            name: self.name.clone(),
569            // ignore `_rw_timestamp` when serializing
570            columns: self
571                .columns_without_rw_timestamp()
572                .iter()
573                .map(|c| c.to_protobuf())
574                .collect(),
575            pk: self.pk.iter().map(|o| o.to_protobuf()).collect(),
576            stream_key: self.stream_key().iter().map(|x| *x as _).collect(),
577            optional_associated_source_id: self.associated_source_id.map(Into::into),
578            table_type: self.table_type.to_prost() as i32,
579            distribution_key: self
580                .distribution_key
581                .iter()
582                .map(|k| *k as i32)
583                .collect_vec(),
584            append_only: self.append_only,
585            owner: self.owner,
586            fragment_id: self.fragment_id,
587            dml_fragment_id: self.dml_fragment_id,
588            vnode_col_index: self.vnode_col_index.map(|i| i as _),
589            row_id_index: self.row_id_index.map(|i| i as _),
590            value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
591            definition: self.definition.clone(),
592            read_prefix_len_hint: self.read_prefix_len_hint as u32,
593            version: self.version.as_ref().map(TableVersion::to_prost),
594            watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
595            dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
596            handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
597            version_column_indices: self
598                .version_column_indices
599                .iter()
600                .map(|&idx| idx as u32)
601                .collect(),
602            cardinality: Some(self.cardinality.to_protobuf()),
603            initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
604            created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
605            cleaned_by_watermark: self.cleaned_by_watermark,
606            stream_job_status: self.stream_job_status.to_proto().into(),
607            create_type: self.create_type.to_proto().into(),
608            description: self.description.clone(),
609            #[expect(deprecated)]
610            incoming_sinks: vec![],
611            created_at_cluster_version: self.created_at_cluster_version.clone(),
612            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
613            retention_seconds: self.retention_seconds,
614            cdc_table_id: self.cdc_table_id.clone(),
615            maybe_vnode_count: self.vnode_count.to_protobuf(),
616            webhook_info: self.webhook_info.clone(),
617            job_id: self.job_id,
618            engine: Some(self.engine.to_protobuf().into()),
619            clean_watermark_index_in_pk: self.clean_watermark_index_in_pk.map(|x| x as i32),
620            refreshable: self.refreshable,
621            vector_index_info: self.vector_index_info,
622            cdc_table_type: self
623                .cdc_table_type
624                .clone()
625                .map(|t| PbCdcTableType::from(t) as i32),
626            refresh_state: Some(risingwave_pb::catalog::RefreshState::Idle as i32),
627        }
628    }
629
630    /// Get columns excluding hidden columns and generated columns.
631    pub fn columns_to_insert(&self) -> impl Iterator<Item = &ColumnCatalog> {
632        self.columns
633            .iter()
634            .filter(|c| !c.is_hidden() && !c.is_generated())
635    }
636
637    pub fn generated_column_names(&self) -> impl Iterator<Item = &str> {
638        self.columns
639            .iter()
640            .filter(|c| c.is_generated())
641            .map(|c| c.name())
642    }
643
644    pub fn generated_col_idxes(&self) -> impl Iterator<Item = usize> + '_ {
645        self.columns
646            .iter()
647            .enumerate()
648            .filter(|(_, c)| c.is_generated())
649            .map(|(i, _)| i)
650    }
651
652    pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
653        if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) = self
654            .columns[col_idx]
655            .column_desc
656            .generated_or_default_column
657            .as_ref()
658        {
659            ExprImpl::from_expr_proto(expr.as_ref().unwrap())
660                .expect("expr in default columns corrupted")
661        } else {
662            ExprImpl::literal_null(self.columns[col_idx].data_type().clone())
663        }
664    }
665
666    pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
667        columns
668            .iter()
669            .map(|c| {
670                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
671                    expr,
672                    ..
673                })) = c.column_desc.generated_or_default_column.as_ref()
674                {
675                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
676                        .expect("expr in default columns corrupted")
677                } else {
678                    ExprImpl::literal_null(c.data_type().clone())
679                }
680            })
681            .collect()
682    }
683
684    pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
685        self.columns.iter().enumerate().filter_map(|(i, c)| {
686            if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
687                expr, ..
688            })) = c.column_desc.generated_or_default_column.as_ref()
689            {
690                Some((
691                    i,
692                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
693                        .expect("expr in default columns corrupted"),
694                ))
695            } else {
696                None
697            }
698        })
699    }
700
701    pub fn has_generated_column(&self) -> bool {
702        self.columns.iter().any(|c| c.is_generated())
703    }
704
705    pub fn has_rw_timestamp_column(&self) -> bool {
706        self.columns.iter().any(|c| c.is_rw_timestamp_column())
707    }
708
709    pub fn column_schema(&self) -> Schema {
710        Schema::new(
711            self.columns
712                .iter()
713                .map(|c| Field::from(&c.column_desc))
714                .collect(),
715        )
716    }
717
718    pub fn is_created(&self) -> bool {
719        self.stream_job_status == StreamJobStatus::Created
720    }
721
722    pub fn is_iceberg_engine_table(&self) -> bool {
723        self.engine == Engine::Iceberg
724    }
725
726    pub fn order_column_indices(&self) -> impl Iterator<Item = usize> + '_ {
727        self.pk.iter().map(|col| col.column_index)
728    }
729
730    pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
731        ColumnDesc::get_id_to_op_idx_mapping(&self.columns, None)
732    }
733
734    pub fn order_column_ids(&self) -> Vec<ColumnId> {
735        self.pk
736            .iter()
737            .map(|col| self.columns[col.column_index].column_desc.column_id)
738            .collect()
739    }
740
741    pub fn arrange_key_orders_protobuf(&self) -> Vec<PbColumnOrder> {
742        // Set materialize key as arrange key + pk
743        self.pk.iter().map(|x| x.to_protobuf()).collect()
744    }
745}
746
747impl From<PbTable> for TableCatalog {
748    fn from(tb: PbTable) -> Self {
749        let id = tb.id;
750        let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
751        let tb_engine = tb
752            .get_engine()
753            .map(|engine| PbEngine::try_from(*engine).expect("Invalid engine"))
754            .unwrap_or(PbEngine::Hummock);
755        let table_type = tb.get_table_type().unwrap();
756        let stream_job_status = tb
757            .get_stream_job_status()
758            .unwrap_or(PbStreamJobStatus::Created);
759        let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground);
760        let associated_source_id = tb.optional_associated_source_id.map(Into::into);
761        let name = tb.name.clone();
762
763        let vnode_count = tb.vnode_count_inner();
764        if let VnodeCount::Placeholder = vnode_count {
765            // Only allow placeholder vnode count for creating tables.
766            // After the table is created, an `Update` notification will be used to update the vnode count field.
767            assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
768        }
769
770        let mut col_names = HashSet::new();
771        let mut col_index: HashMap<i32, usize> = HashMap::new();
772
773        let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
774        let version_column_indices: Vec<usize> = tb
775            .version_column_indices
776            .iter()
777            .map(|&idx| idx as usize)
778            .collect();
779        let mut columns: Vec<ColumnCatalog> =
780            tb.columns.into_iter().map(ColumnCatalog::from).collect();
781        if columns.iter().all(|c| !c.is_rw_timestamp_column()) {
782            // Add system column `_rw_timestamp` to every table, but notice that this column is never persisted.
783            columns.push(ColumnCatalog::rw_timestamp_column());
784        }
785        for (idx, catalog) in columns.clone().into_iter().enumerate() {
786            let col_name = catalog.name();
787            if !col_names.insert(col_name.to_owned()) {
788                panic!("duplicated column name {} in table {} ", col_name, tb.name)
789            }
790
791            let col_id = catalog.column_desc.column_id.get_id();
792            col_index.insert(col_id, idx);
793        }
794
795        let pk = tb.pk.iter().map(ColumnOrder::from_protobuf).collect();
796        let mut watermark_columns = FixedBitSet::with_capacity(columns.len());
797        for idx in &tb.watermark_indices {
798            watermark_columns.insert(*idx as _);
799        }
800        let engine = Engine::from_protobuf(&tb_engine);
801
802        Self {
803            id,
804            schema_id: tb.schema_id,
805            database_id: tb.database_id,
806            associated_source_id,
807            name,
808            pk,
809            columns,
810            table_type: TableType::from_prost(table_type),
811            distribution_key: tb
812                .distribution_key
813                .iter()
814                .map(|k| *k as usize)
815                .collect_vec(),
816            stream_key: tb.stream_key.iter().map(|x| *x as _).collect(),
817            append_only: tb.append_only,
818            owner: tb.owner,
819            fragment_id: tb.fragment_id,
820            dml_fragment_id: tb.dml_fragment_id,
821            vnode_col_index: tb.vnode_col_index.map(|x| x as usize),
822            row_id_index: tb.row_id_index.map(|x| x as usize),
823            value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
824            definition: tb.definition,
825            conflict_behavior,
826            version_column_indices,
827            read_prefix_len_hint: tb.read_prefix_len_hint as usize,
828            version: tb.version.map(TableVersion::from_prost),
829            watermark_columns,
830            dist_key_in_pk: tb.dist_key_in_pk.iter().map(|x| *x as _).collect(),
831            cardinality: tb
832                .cardinality
833                .map(|c| Cardinality::from_protobuf(&c))
834                .unwrap_or_else(Cardinality::unknown),
835            created_at_epoch: tb.created_at_epoch.map(Epoch::from),
836            initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
837            cleaned_by_watermark: tb.cleaned_by_watermark,
838            create_type: CreateType::from_proto(create_type),
839            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
840            description: tb.description,
841            created_at_cluster_version: tb.created_at_cluster_version.clone(),
842            initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
843            retention_seconds: tb.retention_seconds,
844            cdc_table_id: tb.cdc_table_id,
845            vnode_count,
846            webhook_info: tb.webhook_info,
847            job_id: tb.job_id,
848            engine,
849            clean_watermark_index_in_pk: tb.clean_watermark_index_in_pk.map(|x| x as usize),
850
851            refreshable: tb.refreshable,
852            vector_index_info: tb.vector_index_info,
853            cdc_table_type: tb
854                .cdc_table_type
855                .and_then(|t| PbCdcTableType::try_from(t).ok())
856                .map(ExternalCdcTableType::from),
857        }
858    }
859}
860
861impl From<&PbTable> for TableCatalog {
862    fn from(tb: &PbTable) -> Self {
863        tb.clone().into()
864    }
865}
866
867impl OwnedByUserCatalog for TableCatalog {
868    fn owner(&self) -> UserId {
869        self.owner
870    }
871}
872
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::test_prelude::*;
    use risingwave_common::types::*;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_pb::catalog::table::PbEngine;
    use risingwave_pb::plan_common::{
        AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
    };

    use super::*;

    /// Converts a hand-written `PbTable` into a `TableCatalog`, compares the result
    /// with the expected catalog, and then checks that converting back through
    /// `to_prost` yields an equal catalog.
    #[test]
    fn test_into_table_catalog() {
        // Struct type of the user-visible `country` column; used by both the
        // protobuf input and the expected catalog.
        let country_type = || {
            DataType::from(StructType::new([
                ("address", DataType::Varchar),
                ("zipcode", DataType::Varchar),
            ]))
        };

        // Input columns: the row id column followed by `country`.
        let pb_columns = vec![
            ColumnCatalog::row_id_column().to_protobuf(),
            PbColumnCatalog {
                column_desc: Some(PbColumnDesc::new(
                    country_type().to_protobuf(),
                    "country",
                    1,
                )),
                is_hidden: false,
            },
        ];

        let pb_table = PbTable {
            id: 0.into(),
            schema_id: 0.into(),
            database_id: 0.into(),
            name: "test".to_owned(),
            table_type: PbTableType::Table as i32,
            columns: pb_columns,
            pk: vec![ColumnOrder::new(0, OrderType::ascending()).to_protobuf()],
            stream_key: vec![0],
            distribution_key: vec![0],
            optional_associated_source_id: Some(SourceId::new(233).into()),
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0.into(),
            dml_fragment_id: None,
            initialized_at_epoch: None,
            value_indices: vec![0],
            definition: "".into(),
            read_prefix_len_hint: 0,
            vnode_col_index: None,
            row_id_index: None,
            version: Some(PbTableVersion {
                version: 0,
                next_column_id: 2,
            }),
            watermark_indices: vec![],
            // The assertion below expects this raw value to decode as `NoCheck`.
            handle_pk_conflict_behavior: 3,
            dist_key_in_pk: vec![0],
            cardinality: None,
            created_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: Some("description".to_owned()),
            #[expect(deprecated)]
            incoming_sinks: vec![],
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_indices: Vec::new(),
            cdc_table_id: None,
            maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            clean_watermark_index_in_pk: None,

            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
            refresh_state: Some(risingwave_pb::catalog::RefreshState::Idle as i32),
        };

        let table = TableCatalog::from(pb_table);

        // Expected columns: the two input columns plus the `_rw_timestamp` system
        // column that the conversion is expected to append.
        let expected_columns = vec![
            ColumnCatalog::row_id_column(),
            ColumnCatalog {
                column_desc: ColumnDesc {
                    data_type: country_type(),
                    column_id: ColumnId::new(1),
                    name: "country".to_owned(),
                    description: None,
                    generated_or_default_column: None,
                    additional_column: AdditionalColumn { column_type: None },
                    version: ColumnDescVersion::LATEST,
                    system_column: None,
                    nullable: true,
                },
                is_hidden: false,
            },
            ColumnCatalog::rw_timestamp_column(),
        ];

        let expected = TableCatalog {
            id: TableId::new(0),
            schema_id: 0.into(),
            database_id: 0.into(),
            associated_source_id: Some(SourceId::new(233)),
            name: "test".to_owned(),
            table_type: TableType::Table,
            columns: expected_columns,
            stream_key: vec![0],
            pk: vec![ColumnOrder::new(0, OrderType::ascending())],
            distribution_key: vec![0],
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0.into(),
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index: None,
            value_indices: vec![0],
            definition: "".into(),
            conflict_behavior: ConflictBehavior::NoCheck,
            read_prefix_len_hint: 0,
            version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))),
            // Empty bitset with one slot per expected column (three in total).
            watermark_columns: FixedBitSet::with_capacity(3),
            dist_key_in_pk: vec![0],
            cardinality: Cardinality::unknown(),
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: StreamJobStatus::Created,
            create_type: CreateType::Foreground,
            description: Some("description".to_owned()),
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_indices: Vec::new(),
            cdc_table_id: None,
            vnode_count: VnodeCount::set(233),
            webhook_info: None,
            job_id: None,
            engine: Engine::Hummock,
            clean_watermark_index_in_pk: None,

            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
        };

        assert_eq!(table, expected);

        // Round trip: catalog -> protobuf -> catalog must give an equal catalog.
        let round_tripped = TableCatalog::from(table.to_prost());
        assert_eq!(table, round_tripped);
    }
}