risingwave_frontend/catalog/
table_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17
18use fixedbitset::FixedBitSet;
19use itertools::Itertools;
20use risingwave_common::catalog::{
21    ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, Schema,
22    StreamJobStatus, TableDesc, TableId, TableVersionId,
23};
24use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
25use risingwave_common::util::epoch::Epoch;
26use risingwave_common::util::sort_util::ColumnOrder;
27use risingwave_connector::source::cdc::external::ExternalCdcTableType;
28use risingwave_pb::catalog::table::{
29    CdcTableType as PbCdcTableType, OptionalAssociatedSourceId, PbEngine, PbTableType,
30    PbTableVersion,
31};
32use risingwave_pb::catalog::{
33    PbCreateType, PbStreamJobStatus, PbTable, PbVectorIndexInfo, PbWebhookSourceInfo,
34};
35use risingwave_pb::common::PbColumnOrder;
36use risingwave_pb::id::JobId;
37use risingwave_pb::plan_common::DefaultColumnDesc;
38use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
39use risingwave_sqlparser::ast;
40use risingwave_sqlparser::parser::Parser;
41use thiserror_ext::AsReport as _;
42
43use super::purify::try_purify_table_source_create_sql_ast;
44use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId};
45use crate::error::{ErrorCode, Result, RwError};
46use crate::expr::ExprImpl;
47use crate::optimizer::property::Cardinality;
48use crate::session::current::notice_to_user;
49use crate::user::UserId;
50
/// `TableCatalog` includes full information about a table.
///
/// Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed.
/// It is not the same as a user-side table created by `CREATE TABLE`.
///
/// Use [`Self::table_type()`] to determine the [`TableType`] of the table.
///
/// # Column ID & Column Index
///
/// [`ColumnId`](risingwave_common::catalog::ColumnId) (with type `i32`) is the unique identifier of
/// a column in a table. It is used to access storage.
///
/// Column index, or idx, (with type `usize`) is the relative position inside the `Vec` of columns.
///
/// To avoid mistakes, never convert between the two by casting (`i32 as usize` or vice versa).
///
/// # Keys
///
/// All the keys are represented as column indices.
///
/// - **Primary Key** (pk): unique identifier of a row.
///
/// - **Order Key**: the primary key for storage, used to sort and access data.
///
///   For an MV, the columns in `ORDER BY` clause will be put at the beginning of the order key. And
///   the remaining columns in pk will follow behind.
///
///   If there's no `ORDER BY` clause, the order key will be the same as pk.
///
/// - **Distribution Key**: the columns used to partition the data. It must be a subset of the order
///   key.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub struct TableCatalog {
    /// Id of the table. May be a placeholder id for a catalog that has not been persisted yet
    /// (see [`Self::vnode_count`]).
    pub id: TableId,

    /// Id of the schema containing the table.
    pub schema_id: SchemaId,

    /// Id of the database containing the table.
    pub database_id: DatabaseId,

    /// Id of the source associated with this table, if any.
    pub associated_source_id: Option<TableId>, // TODO: use SourceId

    /// Name of the table. Iceberg-engine tables derive their iceberg source/sink names from it
    /// by prepending [`ICEBERG_SOURCE_PREFIX`] / [`ICEBERG_SINK_PREFIX`].
    pub name: String,

    /// All columns in this table.
    pub columns: Vec<ColumnCatalog>,

    /// Key used as materialize's storage key prefix, including MV order columns and `stream_key`.
    pub pk: Vec<ColumnOrder>,

    /// `pk_indices` of the corresponding materialize operator's output.
    /// For backward compatibility, use the `stream_key()` method to get the stream key;
    /// never use this field directly.
    pub stream_key: Vec<usize>,

    /// Type of the table. Used to distinguish user-created tables, materialized views, index
    /// tables, and internal tables.
    pub table_type: TableType,

    /// Distribution key column indices.
    pub distribution_key: Vec<usize>,

    /// The append-only attribute is derived from `StreamMaterialize` and `StreamTableScan` relies
    /// on this to derive an append-only stream plan.
    pub append_only: bool,

    /// The cardinality of the table.
    pub cardinality: Cardinality,

    /// Owner of the table.
    pub owner: UserId,

    /// TTL of the records in the table. To stay consistent with other tables in the streaming
    /// plan, it only applies to append-only tables.
    pub retention_seconds: Option<u32>,

    /// The fragment id of the `Materialize` operator for this table.
    pub fragment_id: FragmentId,

    /// The fragment id of the `DML` operator for this table.
    pub dml_fragment_id: Option<FragmentId>,

    /// An optional column index which is the vnode of each row computed by the table's consistent
    /// hash distribution.
    pub vnode_col_index: Option<usize>,

    /// An optional column index of row id. If the primary key is specified by users, this will be
    /// `None`.
    pub row_id_index: Option<usize>,

    /// The column indices which are stored in the state store's value with row-encoding.
    pub value_indices: Vec<usize>,

    /// The full `CREATE TABLE` or `CREATE MATERIALIZED VIEW` definition of the table.
    /// Empty for tables created by `CREATE TABLE AS`.
    pub definition: String,

    /// The behavior of handling incoming pk conflict from source executor, we can overwrite or
    /// ignore conflict pk. For normal materialize executor and other executors, this field will be
    /// `No Check`.
    pub conflict_behavior: ConflictBehavior,

    /// Indices of the version columns of the table.
    /// NOTE(review): presumably consulted together with `conflict_behavior` when resolving pk
    /// conflicts — TODO confirm.
    pub version_column_indices: Vec<usize>,

    /// Read prefix length hint, forwarded as-is to [`TableDesc`].
    pub read_prefix_len_hint: usize,

    /// Per-table catalog version, used by schema change. `None` for internal tables and tests.
    pub version: Option<TableVersion>,

    /// The column indices which could receive watermarks.
    pub watermark_columns: FixedBitSet,

    /// Optional field specifies the distribution key indices in pk.
    /// See <https://github.com/risingwavelabs/risingwave/issues/8377> for more information.
    pub dist_key_in_pk: Vec<usize>,

    /// Epoch at which the table was created, if known.
    pub created_at_epoch: Option<Epoch>,

    /// Epoch at which the table was initialized, if known.
    pub initialized_at_epoch: Option<Epoch>,

    /// Indicate whether to use watermark cache for state table.
    pub cleaned_by_watermark: bool,

    /// Indicate whether to create table in background or foreground.
    pub create_type: CreateType,

    /// Indicate the stream job status, whether it is created or creating.
    /// If it is creating, we should hide it.
    pub stream_job_status: StreamJobStatus,

    /// Description of the table, set by `COMMENT ON`.
    pub description: Option<String>,

    /// Cluster version at the time the table was created, if known.
    pub created_at_cluster_version: Option<String>,

    /// Cluster version at the time the table was initialized, if known.
    pub initialized_at_cluster_version: Option<String>,

    /// Identifier of the upstream CDC table, if any.
    /// NOTE(review): exact format not visible here — TODO confirm.
    pub cdc_table_id: Option<String>,

    /// Total vnode count of the table.
    ///
    /// Can be [`VnodeCount::Placeholder`] if the catalog is generated by the frontend (e.g. by
    /// [`StreamMaterialize::derive_table_catalog`] or [`TableCatalogBuilder::build`]) and the
    /// corresponding job is still in `Creating` status, in which case calling [`Self::vnode_count`]
    /// will panic.
    ///
    /// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog
    /// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build
    pub vnode_count: VnodeCount,

    /// Webhook source information, if any.
    pub webhook_info: Option<PbWebhookSourceInfo>,

    /// Id of the job this table belongs to, if any.
    /// NOTE(review): presumably the owning streaming job — TODO confirm.
    pub job_id: Option<JobId>,

    /// Storage engine of the table (`Hummock` or `Iceberg`).
    pub engine: Engine,

    /// Index into `pk` of the column used for watermark-based state cleaning, if any.
    /// NOTE(review): presumably related to `cleaned_by_watermark` — TODO confirm.
    pub clean_watermark_index_in_pk: Option<usize>,

    /// Whether the table supports manual refresh operations.
    pub refreshable: bool,

    /// Vector index information; presumably only set for [`TableType::VectorIndex`] tables.
    pub vector_index_info: Option<PbVectorIndexInfo>,

    /// Type of the external CDC table backing this table, if any.
    pub cdc_table_type: Option<ExternalCdcTableType>,
}
213
/// Prefix prepended to a table name to build the iceberg source name of an iceberg-engine
/// table (see [`TableCatalog::iceberg_source_name`]).
pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
/// Prefix prepended to a table name to build the iceberg sink name of an iceberg-engine table
/// (see [`TableCatalog::iceberg_sink_name`]).
pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_";
216
/// The kind of object a [`TableCatalog`] (a table in storage) belongs to.
///
/// In tests, the `Default` value is [`TableType::Table`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub enum TableType {
    /// Tables created by `CREATE TABLE`.
    #[cfg_attr(test, default)]
    Table,
    /// Tables created by `CREATE MATERIALIZED VIEW`.
    MaterializedView,
    /// Tables serving as index for `TableType::Table` or `TableType::MaterializedView`.
    /// An index has both a `TableCatalog` and an `IndexCatalog`.
    Index,
    /// Tables serving as a vector index. Like [`TableType::Index`], they are dropped with
    /// `DROP INDEX`.
    VectorIndex,
    /// Internal tables for executors.
    Internal,
}
232
233impl TableType {
234    fn from_prost(prost: PbTableType) -> Self {
235        match prost {
236            PbTableType::Table => Self::Table,
237            PbTableType::MaterializedView => Self::MaterializedView,
238            PbTableType::Index => Self::Index,
239            PbTableType::Internal => Self::Internal,
240            PbTableType::VectorIndex => Self::VectorIndex,
241            PbTableType::Unspecified => unreachable!(),
242        }
243    }
244
245    pub(crate) fn to_prost(self) -> PbTableType {
246        match self {
247            Self::Table => PbTableType::Table,
248            Self::MaterializedView => PbTableType::MaterializedView,
249            Self::Index => PbTableType::Index,
250            Self::VectorIndex => PbTableType::VectorIndex,
251            Self::Internal => PbTableType::Internal,
252        }
253    }
254}
255
/// The version of a table, used by schema change. See [`PbTableVersion`] for more details.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TableVersion {
    /// Version id of the table; persisted as the `version` field of [`PbTableVersion`].
    pub version_id: TableVersionId,
    /// Next column id of the table. For an initial version this is the id right after the
    /// largest existing column id (see `new_initial_for_test`); presumably the id handed to the
    /// next added column — TODO confirm.
    pub next_column_id: ColumnId,
}
262
263impl TableVersion {
264    /// Create an initial version for a table, with the given max column id.
265    #[cfg(test)]
266    pub fn new_initial_for_test(max_column_id: ColumnId) -> Self {
267        use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;
268
269        Self {
270            version_id: INITIAL_TABLE_VERSION_ID,
271            next_column_id: max_column_id.next(),
272        }
273    }
274
275    pub fn from_prost(prost: PbTableVersion) -> Self {
276        Self {
277            version_id: prost.version,
278            next_column_id: ColumnId::from(prost.next_column_id),
279        }
280    }
281
282    pub fn to_prost(&self) -> PbTableVersion {
283        PbTableVersion {
284            version: self.version_id,
285            next_column_id: self.next_column_id.into(),
286        }
287    }
288}
289
290impl TableCatalog {
291    /// Returns the SQL definition when the table was created, purified with best effort
292    /// if it's a table.
293    ///
294    /// See [`Self::create_sql_ast_purified`] for more details.
295    pub fn create_sql_purified(&self) -> String {
296        self.create_sql_ast_purified()
297            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
298            .unwrap_or_else(|_| self.create_sql())
299    }
300
301    /// Returns the parsed SQL definition when the table was created, purified with best effort
302    /// if it's a table.
303    ///
304    /// Returns error if it's invalid.
305    pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
306        // Purification is only applicable to tables.
307        if let TableType::Table = self.table_type() {
308            let base = if self.definition.is_empty() {
309                // Created by `CREATE TABLE AS`, create a skeleton `CREATE TABLE` statement.
310                let name = ast::ObjectName(vec![self.name.as_str().into()]);
311                ast::Statement::default_create_table(name)
312            } else {
313                self.create_sql_ast_from_persisted()?
314            };
315
316            match try_purify_table_source_create_sql_ast(
317                base,
318                self.columns(),
319                self.row_id_index,
320                &self.pk_column_ids(),
321            ) {
322                Ok(stmt) => return Ok(stmt),
323                Err(e) => notice_to_user(format!(
324                    "error occurred while purifying definition for table \"{}\", \
325                     results may be inaccurate: {}",
326                    self.name,
327                    e.as_report()
328                )),
329            }
330        }
331
332        self.create_sql_ast_from_persisted()
333    }
334}
335
336impl TableCatalog {
337    /// Get a reference to the table catalog's table id.
338    pub fn id(&self) -> TableId {
339        self.id
340    }
341
342    pub fn with_id(mut self, id: TableId) -> Self {
343        self.id = id;
344        self
345    }
346
347    pub fn with_cleaned_by_watermark(mut self, cleaned_by_watermark: bool) -> Self {
348        self.cleaned_by_watermark = cleaned_by_watermark;
349        self
350    }
351
352    pub fn conflict_behavior(&self) -> ConflictBehavior {
353        self.conflict_behavior
354    }
355
356    pub fn table_type(&self) -> TableType {
357        self.table_type
358    }
359
360    pub fn engine(&self) -> Engine {
361        self.engine
362    }
363
364    pub fn iceberg_source_name(&self) -> Option<String> {
365        match self.engine {
366            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name)),
367            Engine::Hummock => None,
368        }
369    }
370
371    pub fn iceberg_sink_name(&self) -> Option<String> {
372        match self.engine {
373            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SINK_PREFIX, self.name)),
374            Engine::Hummock => None,
375        }
376    }
377
378    pub fn is_user_table(&self) -> bool {
379        self.table_type == TableType::Table
380    }
381
382    pub fn is_internal_table(&self) -> bool {
383        self.table_type == TableType::Internal
384    }
385
386    pub fn is_mview(&self) -> bool {
387        self.table_type == TableType::MaterializedView
388    }
389
390    pub fn is_index(&self) -> bool {
391        self.table_type == TableType::Index
392    }
393
394    /// Returns an error if `DROP` statements are used on the wrong type of table.
395    #[must_use]
396    pub fn bad_drop_error(&self) -> RwError {
397        let msg = match self.table_type {
398            TableType::MaterializedView => {
399                "Use `DROP MATERIALIZED VIEW` to drop a materialized view."
400            }
401            TableType::Index | TableType::VectorIndex => "Use `DROP INDEX` to drop an index.",
402            TableType::Table => "Use `DROP TABLE` to drop a table.",
403            TableType::Internal => "Internal tables cannot be dropped.",
404        };
405
406        ErrorCode::InvalidInputSyntax(msg.to_owned()).into()
407    }
408
409    /// Get the table catalog's associated source id.
410    #[must_use]
411    pub fn associated_source_id(&self) -> Option<TableId> {
412        self.associated_source_id
413    }
414
415    pub fn has_associated_source(&self) -> bool {
416        self.associated_source_id.is_some()
417    }
418
419    /// Get a reference to the table catalog's columns.
420    pub fn columns(&self) -> &[ColumnCatalog] {
421        &self.columns
422    }
423
424    pub fn columns_without_rw_timestamp(&self) -> Vec<ColumnCatalog> {
425        self.columns
426            .iter()
427            .filter(|c| !c.is_rw_timestamp_column())
428            .cloned()
429            .collect()
430    }
431
432    /// Get a reference to the table catalog's pk desc.
433    pub fn pk(&self) -> &[ColumnOrder] {
434        self.pk.as_ref()
435    }
436
437    /// Get the column IDs of the primary key.
438    pub fn pk_column_ids(&self) -> Vec<ColumnId> {
439        self.pk
440            .iter()
441            .map(|x| self.columns[x.column_index].column_id())
442            .collect()
443    }
444
445    /// Get the column names of the primary key.
446    pub fn pk_column_names(&self) -> Vec<&str> {
447        self.pk
448            .iter()
449            .map(|x| self.columns[x.column_index].name())
450            .collect()
451    }
452
453    /// Derive the stream key, which must include all distribution keys.
454    /// For backward compatibility, if distribution key is not in stream key(e.g. indexes created in old versions),
455    /// we need to add them to stream key.
456    pub fn stream_key(&self) -> Vec<usize> {
457        // if distribution key is not in stream key, we need to add them to stream key
458        if self
459            .distribution_key
460            .iter()
461            .any(|dist_key| !self.stream_key.contains(dist_key))
462        {
463            let mut new_stream_key = self.distribution_key.clone();
464            let mut seen: HashSet<usize> = self.distribution_key.iter().copied().collect();
465            for &key in &self.stream_key {
466                if seen.insert(key) {
467                    new_stream_key.push(key);
468                }
469            }
470            new_stream_key
471        } else {
472            self.stream_key.clone()
473        }
474    }
475
476    /// Get a [`TableDesc`] of the table.
477    ///
478    /// Note: this must be called on existing tables, otherwise it will fail to get the vnode count
479    /// (which is determined by the meta service) and panic.
480    pub fn table_desc(&self) -> TableDesc {
481        use risingwave_common::catalog::TableOption;
482
483        let table_options = TableOption::new(self.retention_seconds);
484
485        TableDesc {
486            table_id: self.id,
487            pk: self.pk.clone(),
488            stream_key: self.stream_key(),
489            columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
490            distribution_key: self.distribution_key.clone(),
491            append_only: self.append_only,
492            retention_seconds: table_options.retention_seconds,
493            value_indices: self.value_indices.clone(),
494            read_prefix_len_hint: self.read_prefix_len_hint,
495            watermark_columns: self.watermark_columns.clone(),
496            versioned: self.version.is_some(),
497            vnode_col_index: self.vnode_col_index,
498            vnode_count: self.vnode_count(),
499        }
500    }
501
502    /// Get a reference to the table catalog's name.
503    pub fn name(&self) -> &str {
504        self.name.as_ref()
505    }
506
507    pub fn distribution_key(&self) -> &[usize] {
508        self.distribution_key.as_ref()
509    }
510
511    pub fn to_internal_table_prost(&self) -> PbTable {
512        self.to_prost()
513    }
514
515    /// Returns the SQL definition when the table was created.
516    ///
517    /// See [`Self::create_sql_ast`] for more details.
518    pub fn create_sql(&self) -> String {
519        self.create_sql_ast()
520            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
521            .unwrap_or_else(|_| self.definition.clone())
522    }
523
524    /// Returns the parsed SQL definition when the table was created.
525    ///
526    /// Re-create the table with this statement may have different schema if the schema is derived
527    /// from external systems (like schema registry) or it's created by `CREATE TABLE AS`. If this
528    /// is not desired, use [`Self::create_sql_ast_purified`] instead.
529    ///
530    /// Returns error if it's invalid.
531    pub fn create_sql_ast(&self) -> Result<ast::Statement> {
532        if let TableType::Table = self.table_type()
533            && self.definition.is_empty()
534        {
535            // Always fix definition for `CREATE TABLE AS`.
536            self.create_sql_ast_purified()
537        } else {
538            // Directly parse the persisted definition.
539            self.create_sql_ast_from_persisted()
540        }
541    }
542
543    fn create_sql_ast_from_persisted(&self) -> Result<ast::Statement> {
544        Ok(Parser::parse_exactly_one(&self.definition)?)
545    }
546
547    /// Get a reference to the table catalog's version.
548    pub fn version(&self) -> Option<&TableVersion> {
549        self.version.as_ref()
550    }
551
552    /// Get the table's version id. Returns `None` if the table has no version field.
553    pub fn version_id(&self) -> Option<TableVersionId> {
554        self.version().map(|v| v.version_id)
555    }
556
557    /// Get the total vnode count of the table.
558    pub fn vnode_count(&self) -> usize {
559        if self.id().is_placeholder() {
560            0
561        } else {
562            // Panics if it's called on an incomplete (and not yet persisted) table catalog.
563            self.vnode_count.value()
564        }
565    }
566
567    pub fn to_prost(&self) -> PbTable {
568        PbTable {
569            id: self.id,
570            schema_id: self.schema_id,
571            database_id: self.database_id,
572            name: self.name.clone(),
573            // ignore `_rw_timestamp` when serializing
574            columns: self
575                .columns_without_rw_timestamp()
576                .iter()
577                .map(|c| c.to_protobuf())
578                .collect(),
579            pk: self.pk.iter().map(|o| o.to_protobuf()).collect(),
580            stream_key: self.stream_key().iter().map(|x| *x as _).collect(),
581            optional_associated_source_id: self.associated_source_id.map(|source_id| {
582                OptionalAssociatedSourceId::AssociatedSourceId(source_id.as_raw_id())
583            }),
584            table_type: self.table_type.to_prost() as i32,
585            distribution_key: self
586                .distribution_key
587                .iter()
588                .map(|k| *k as i32)
589                .collect_vec(),
590            append_only: self.append_only,
591            owner: self.owner,
592            fragment_id: self.fragment_id,
593            dml_fragment_id: self.dml_fragment_id,
594            vnode_col_index: self.vnode_col_index.map(|i| i as _),
595            row_id_index: self.row_id_index.map(|i| i as _),
596            value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
597            definition: self.definition.clone(),
598            read_prefix_len_hint: self.read_prefix_len_hint as u32,
599            version: self.version.as_ref().map(TableVersion::to_prost),
600            watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
601            dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
602            handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
603            version_column_indices: self
604                .version_column_indices
605                .iter()
606                .map(|&idx| idx as u32)
607                .collect(),
608            cardinality: Some(self.cardinality.to_protobuf()),
609            initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
610            created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
611            cleaned_by_watermark: self.cleaned_by_watermark,
612            stream_job_status: self.stream_job_status.to_proto().into(),
613            create_type: self.create_type.to_proto().into(),
614            description: self.description.clone(),
615            #[expect(deprecated)]
616            incoming_sinks: vec![],
617            created_at_cluster_version: self.created_at_cluster_version.clone(),
618            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
619            retention_seconds: self.retention_seconds,
620            cdc_table_id: self.cdc_table_id.clone(),
621            maybe_vnode_count: self.vnode_count.to_protobuf(),
622            webhook_info: self.webhook_info.clone(),
623            job_id: self.job_id,
624            engine: Some(self.engine.to_protobuf().into()),
625            clean_watermark_index_in_pk: self.clean_watermark_index_in_pk.map(|x| x as i32),
626            refreshable: self.refreshable,
627            vector_index_info: self.vector_index_info,
628            cdc_table_type: self
629                .cdc_table_type
630                .clone()
631                .map(|t| PbCdcTableType::from(t) as i32),
632            refresh_state: Some(risingwave_pb::catalog::RefreshState::Idle as i32),
633        }
634    }
635
636    /// Get columns excluding hidden columns and generated columns.
637    pub fn columns_to_insert(&self) -> impl Iterator<Item = &ColumnCatalog> {
638        self.columns
639            .iter()
640            .filter(|c| !c.is_hidden() && !c.is_generated())
641    }
642
643    pub fn generated_column_names(&self) -> impl Iterator<Item = &str> {
644        self.columns
645            .iter()
646            .filter(|c| c.is_generated())
647            .map(|c| c.name())
648    }
649
650    pub fn generated_col_idxes(&self) -> impl Iterator<Item = usize> + '_ {
651        self.columns
652            .iter()
653            .enumerate()
654            .filter(|(_, c)| c.is_generated())
655            .map(|(i, _)| i)
656    }
657
658    pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
659        if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) = self
660            .columns[col_idx]
661            .column_desc
662            .generated_or_default_column
663            .as_ref()
664        {
665            ExprImpl::from_expr_proto(expr.as_ref().unwrap())
666                .expect("expr in default columns corrupted")
667        } else {
668            ExprImpl::literal_null(self.columns[col_idx].data_type().clone())
669        }
670    }
671
672    pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
673        columns
674            .iter()
675            .map(|c| {
676                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
677                    expr,
678                    ..
679                })) = c.column_desc.generated_or_default_column.as_ref()
680                {
681                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
682                        .expect("expr in default columns corrupted")
683                } else {
684                    ExprImpl::literal_null(c.data_type().clone())
685                }
686            })
687            .collect()
688    }
689
690    pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
691        self.columns.iter().enumerate().filter_map(|(i, c)| {
692            if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
693                expr, ..
694            })) = c.column_desc.generated_or_default_column.as_ref()
695            {
696                Some((
697                    i,
698                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
699                        .expect("expr in default columns corrupted"),
700                ))
701            } else {
702                None
703            }
704        })
705    }
706
707    pub fn has_generated_column(&self) -> bool {
708        self.columns.iter().any(|c| c.is_generated())
709    }
710
711    pub fn has_rw_timestamp_column(&self) -> bool {
712        self.columns.iter().any(|c| c.is_rw_timestamp_column())
713    }
714
715    pub fn column_schema(&self) -> Schema {
716        Schema::new(
717            self.columns
718                .iter()
719                .map(|c| Field::from(&c.column_desc))
720                .collect(),
721        )
722    }
723
724    pub fn is_created(&self) -> bool {
725        self.stream_job_status == StreamJobStatus::Created
726    }
727
728    pub fn is_iceberg_engine_table(&self) -> bool {
729        self.engine == Engine::Iceberg
730    }
731
732    pub fn order_column_indices(&self) -> impl Iterator<Item = usize> + '_ {
733        self.pk.iter().map(|col| col.column_index)
734    }
735
736    pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
737        ColumnDesc::get_id_to_op_idx_mapping(&self.columns, None)
738    }
739
740    pub fn order_column_ids(&self) -> Vec<ColumnId> {
741        self.pk
742            .iter()
743            .map(|col| self.columns[col.column_index].column_desc.column_id)
744            .collect()
745    }
746
747    pub fn arrange_key_orders_protobuf(&self) -> Vec<PbColumnOrder> {
748        // Set materialize key as arrange key + pk
749        self.pk.iter().map(|x| x.to_protobuf()).collect()
750    }
751}
752
753impl From<PbTable> for TableCatalog {
754    fn from(tb: PbTable) -> Self {
755        let id = tb.id;
756        let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
757        let tb_engine = tb
758            .get_engine()
759            .map(|engine| PbEngine::try_from(*engine).expect("Invalid engine"))
760            .unwrap_or(PbEngine::Hummock);
761        let table_type = tb.get_table_type().unwrap();
762        let stream_job_status = tb
763            .get_stream_job_status()
764            .unwrap_or(PbStreamJobStatus::Created);
765        let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground);
766        let associated_source_id = tb.optional_associated_source_id.map(|id| match id {
767            OptionalAssociatedSourceId::AssociatedSourceId(id) => id,
768        });
769        let name = tb.name.clone();
770
771        let vnode_count = tb.vnode_count_inner();
772        if let VnodeCount::Placeholder = vnode_count {
773            // Only allow placeholder vnode count for creating tables.
774            // After the table is created, an `Update` notification will be used to update the vnode count field.
775            assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
776        }
777
778        let mut col_names = HashSet::new();
779        let mut col_index: HashMap<i32, usize> = HashMap::new();
780
781        let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
782        let version_column_indices: Vec<usize> = tb
783            .version_column_indices
784            .iter()
785            .map(|&idx| idx as usize)
786            .collect();
787        let mut columns: Vec<ColumnCatalog> =
788            tb.columns.into_iter().map(ColumnCatalog::from).collect();
789        if columns.iter().all(|c| !c.is_rw_timestamp_column()) {
790            // Add system column `_rw_timestamp` to every table, but notice that this column is never persisted.
791            columns.push(ColumnCatalog::rw_timestamp_column());
792        }
793        for (idx, catalog) in columns.clone().into_iter().enumerate() {
794            let col_name = catalog.name();
795            if !col_names.insert(col_name.to_owned()) {
796                panic!("duplicated column name {} in table {} ", col_name, tb.name)
797            }
798
799            let col_id = catalog.column_desc.column_id.get_id();
800            col_index.insert(col_id, idx);
801        }
802
803        let pk = tb.pk.iter().map(ColumnOrder::from_protobuf).collect();
804        let mut watermark_columns = FixedBitSet::with_capacity(columns.len());
805        for idx in &tb.watermark_indices {
806            watermark_columns.insert(*idx as _);
807        }
808        let engine = Engine::from_protobuf(&tb_engine);
809
810        Self {
811            id,
812            schema_id: tb.schema_id,
813            database_id: tb.database_id,
814            associated_source_id: associated_source_id.map(Into::into),
815            name,
816            pk,
817            columns,
818            table_type: TableType::from_prost(table_type),
819            distribution_key: tb
820                .distribution_key
821                .iter()
822                .map(|k| *k as usize)
823                .collect_vec(),
824            stream_key: tb.stream_key.iter().map(|x| *x as _).collect(),
825            append_only: tb.append_only,
826            owner: tb.owner,
827            fragment_id: tb.fragment_id,
828            dml_fragment_id: tb.dml_fragment_id,
829            vnode_col_index: tb.vnode_col_index.map(|x| x as usize),
830            row_id_index: tb.row_id_index.map(|x| x as usize),
831            value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
832            definition: tb.definition,
833            conflict_behavior,
834            version_column_indices,
835            read_prefix_len_hint: tb.read_prefix_len_hint as usize,
836            version: tb.version.map(TableVersion::from_prost),
837            watermark_columns,
838            dist_key_in_pk: tb.dist_key_in_pk.iter().map(|x| *x as _).collect(),
839            cardinality: tb
840                .cardinality
841                .map(|c| Cardinality::from_protobuf(&c))
842                .unwrap_or_else(Cardinality::unknown),
843            created_at_epoch: tb.created_at_epoch.map(Epoch::from),
844            initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
845            cleaned_by_watermark: tb.cleaned_by_watermark,
846            create_type: CreateType::from_proto(create_type),
847            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
848            description: tb.description,
849            created_at_cluster_version: tb.created_at_cluster_version.clone(),
850            initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
851            retention_seconds: tb.retention_seconds,
852            cdc_table_id: tb.cdc_table_id,
853            vnode_count,
854            webhook_info: tb.webhook_info,
855            job_id: tb.job_id,
856            engine,
857            clean_watermark_index_in_pk: tb.clean_watermark_index_in_pk.map(|x| x as usize),
858
859            refreshable: tb.refreshable,
860            vector_index_info: tb.vector_index_info,
861            cdc_table_type: tb
862                .cdc_table_type
863                .and_then(|t| PbCdcTableType::try_from(t).ok())
864                .map(ExternalCdcTableType::from),
865        }
866    }
867}
868
869impl From<&PbTable> for TableCatalog {
870    fn from(tb: &PbTable) -> Self {
871        tb.clone().into()
872    }
873}
874
875impl OwnedByUserCatalog for TableCatalog {
876    fn owner(&self) -> UserId {
877        self.owner
878    }
879}
880
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::test_prelude::*;
    use risingwave_common::types::*;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_pb::catalog::table::PbEngine;
    use risingwave_pb::plan_common::{
        AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
    };

    use super::*;

    /// Decodes a hand-built `PbTable` into a `TableCatalog` and compares every field with
    /// an explicitly constructed expectation. It then checks that encoding back with
    /// `to_prost` and decoding again reproduces the same catalog.
    #[test]
    fn test_into_table_catalog() {
        // Type of the user column `country`. The protobuf input and the expected catalog
        // both use this one value, so they cannot drift apart.
        let country_type = DataType::from(StructType::new([
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ]));

        let pb_table = PbTable {
            id: 0.into(),
            schema_id: 0.into(),
            database_id: 0.into(),
            name: "test".to_owned(),
            table_type: PbTableType::Table as i32,
            columns: vec![
                ColumnCatalog::row_id_column().to_protobuf(),
                PbColumnCatalog {
                    column_desc: Some(PbColumnDesc::new(
                        country_type.to_protobuf(),
                        "country",
                        1,
                    )),
                    is_hidden: false,
                },
            ],
            pk: vec![ColumnOrder::new(0, OrderType::ascending()).to_protobuf()],
            stream_key: vec![0],
            distribution_key: vec![0],
            optional_associated_source_id: OptionalAssociatedSourceId::AssociatedSourceId(233)
                .into(),
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0,
            dml_fragment_id: None,
            initialized_at_epoch: None,
            value_indices: vec![0],
            definition: "".into(),
            read_prefix_len_hint: 0,
            vnode_col_index: None,
            row_id_index: None,
            version: Some(PbTableVersion {
                version: 0,
                next_column_id: 2,
            }),
            watermark_indices: vec![],
            // Expected to decode as `ConflictBehavior::NoCheck` below.
            handle_pk_conflict_behavior: 3,
            dist_key_in_pk: vec![0],
            cardinality: None,
            created_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: Some("description".to_owned()),
            #[expect(deprecated)]
            incoming_sinks: vec![],
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_indices: Vec::new(),
            cdc_table_id: None,
            maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            clean_watermark_index_in_pk: None,

            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
            refresh_state: Some(risingwave_pb::catalog::RefreshState::Idle as i32),
        };
        let table = TableCatalog::from(pb_table);

        // The conversion appends the `_rw_timestamp` system column when it is absent,
        // so the decoded catalog has one more column than the protobuf input.
        let expected_columns = vec![
            ColumnCatalog::row_id_column(),
            ColumnCatalog {
                column_desc: ColumnDesc {
                    data_type: country_type,
                    column_id: ColumnId::new(1),
                    name: "country".to_owned(),
                    description: None,
                    generated_or_default_column: None,
                    additional_column: AdditionalColumn { column_type: None },
                    version: ColumnDescVersion::LATEST,
                    system_column: None,
                    nullable: true,
                },
                is_hidden: false,
            },
            ColumnCatalog::rw_timestamp_column(),
        ];

        let expected = TableCatalog {
            id: TableId::new(0),
            schema_id: 0.into(),
            database_id: 0.into(),
            associated_source_id: Some(TableId::new(233)),
            name: "test".to_owned(),
            table_type: TableType::Table,
            columns: expected_columns,
            stream_key: vec![0],
            pk: vec![ColumnOrder::new(0, OrderType::ascending())],
            distribution_key: vec![0],
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0,
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index: None,
            value_indices: vec![0],
            definition: "".into(),
            conflict_behavior: ConflictBehavior::NoCheck,
            read_prefix_len_hint: 0,
            version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))),
            // The watermark bitset is sized to the decoded column count (3 columns).
            watermark_columns: FixedBitSet::with_capacity(3),
            dist_key_in_pk: vec![0],
            cardinality: Cardinality::unknown(),
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: StreamJobStatus::Created,
            create_type: CreateType::Foreground,
            description: Some("description".to_owned()),
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_indices: Vec::new(),
            cdc_table_id: None,
            vnode_count: VnodeCount::set(233),
            webhook_info: None,
            job_id: None,
            engine: Engine::Hummock,
            clean_watermark_index_in_pk: None,

            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
        };

        assert_eq!(table, expected);

        // Round trip: catalog -> protobuf -> catalog must be lossless.
        let round_tripped = TableCatalog::from(table.to_prost());
        assert_eq!(table, round_tripped);
    }
}