risingwave_frontend/catalog/
table_catalog.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17
18use fixedbitset::FixedBitSet;
19use itertools::Itertools;
20use risingwave_common::catalog::{
21    ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, ICEBERG_SINK_PREFIX,
22    ICEBERG_SOURCE_PREFIX, Schema, StreamJobStatus, TableDesc, TableId, TableVersionId,
23};
24use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
25use risingwave_common::id::{JobId, SourceId};
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::sort_util::ColumnOrder;
28use risingwave_connector::source::cdc::external::ExternalCdcTableType;
29use risingwave_pb::catalog::table::{
30    CdcTableType as PbCdcTableType, PbEngine, PbTableType, PbTableVersion,
31};
32use risingwave_pb::catalog::{
33    PbCreateType, PbStreamJobStatus, PbTable, PbVectorIndexInfo, PbWebhookSourceInfo,
34};
35use risingwave_pb::common::PbColumnOrder;
36use risingwave_pb::plan_common::DefaultColumnDesc;
37use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
38use risingwave_sqlparser::ast;
39use risingwave_sqlparser::parser::Parser;
40use thiserror_ext::AsReport as _;
41
42use super::purify::try_purify_table_source_create_sql_ast;
43use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId};
44use crate::error::{ErrorCode, Result, RwError};
45use crate::expr::ExprImpl;
46use crate::optimizer::property::Cardinality;
47use crate::session::current::notice_to_user;
48use crate::user::UserId;
49
50/// `TableCatalog` Includes full information about a table.
51///
52/// Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed.
53/// It is not the same as a user-side table created by `CREATE TABLE`.
54///
55/// Use [`Self::table_type()`] to determine the [`TableType`] of the table.
56///
57/// # Column ID & Column Index
58///
59/// [`ColumnId`](risingwave_common::catalog::ColumnId) (with type `i32`) is the unique identifier of
60/// a column in a table. It is used to access storage.
61///
62/// Column index, or idx, (with type `usize`) is the relative position inside the `Vec` of columns.
63///
64/// A tip to avoid making mistakes is never do casting - i32 as usize or vice versa.
65///
66/// # Keys
67///
68/// All the keys are represented as column indices.
69///
70/// - **Primary Key** (pk): unique identifier of a row.
71///
72/// - **Order Key**: the primary key for storage, used to sort and access data.
73///
74///   For an MV, the columns in `ORDER BY` clause will be put at the beginning of the order key. And
75///   the remaining columns in pk will follow behind.
76///
77///   If there's no `ORDER BY` clause, the order key will be the same as pk.
78///
79/// - **Distribution Key**: the columns used to partition the data. It must be a subset of the order
80///   key.
81#[derive(Clone, Debug, PartialEq, Eq, Hash)]
82#[cfg_attr(test, derive(Default))]
83pub struct TableCatalog {
84    pub id: TableId,
85
86    pub schema_id: SchemaId,
87
88    pub database_id: DatabaseId,
89
90    pub associated_source_id: Option<SourceId>, // TODO: use SourceId
91
92    pub name: String,
93
94    /// All columns in this table.
95    pub columns: Vec<ColumnCatalog>,
96
97    /// Key used as materialize's storage key prefix, including MV order columns and `stream_key`.
98    pub pk: Vec<ColumnOrder>,
99
100    /// `pk_indices` of the corresponding materialize operator's output.
101    /// For the backward compatibility, we should use `stream_key()` method to get the stream key.
102    /// never use this field directly.
103    pub stream_key: Vec<usize>,
104
105    /// Type of the table. Used to distinguish user-created tables, materialized views, index
106    /// tables, and internal tables.
107    pub table_type: TableType,
108
109    /// Distribution key column indices.
110    pub distribution_key: Vec<usize>,
111
112    /// The append-only attribute is derived from `StreamMaterialize` and `StreamTableScan` relies
113    /// on this to derive an append-only stream plan.
114    pub append_only: bool,
115
116    /// The cardinality of the table.
117    pub cardinality: Cardinality,
118
119    /// Owner of the table.
120    pub owner: UserId,
121
122    // TTL of the record in the table, to ensure the consistency with other tables in the streaming plan, it only applies to append-only tables.
123    pub retention_seconds: Option<u32>,
124
125    /// The fragment id of the `Materialize` operator for this table.
126    pub fragment_id: FragmentId,
127
128    /// The fragment id of the `DML` operator for this table.
129    pub dml_fragment_id: Option<FragmentId>,
130
131    /// An optional column index which is the vnode of each row computed by the table's consistent
132    /// hash distribution.
133    pub vnode_col_index: Option<usize>,
134
135    /// An optional column index of row id. If the primary key is specified by users, this will be
136    /// `None`.
137    pub row_id_index: Option<usize>,
138
139    /// The column indices which are stored in the state store's value with row-encoding.
140    pub value_indices: Vec<usize>,
141
142    /// The full `CREATE TABLE` or `CREATE MATERIALIZED VIEW` definition of the table.
143    pub definition: String,
144
145    /// The behavior of handling incoming pk conflict from source executor, we can overwrite or
146    /// ignore conflict pk. For normal materialize executor and other executors, this field will be
147    /// `No Check`.
148    pub conflict_behavior: ConflictBehavior,
149
150    pub version_column_indices: Vec<usize>,
151
152    pub read_prefix_len_hint: usize,
153
154    /// Per-table catalog version, used by schema change. `None` for internal tables and tests.
155    pub version: Option<TableVersion>,
156
157    /// The column indices which could receive watermarks.
158    pub watermark_columns: FixedBitSet,
159
160    /// Optional field specifies the distribution key indices in pk.
161    /// See <https://github.com/risingwavelabs/risingwave/issues/8377> for more information.
162    pub dist_key_in_pk: Vec<usize>,
163
164    pub created_at_epoch: Option<Epoch>,
165
166    pub initialized_at_epoch: Option<Epoch>,
167
168    /// Indicate whether to create table in background or foreground.
169    pub create_type: CreateType,
170
171    /// Indicate the stream job status, whether it is created or creating.
172    /// If it is creating, we should hide it.
173    pub stream_job_status: StreamJobStatus,
174
175    /// description of table, set by `comment on`.
176    pub description: Option<String>,
177
178    pub created_at_cluster_version: Option<String>,
179
180    pub initialized_at_cluster_version: Option<String>,
181
182    pub cdc_table_id: Option<String>,
183
184    /// Total vnode count of the table.
185    ///
186    /// Can be [`VnodeCount::Placeholder`] if the catalog is generated by the frontend and the
187    /// corresponding job is still in `Creating` status, in which case calling [`Self::vnode_count`]
188    /// will panic.
189    ///
190    /// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog
191    /// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build
192    pub vnode_count: VnodeCount,
193
194    pub webhook_info: Option<PbWebhookSourceInfo>,
195
196    pub job_id: Option<JobId>,
197
198    pub engine: Engine,
199
200    pub clean_watermark_index_in_pk: Option<usize>,
201
202    /// Indices of watermark columns in all columns that should be used for state cleaning.
203    /// This replaces `clean_watermark_index_in_pk` but is kept for backward compatibility.
204    /// When set, this takes precedence over `clean_watermark_index_in_pk`.
205    pub clean_watermark_indices: Vec<usize>,
206
207    /// Whether the table supports manual refresh operations
208    pub refreshable: bool,
209
210    pub vector_index_info: Option<PbVectorIndexInfo>,
211
212    pub cdc_table_type: Option<ExternalCdcTableType>,
213}
214
/// The kind of a [`TableCatalog`], distinguishing what created the storage table.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub enum TableType {
    /// Tables created by `CREATE TABLE`.
    #[cfg_attr(test, default)]
    Table,
    /// Tables created by `CREATE MATERIALIZED VIEW`.
    MaterializedView,
    /// Tables serving as index for `TableType::Table` or `TableType::MaterializedView`.
    /// An index has both a `TableCatalog` and an `IndexCatalog`.
    Index,
    /// Tables serving as a vector index. Like [`TableType::Index`], it is dropped with
    /// `DROP INDEX`.
    VectorIndex,
    /// Internal tables for executors.
    Internal,
}
230
231impl TableType {
232    fn from_prost(prost: PbTableType) -> Self {
233        match prost {
234            PbTableType::Table => Self::Table,
235            PbTableType::MaterializedView => Self::MaterializedView,
236            PbTableType::Index => Self::Index,
237            PbTableType::Internal => Self::Internal,
238            PbTableType::VectorIndex => Self::VectorIndex,
239            PbTableType::Unspecified => unreachable!(),
240        }
241    }
242
243    pub(crate) fn to_prost(self) -> PbTableType {
244        match self {
245            Self::Table => PbTableType::Table,
246            Self::MaterializedView => PbTableType::MaterializedView,
247            Self::Index => PbTableType::Index,
248            Self::VectorIndex => PbTableType::VectorIndex,
249            Self::Internal => PbTableType::Internal,
250        }
251    }
252}
253
254/// The version of a table, used by schema change. See [`PbTableVersion`] for more details.
255#[derive(Clone, Debug, PartialEq, Eq, Hash)]
256pub struct TableVersion {
257    pub version_id: TableVersionId,
258    pub next_column_id: ColumnId,
259}
260
261impl TableVersion {
262    /// Create an initial version for a table, with the given max column id.
263    #[cfg(test)]
264    pub fn new_initial_for_test(max_column_id: ColumnId) -> Self {
265        use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;
266
267        Self {
268            version_id: INITIAL_TABLE_VERSION_ID,
269            next_column_id: max_column_id.next(),
270        }
271    }
272
273    pub fn from_prost(prost: PbTableVersion) -> Self {
274        Self {
275            version_id: prost.version,
276            next_column_id: ColumnId::from(prost.next_column_id),
277        }
278    }
279
280    pub fn to_prost(&self) -> PbTableVersion {
281        PbTableVersion {
282            version: self.version_id,
283            next_column_id: self.next_column_id.into(),
284        }
285    }
286}
287
288impl TableCatalog {
289    /// Returns the SQL definition when the table was created, purified with best effort
290    /// if it's a table.
291    ///
292    /// See [`Self::create_sql_ast_purified`] for more details.
293    pub fn create_sql_purified(&self) -> String {
294        self.create_sql_ast_purified()
295            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
296            .unwrap_or_else(|_| self.create_sql())
297    }
298
299    /// Returns the parsed SQL definition when the table was created, purified with best effort
300    /// if it's a table.
301    ///
302    /// Returns error if it's invalid.
303    pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
304        // Purification is only applicable to tables.
305        if let TableType::Table = self.table_type() {
306            let base = if self.definition.is_empty() {
307                // Created by `CREATE TABLE AS`, create a skeleton `CREATE TABLE` statement.
308                let name = ast::ObjectName(vec![self.name.as_str().into()]);
309                ast::Statement::default_create_table(name)
310            } else {
311                self.create_sql_ast_from_persisted()?
312            };
313
314            match try_purify_table_source_create_sql_ast(
315                base,
316                self.columns(),
317                self.row_id_index,
318                &self.pk_column_ids(),
319            ) {
320                Ok(stmt) => return Ok(stmt),
321                Err(e) => notice_to_user(format!(
322                    "error occurred while purifying definition for table \"{}\", \
323                     results may be inaccurate: {}",
324                    self.name,
325                    e.as_report()
326                )),
327            }
328        }
329
330        self.create_sql_ast_from_persisted()
331    }
332}
333
334impl TableCatalog {
335    /// Get a reference to the table catalog's table id.
336    pub fn id(&self) -> TableId {
337        self.id
338    }
339
340    pub fn with_id(mut self, id: TableId) -> Self {
341        self.id = id;
342        self
343    }
344
345    pub fn conflict_behavior(&self) -> ConflictBehavior {
346        self.conflict_behavior
347    }
348
349    pub fn table_type(&self) -> TableType {
350        self.table_type
351    }
352
353    pub fn engine(&self) -> Engine {
354        self.engine
355    }
356
357    pub fn iceberg_source_name(&self) -> Option<String> {
358        match self.engine {
359            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name)),
360            Engine::Hummock => None,
361        }
362    }
363
364    pub fn iceberg_sink_name(&self) -> Option<String> {
365        match self.engine {
366            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SINK_PREFIX, self.name)),
367            Engine::Hummock => None,
368        }
369    }
370
371    pub fn is_user_table(&self) -> bool {
372        self.table_type == TableType::Table
373    }
374
375    pub fn is_internal_table(&self) -> bool {
376        self.table_type == TableType::Internal
377    }
378
379    pub fn is_mview(&self) -> bool {
380        self.table_type == TableType::MaterializedView
381    }
382
383    pub fn is_index(&self) -> bool {
384        self.table_type == TableType::Index
385    }
386
387    /// Returns an error if `DROP` statements are used on the wrong type of table.
388    #[must_use]
389    pub fn bad_drop_error(&self) -> RwError {
390        let msg = match self.table_type {
391            TableType::MaterializedView => {
392                "Use `DROP MATERIALIZED VIEW` to drop a materialized view."
393            }
394            TableType::Index | TableType::VectorIndex => "Use `DROP INDEX` to drop an index.",
395            TableType::Table => "Use `DROP TABLE` to drop a table.",
396            TableType::Internal => "Internal tables cannot be dropped.",
397        };
398
399        ErrorCode::InvalidInputSyntax(msg.to_owned()).into()
400    }
401
402    /// Get the table catalog's associated source id.
403    #[must_use]
404    pub fn associated_source_id(&self) -> Option<SourceId> {
405        self.associated_source_id
406    }
407
408    pub fn has_associated_source(&self) -> bool {
409        self.associated_source_id.is_some()
410    }
411
412    /// Get a reference to the table catalog's columns.
413    pub fn columns(&self) -> &[ColumnCatalog] {
414        &self.columns
415    }
416
417    pub fn columns_without_rw_timestamp(&self) -> Vec<ColumnCatalog> {
418        self.columns
419            .iter()
420            .filter(|c| !c.is_rw_timestamp_column())
421            .cloned()
422            .collect()
423    }
424
425    /// Get a reference to the table catalog's pk desc.
426    pub fn pk(&self) -> &[ColumnOrder] {
427        self.pk.as_ref()
428    }
429
430    /// Get the column IDs of the primary key.
431    pub fn pk_column_ids(&self) -> Vec<ColumnId> {
432        self.pk
433            .iter()
434            .map(|x| self.columns[x.column_index].column_id())
435            .collect()
436    }
437
438    /// Get the column names of the primary key.
439    pub fn pk_column_names(&self) -> Vec<&str> {
440        self.pk
441            .iter()
442            .map(|x| self.columns[x.column_index].name())
443            .collect()
444    }
445
446    /// Derive the stream key, which must include all distribution keys.
447    /// For backward compatibility, if distribution key is not in stream key(e.g. indexes created in old versions),
448    /// we need to add them to stream key.
449    pub fn stream_key(&self) -> Vec<usize> {
450        // if distribution key is not in stream key, we need to add them to stream key
451        if self
452            .distribution_key
453            .iter()
454            .any(|dist_key| !self.stream_key.contains(dist_key))
455        {
456            let mut new_stream_key = self.distribution_key.clone();
457            let mut seen: HashSet<usize> = self.distribution_key.iter().copied().collect();
458            for &key in &self.stream_key {
459                if seen.insert(key) {
460                    new_stream_key.push(key);
461                }
462            }
463            new_stream_key
464        } else {
465            self.stream_key.clone()
466        }
467    }
468
469    /// Get a [`TableDesc`] of the table.
470    ///
471    /// Note: this must be called on existing tables, otherwise it will fail to get the vnode count
472    /// (which is determined by the meta service) and panic.
473    pub fn table_desc(&self) -> TableDesc {
474        use risingwave_common::catalog::TableOption;
475
476        let table_options = TableOption::new(self.retention_seconds);
477
478        TableDesc {
479            table_id: self.id,
480            pk: self.pk.clone(),
481            stream_key: self.stream_key(),
482            columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
483            distribution_key: self.distribution_key.clone(),
484            append_only: self.append_only,
485            retention_seconds: table_options.retention_seconds,
486            value_indices: self.value_indices.clone(),
487            read_prefix_len_hint: self.read_prefix_len_hint,
488            watermark_columns: self.watermark_columns.clone(),
489            versioned: self.version.is_some(),
490            vnode_col_index: self.vnode_col_index,
491            vnode_count: self.vnode_count(),
492        }
493    }
494
495    /// Get a reference to the table catalog's name.
496    pub fn name(&self) -> &str {
497        self.name.as_ref()
498    }
499
500    pub fn distribution_key(&self) -> &[usize] {
501        self.distribution_key.as_ref()
502    }
503
504    pub fn to_internal_table_prost(&self) -> PbTable {
505        self.to_prost()
506    }
507
508    /// Returns the SQL definition when the table was created.
509    ///
510    /// See [`Self::create_sql_ast`] for more details.
511    pub fn create_sql(&self) -> String {
512        self.create_sql_ast()
513            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
514            .unwrap_or_else(|_| self.definition.clone())
515    }
516
517    /// Returns the parsed SQL definition when the table was created.
518    ///
519    /// Re-create the table with this statement may have different schema if the schema is derived
520    /// from external systems (like schema registry) or it's created by `CREATE TABLE AS`. If this
521    /// is not desired, use [`Self::create_sql_ast_purified`] instead.
522    ///
523    /// Returns error if it's invalid.
524    pub fn create_sql_ast(&self) -> Result<ast::Statement> {
525        if let TableType::Table = self.table_type()
526            && self.definition.is_empty()
527        {
528            // Always fix definition for `CREATE TABLE AS`.
529            self.create_sql_ast_purified()
530        } else {
531            // Directly parse the persisted definition.
532            self.create_sql_ast_from_persisted()
533        }
534    }
535
536    fn create_sql_ast_from_persisted(&self) -> Result<ast::Statement> {
537        Ok(Parser::parse_exactly_one(&self.definition)?)
538    }
539
540    /// Get a reference to the table catalog's version.
541    pub fn version(&self) -> Option<&TableVersion> {
542        self.version.as_ref()
543    }
544
545    /// Get the table's version id. Returns `None` if the table has no version field.
546    pub fn version_id(&self) -> Option<TableVersionId> {
547        self.version().map(|v| v.version_id)
548    }
549
550    /// Get the total vnode count of the table.
551    pub fn vnode_count(&self) -> usize {
552        if self.id().is_placeholder() {
553            0
554        } else {
555            // Panics if it's called on an incomplete (and not yet persisted) table catalog.
556            self.vnode_count.value()
557        }
558    }
559
560    pub fn to_prost(&self) -> PbTable {
561        PbTable {
562            id: self.id,
563            schema_id: self.schema_id,
564            database_id: self.database_id,
565            name: self.name.clone(),
566            // ignore `_rw_timestamp` when serializing
567            columns: self
568                .columns_without_rw_timestamp()
569                .iter()
570                .map(|c| c.to_protobuf())
571                .collect(),
572            pk: self.pk.iter().map(|o| o.to_protobuf()).collect(),
573            stream_key: self.stream_key().iter().map(|x| *x as _).collect(),
574            optional_associated_source_id: self.associated_source_id.map(Into::into),
575            table_type: self.table_type.to_prost() as i32,
576            distribution_key: self
577                .distribution_key
578                .iter()
579                .map(|k| *k as i32)
580                .collect_vec(),
581            append_only: self.append_only,
582            owner: self.owner,
583            fragment_id: self.fragment_id,
584            dml_fragment_id: self.dml_fragment_id,
585            vnode_col_index: self.vnode_col_index.map(|i| i as _),
586            row_id_index: self.row_id_index.map(|i| i as _),
587            value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
588            definition: self.definition.clone(),
589            read_prefix_len_hint: self.read_prefix_len_hint as u32,
590            version: self.version.as_ref().map(TableVersion::to_prost),
591            watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
592            dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
593            handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
594            version_column_indices: self
595                .version_column_indices
596                .iter()
597                .map(|&idx| idx as u32)
598                .collect(),
599            cardinality: Some(self.cardinality.to_protobuf()),
600            initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
601            created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
602            #[expect(deprecated)]
603            cleaned_by_watermark: false,
604            stream_job_status: self.stream_job_status.to_proto().into(),
605            create_type: self.create_type.to_proto().into(),
606            description: self.description.clone(),
607            #[expect(deprecated)]
608            incoming_sinks: vec![],
609            created_at_cluster_version: self.created_at_cluster_version.clone(),
610            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
611            retention_seconds: self.retention_seconds,
612            cdc_table_id: self.cdc_table_id.clone(),
613            maybe_vnode_count: self.vnode_count.to_protobuf(),
614            webhook_info: self.webhook_info.clone(),
615            job_id: self.job_id,
616            engine: Some(self.engine.to_protobuf().into()),
617            #[expect(deprecated)]
618            clean_watermark_index_in_pk: self.clean_watermark_index_in_pk.map(|x| x as i32),
619            clean_watermark_indices: self
620                .clean_watermark_indices
621                .iter()
622                .map(|&x| x as u32)
623                .collect(),
624            refreshable: self.refreshable,
625            vector_index_info: self.vector_index_info,
626            cdc_table_type: self
627                .cdc_table_type
628                .clone()
629                .map(|t| PbCdcTableType::from(t) as i32),
630        }
631    }
632
633    /// Get insertable columns together with the shifted `row_id_index` used by DML.
634    ///
635    /// The returned columns exclude hidden columns and generated columns. The shifted
636    /// `row_id_index` rules out generated columns before the row-id column, matching the DML
637    /// input schema.
638    pub fn columns_to_insert(
639        &self,
640    ) -> (
641        impl Iterator<Item = (&ColumnCatalog, bool)> + '_,
642        Option<usize>,
643    ) {
644        let pk_column_indices: HashSet<_> =
645            self.pk.iter().map(|column| column.column_index).collect();
646        let columns = self
647            .columns
648            .iter()
649            .enumerate()
650            .filter(|(_, c)| !c.is_hidden() && !c.is_generated())
651            .map(move |(idx, column)| (column, pk_column_indices.contains(&idx)));
652        let row_id_index = self.row_id_index.map(|row_id_index| {
653            let generated_column_count = self
654                .columns()
655                .iter()
656                .take(row_id_index + 1)
657                .filter(|column| column.is_generated())
658                .count();
659            row_id_index - generated_column_count
660        });
661        (columns, row_id_index)
662    }
663
664    pub fn generated_column_names(&self) -> impl Iterator<Item = &str> {
665        self.columns
666            .iter()
667            .filter(|c| c.is_generated())
668            .map(|c| c.name())
669    }
670
671    pub fn first_generated_column(&self) -> Option<(usize, &str)> {
672        self.columns
673            .iter()
674            .filter(|c| !c.is_hidden())
675            .enumerate()
676            .find(|(_, c)| c.is_generated())
677            .map(|(idx, c)| (idx, c.name()))
678    }
679
680    pub fn generated_col_idxes(&self) -> impl Iterator<Item = usize> + '_ {
681        self.columns
682            .iter()
683            .enumerate()
684            .filter(|(_, c)| c.is_generated())
685            .map(|(i, _)| i)
686    }
687
688    pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
689        if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) = self
690            .columns[col_idx]
691            .column_desc
692            .generated_or_default_column
693            .as_ref()
694        {
695            ExprImpl::from_expr_proto(expr.as_ref().unwrap())
696                .expect("expr in default columns corrupted")
697        } else {
698            ExprImpl::literal_null(self.columns[col_idx].data_type().clone())
699        }
700    }
701
702    pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
703        columns
704            .iter()
705            .map(|c| {
706                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
707                    expr,
708                    ..
709                })) = c.column_desc.generated_or_default_column.as_ref()
710                {
711                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
712                        .expect("expr in default columns corrupted")
713                } else {
714                    ExprImpl::literal_null(c.data_type().clone())
715                }
716            })
717            .collect()
718    }
719
720    pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
721        self.columns.iter().enumerate().filter_map(|(i, c)| {
722            if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
723                expr, ..
724            })) = c.column_desc.generated_or_default_column.as_ref()
725            {
726                Some((
727                    i,
728                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
729                        .expect("expr in default columns corrupted"),
730                ))
731            } else {
732                None
733            }
734        })
735    }
736
737    pub fn has_generated_column(&self) -> bool {
738        self.columns.iter().any(|c| c.is_generated())
739    }
740
741    pub fn has_rw_timestamp_column(&self) -> bool {
742        self.columns.iter().any(|c| c.is_rw_timestamp_column())
743    }
744
745    pub fn column_schema(&self) -> Schema {
746        Schema::new(
747            self.columns
748                .iter()
749                .map(|c| Field::from(&c.column_desc))
750                .collect(),
751        )
752    }
753
754    pub fn is_created(&self) -> bool {
755        self.stream_job_status == StreamJobStatus::Created
756    }
757
758    pub fn is_iceberg_engine_table(&self) -> bool {
759        self.engine == Engine::Iceberg
760    }
761
762    pub fn order_column_indices(&self) -> impl Iterator<Item = usize> + '_ {
763        self.pk.iter().map(|col| col.column_index)
764    }
765
766    pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
767        ColumnDesc::get_id_to_op_idx_mapping(&self.columns, None)
768    }
769
770    pub fn order_column_ids(&self) -> Vec<ColumnId> {
771        self.pk
772            .iter()
773            .map(|col| self.columns[col.column_index].column_desc.column_id)
774            .collect()
775    }
776
777    pub fn arrange_key_orders_protobuf(&self) -> Vec<PbColumnOrder> {
778        // Set materialize key as arrange key + pk
779        self.pk.iter().map(|x| x.to_protobuf()).collect()
780    }
781}
782
783impl From<PbTable> for TableCatalog {
784    fn from(tb: PbTable) -> Self {
785        let id = tb.id;
786        let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
787        let tb_engine = tb
788            .get_engine()
789            .map(|engine| PbEngine::try_from(*engine).expect("Invalid engine"))
790            .unwrap_or(PbEngine::Hummock);
791        let table_type = tb.get_table_type().unwrap();
792        let stream_job_status = tb
793            .get_stream_job_status()
794            .unwrap_or(PbStreamJobStatus::Created);
795        let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground);
796        let associated_source_id = tb.optional_associated_source_id.map(Into::into);
797        let name = tb.name.clone();
798
799        let vnode_count = tb.vnode_count_inner();
800        if let VnodeCount::Placeholder = vnode_count {
801            // Only allow placeholder vnode count for creating tables.
802            // After the table is created, an `Update` notification will be used to update the vnode count field.
803            assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
804        }
805
806        let mut col_names = HashSet::new();
807        let mut col_index: HashMap<i32, usize> = HashMap::new();
808
809        let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
810        let version_column_indices: Vec<usize> = tb
811            .version_column_indices
812            .iter()
813            .map(|&idx| idx as usize)
814            .collect();
815        let mut columns: Vec<ColumnCatalog> =
816            tb.columns.into_iter().map(ColumnCatalog::from).collect();
817        if columns.iter().all(|c| !c.is_rw_timestamp_column()) {
818            // Add system column `_rw_timestamp` to every table, but notice that this column is never persisted.
819            columns.push(ColumnCatalog::rw_timestamp_column());
820        }
821        for (idx, catalog) in columns.clone().into_iter().enumerate() {
822            let col_name = catalog.name();
823            if !col_names.insert(col_name.to_owned()) {
824                panic!("duplicated column name {} in table {} ", col_name, tb.name)
825            }
826
827            let col_id = catalog.column_desc.column_id.get_id();
828            col_index.insert(col_id, idx);
829        }
830
831        let pk = tb.pk.iter().map(ColumnOrder::from_protobuf).collect();
832        let mut watermark_columns = FixedBitSet::with_capacity(columns.len());
833        for idx in &tb.watermark_indices {
834            watermark_columns.insert(*idx as _);
835        }
836        let engine = Engine::from_protobuf(&tb_engine);
837
838        Self {
839            id,
840            schema_id: tb.schema_id,
841            database_id: tb.database_id,
842            associated_source_id,
843            name,
844            pk,
845            columns,
846            table_type: TableType::from_prost(table_type),
847            distribution_key: tb
848                .distribution_key
849                .iter()
850                .map(|k| *k as usize)
851                .collect_vec(),
852            stream_key: tb.stream_key.iter().map(|x| *x as _).collect(),
853            append_only: tb.append_only,
854            owner: tb.owner,
855            fragment_id: tb.fragment_id,
856            dml_fragment_id: tb.dml_fragment_id,
857            vnode_col_index: tb.vnode_col_index.map(|x| x as usize),
858            row_id_index: tb.row_id_index.map(|x| x as usize),
859            value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
860            definition: tb.definition,
861            conflict_behavior,
862            version_column_indices,
863            read_prefix_len_hint: tb.read_prefix_len_hint as usize,
864            version: tb.version.map(TableVersion::from_prost),
865            watermark_columns,
866            dist_key_in_pk: tb.dist_key_in_pk.iter().map(|x| *x as _).collect(),
867            cardinality: tb
868                .cardinality
869                .map(|c| Cardinality::from_protobuf(&c))
870                .unwrap_or_else(Cardinality::unknown),
871            created_at_epoch: tb.created_at_epoch.map(Epoch::from),
872            initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
873            create_type: CreateType::from_proto(create_type),
874            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
875            description: tb.description,
876            created_at_cluster_version: tb.created_at_cluster_version.clone(),
877            initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
878            retention_seconds: tb.retention_seconds,
879            cdc_table_id: tb.cdc_table_id,
880            vnode_count,
881            webhook_info: tb.webhook_info,
882            job_id: tb.job_id,
883            engine,
884            #[expect(deprecated)]
885            clean_watermark_index_in_pk: tb.clean_watermark_index_in_pk.map(|x| x as usize),
886            clean_watermark_indices: tb
887                .clean_watermark_indices
888                .iter()
889                .map(|&x| x as usize)
890                .collect(),
891
892            refreshable: tb.refreshable,
893            vector_index_info: tb.vector_index_info,
894            cdc_table_type: tb
895                .cdc_table_type
896                .and_then(|t| PbCdcTableType::try_from(t).ok())
897                .map(ExternalCdcTableType::from),
898        }
899    }
900}
901
902impl From<&PbTable> for TableCatalog {
903    fn from(tb: &PbTable) -> Self {
904        tb.clone().into()
905    }
906}
907
908impl OwnedByUserCatalog for TableCatalog {
909    fn owner(&self) -> UserId {
910        self.owner
911    }
912}
913
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::test_prelude::*;
    use risingwave_common::types::*;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_pb::catalog::table::PbEngine;
    use risingwave_pb::plan_common::{
        AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
    };

    use super::*;

    /// Converts a hand-built `PbTable` into a `TableCatalog` and compares the result against an
    /// explicitly constructed expected catalog. It then checks that `to_prost` followed by
    /// `TableCatalog::from` round-trips to an equal value.
    #[test]
    fn test_into_table_catalog() {
        let table: TableCatalog = PbTable {
            id: 0.into(),
            schema_id: 0.into(),
            database_id: 0.into(),
            name: "test".to_owned(),
            table_type: PbTableType::Table as i32,
            // Two user-visible columns. No `_rw_timestamp` column is supplied, so the conversion
            // is expected to append one.
            columns: vec![
                ColumnCatalog::row_id_column().to_protobuf(),
                PbColumnCatalog {
                    column_desc: Some(PbColumnDesc::new(
                        DataType::from(StructType::new([
                            ("address", DataType::Varchar),
                            ("zipcode", DataType::Varchar),
                        ]))
                        .to_protobuf(),
                        "country",
                        1,
                    )),
                    is_hidden: false,
                },
            ],
            pk: vec![ColumnOrder::new(0, OrderType::ascending()).to_protobuf()],
            stream_key: vec![0],
            distribution_key: vec![0],
            optional_associated_source_id: Some(SourceId::new(233).into()),
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0.into(),
            dml_fragment_id: None,
            initialized_at_epoch: None,
            value_indices: vec![0],
            definition: "".into(),
            read_prefix_len_hint: 0,
            vnode_col_index: None,
            row_id_index: None,
            version: Some(PbTableVersion {
                version: 0,
                next_column_id: 2,
            }),
            watermark_indices: vec![],
            // Expected to decode to `ConflictBehavior::NoCheck` (asserted below).
            handle_pk_conflict_behavior: 3,
            dist_key_in_pk: vec![0],
            cardinality: None,
            created_at_epoch: None,
            #[expect(deprecated)]
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: Some("description".to_owned()),
            #[expect(deprecated)]
            incoming_sinks: vec![],
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_indices: Vec::new(),
            cdc_table_id: None,
            // A concrete (non-placeholder) vnode count, so the conversion does not require the
            // stream job status to be `Creating`.
            maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            #[expect(deprecated)]
            clean_watermark_index_in_pk: None,
            clean_watermark_indices: vec![],

            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
        }
        .into();

        assert_eq!(
            table,
            TableCatalog {
                id: TableId::new(0),
                schema_id: 0.into(),
                database_id: 0.into(),
                associated_source_id: Some(SourceId::new(233)),
                name: "test".to_owned(),
                table_type: TableType::Table,
                columns: vec![
                    ColumnCatalog::row_id_column(),
                    ColumnCatalog {
                        column_desc: ColumnDesc {
                            data_type: StructType::new(vec![
                                ("address", DataType::Varchar),
                                ("zipcode", DataType::Varchar)
                            ],)
                            .into(),
                            column_id: ColumnId::new(1),
                            name: "country".to_owned(),
                            description: None,
                            generated_or_default_column: None,
                            additional_column: AdditionalColumn { column_type: None },
                            version: ColumnDescVersion::LATEST,
                            system_column: None,
                            nullable: true,
                        },
                        is_hidden: false
                    },
                    // Not present in the protobuf columns; appended by the conversion.
                    ColumnCatalog::rw_timestamp_column(),
                ],
                stream_key: vec![0],
                pk: vec![ColumnOrder::new(0, OrderType::ascending())],
                distribution_key: vec![0],
                append_only: false,
                owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
                retention_seconds: Some(300),
                fragment_id: 0.into(),
                dml_fragment_id: None,
                vnode_col_index: None,
                row_id_index: None,
                value_indices: vec![0],
                definition: "".into(),
                conflict_behavior: ConflictBehavior::NoCheck,
                read_prefix_len_hint: 0,
                // NOTE(review): presumably corresponds to `next_column_id: 2` in the protobuf
                // above; confirm against `TableVersion::new_initial_for_test`.
                version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))),
                // Capacity equals the column count (3, including `_rw_timestamp`); no bits set.
                watermark_columns: FixedBitSet::with_capacity(3),
                dist_key_in_pk: vec![0],
                cardinality: Cardinality::unknown(),
                created_at_epoch: None,
                initialized_at_epoch: None,
                stream_job_status: StreamJobStatus::Created,
                create_type: CreateType::Foreground,
                description: Some("description".to_owned()),
                created_at_cluster_version: None,
                initialized_at_cluster_version: None,
                version_column_indices: Vec::new(),
                cdc_table_id: None,
                vnode_count: VnodeCount::set(233),
                webhook_info: None,
                job_id: None,
                engine: Engine::Hummock,
                clean_watermark_index_in_pk: None,
                clean_watermark_indices: vec![],

                refreshable: false,
                vector_index_info: None,
                cdc_table_type: None,
            }
        );
        // Round trip: serializing back to protobuf and converting again yields an equal catalog.
        assert_eq!(table, TableCatalog::from(table.to_prost()));
    }
}