risingwave_frontend/catalog/
table_catalog.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::collections::{HashMap, HashSet};

use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::catalog::{
    ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, Schema,
    StreamJobStatus, TableDesc, TableId, TableVersionId,
};
use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::table::{
    OptionalAssociatedSourceId, PbEngine, PbTableType, PbTableVersion,
};
use risingwave_pb::catalog::{
    PbCreateType, PbStreamJobStatus, PbTable, PbVectorIndexInfo, PbWebhookSourceInfo,
};
use risingwave_pb::common::PbColumnOrder;
use risingwave_pb::plan_common::DefaultColumnDesc;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_sqlparser::ast;
use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport as _;

use super::purify::try_purify_table_source_create_sql_ast;
use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId, SinkId};
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::ExprImpl;
use crate::optimizer::property::Cardinality;
use crate::session::current::notice_to_user;
use crate::user::UserId;

/// `TableCatalog` includes full information about a table.
///
/// Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed.
/// It is not the same as a user-side table created by `CREATE TABLE`.
///
/// Use [`Self::table_type()`] to determine the [`TableType`] of the table.
///
/// # Column ID & Column Index
///
/// [`ColumnId`](risingwave_common::catalog::ColumnId) (with type `i32`) is the unique identifier of
/// a column in a table. It is used to access storage.
///
/// Column index, or idx (with type `usize`), is the relative position inside the `Vec` of columns.
///
/// A tip to avoid mistakes: never cast between the two (`i32 as usize` or vice versa).
///
/// # Keys
///
/// All the keys are represented as column indices.
///
/// - **Primary Key** (pk): unique identifier of a row.
///
/// - **Order Key**: the primary key for storage, used to sort and access data.
///
///   For an MV, the columns in the `ORDER BY` clause are put at the beginning of the order key,
///   followed by the remaining columns of the pk.
///
///   If there's no `ORDER BY` clause, the order key is the same as the pk.
///
/// - **Distribution Key**: the columns used to partition the data. It must be a subset of the order
///   key.
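///
/// # Example
///
/// A minimal sketch of how these keys relate (column indices are hypothetical and chosen for
/// illustration only): for an MV whose `ORDER BY` column is at index 1 and whose stream key is
/// `[0]`, the catalog would roughly contain:
///
/// ```ignore
/// let pk = vec![
///     ColumnOrder::new(1, OrderType::ascending()), // `ORDER BY` columns come first
///     ColumnOrder::new(0, OrderType::ascending()), // remaining stream key columns follow
/// ];
/// let stream_key = vec![0];
/// let distribution_key = vec![1]; // must be a subset of the order key
/// ```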
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub struct TableCatalog {
    pub id: TableId,

    pub schema_id: SchemaId,

    pub database_id: DatabaseId,

    pub associated_source_id: Option<TableId>, // TODO: use SourceId

    pub name: String,

    /// All columns in this table.
    pub columns: Vec<ColumnCatalog>,

    /// Key used as the materialize operator's storage key prefix, including MV order columns and `stream_key`.
    pub pk: Vec<ColumnOrder>,

    /// `pk_indices` of the corresponding materialize operator's output.
    pub stream_key: Vec<usize>,

    /// Type of the table. Used to distinguish user-created tables, materialized views, index
    /// tables, and internal tables.
    pub table_type: TableType,

    /// Distribution key column indices.
    pub distribution_key: Vec<usize>,

    /// Whether the table is append-only. Derived from `StreamMaterialize`; `StreamTableScan`
    /// relies on it to derive an append-only stream plan.
    pub append_only: bool,

    /// The cardinality of the table.
    pub cardinality: Cardinality,

    /// Owner of the table.
    pub owner: UserId,

    /// TTL of the records in the table. To keep consistency with other tables in the streaming
    /// plan, it only applies to append-only tables.
    pub retention_seconds: Option<u32>,

    /// The fragment id of the `Materialize` operator for this table.
    pub fragment_id: FragmentId,

    /// The fragment id of the `DML` operator for this table.
    pub dml_fragment_id: Option<FragmentId>,

    /// An optional column index which is the vnode of each row computed by the table's consistent
    /// hash distribution.
    pub vnode_col_index: Option<usize>,

    /// An optional column index of row id. If the primary key is specified by users, this will be
    /// `None`.
    pub row_id_index: Option<usize>,

    /// The column indices which are stored in the state store's value with row-encoding.
    pub value_indices: Vec<usize>,

    /// The full `CREATE TABLE` or `CREATE MATERIALIZED VIEW` definition of the table.
    pub definition: String,

    /// The behavior when handling pk conflicts from the source executor: conflicting rows can be
    /// overwritten or ignored. For the normal materialize executor and other executors, this
    /// field is `NoCheck`.
    pub conflict_behavior: ConflictBehavior,

    pub version_column_index: Option<usize>,

    pub read_prefix_len_hint: usize,

    /// Per-table catalog version, used by schema change. `None` for internal tables and tests.
    pub version: Option<TableVersion>,

    /// The column indices which could receive watermarks.
    pub watermark_columns: FixedBitSet,

    /// Optional field that specifies the distribution key indices in the pk.
    /// See <https://github.com/risingwavelabs/risingwave/issues/8377> for more information.
    pub dist_key_in_pk: Vec<usize>,

    pub created_at_epoch: Option<Epoch>,

    pub initialized_at_epoch: Option<Epoch>,

    /// Indicates whether to use the watermark cache for the state table.
    pub cleaned_by_watermark: bool,

    /// Indicates whether the table is created in the background or foreground.
    pub create_type: CreateType,

    /// Indicates the stream job status: created or creating.
    /// If it is still being created, it should be hidden.
    pub stream_job_status: StreamJobStatus,

    /// Description of the table, set by `COMMENT ON`.
    pub description: Option<String>,

    /// Incoming sinks, used for sink-into-table.
    pub incoming_sinks: Vec<SinkId>,

    pub created_at_cluster_version: Option<String>,

    pub initialized_at_cluster_version: Option<String>,

    pub cdc_table_id: Option<String>,

    /// Total vnode count of the table.
    ///
    /// Can be [`VnodeCount::Placeholder`] if the catalog is generated by the frontend and the
    /// corresponding job is still in `Creating` status, in which case calling [`Self::vnode_count`]
    /// will panic.
    ///
    /// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog
    /// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build
    pub vnode_count: VnodeCount,

    pub webhook_info: Option<PbWebhookSourceInfo>,

    pub job_id: Option<TableId>,

    pub engine: Engine,

    pub clean_watermark_index_in_pk: Option<usize>,

    /// Whether the table supports manual refresh operations.
    pub refreshable: bool,

    pub vector_index_info: Option<PbVectorIndexInfo>,
}

pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_";

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum TableType {
    /// Tables created by `CREATE TABLE`.
    Table,
    /// Tables serving as indexes on `TableType::Table` or `TableType::MaterializedView`.
    /// An index has both a `TableCatalog` and an `IndexCatalog`.
    Index,
    /// Tables created by `CREATE MATERIALIZED VIEW`.
    MaterializedView,
    /// Internal tables for executors.
    Internal,
}

#[cfg(test)]
impl Default for TableType {
    fn default() -> Self {
        Self::Table
    }
}

impl TableType {
    fn from_prost(prost: PbTableType) -> Self {
        match prost {
            PbTableType::Table => Self::Table,
            PbTableType::MaterializedView => Self::MaterializedView,
            PbTableType::Index => Self::Index,
            PbTableType::Internal => Self::Internal,
            PbTableType::Unspecified => unreachable!(),
        }
    }

    pub(crate) fn to_prost(self) -> PbTableType {
        match self {
            Self::Table => PbTableType::Table,
            Self::MaterializedView => PbTableType::MaterializedView,
            Self::Index => PbTableType::Index,
            Self::Internal => PbTableType::Internal,
        }
    }
}

/// The version of a table, used by schema change. See [`PbTableVersion`] for more details.
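///
/// # Example
///
/// An illustrative sketch (version and column ids chosen arbitrarily): a table whose largest
/// column id is 1 starts with `next_column_id = 2`; a schema change that adds a column would
/// bump `version_id` and hand out column id 2.
///
/// ```ignore
/// let v0 = TableVersion { version_id: 0, next_column_id: ColumnId::new(2) };
/// // After e.g. `ALTER TABLE ... ADD COLUMN`, the persisted version could become:
/// let v1 = TableVersion { version_id: 1, next_column_id: ColumnId::new(3) };
/// ```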
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TableVersion {
    pub version_id: TableVersionId,
    pub next_column_id: ColumnId,
}

impl TableVersion {
    /// Create an initial version for a table, with the given max column id.
    #[cfg(test)]
    pub fn new_initial_for_test(max_column_id: ColumnId) -> Self {
        use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;

        Self {
            version_id: INITIAL_TABLE_VERSION_ID,
            next_column_id: max_column_id.next(),
        }
    }

    pub fn from_prost(prost: PbTableVersion) -> Self {
        Self {
            version_id: prost.version,
            next_column_id: ColumnId::from(prost.next_column_id),
        }
    }

    pub fn to_prost(&self) -> PbTableVersion {
        PbTableVersion {
            version: self.version_id,
            next_column_id: self.next_column_id.into(),
        }
    }
}

impl TableCatalog {
    /// Returns the SQL definition when the table was created, purified with best effort
    /// if it's a table.
    ///
    /// See [`Self::create_sql_ast_purified`] for more details.
    pub fn create_sql_purified(&self) -> String {
        self.create_sql_ast_purified()
            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
            .unwrap_or_else(|_| self.create_sql())
    }

    /// Returns the parsed SQL definition when the table was created, purified with best effort
    /// if it's a table.
    ///
    /// Returns an error if the persisted definition is invalid.
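    ///
    /// # Example
    ///
    /// A minimal sketch of the intended behavior, given some `table_catalog: &TableCatalog`
    /// (the table name, column, and exact output formatting below are hypothetical):
    ///
    /// ```ignore
    /// // A table created by `CREATE TABLE t AS SELECT 1::int AS v` persists an empty
    /// // `definition`, so the purified AST is rebuilt from the column catalog instead,
    /// // yielding something close to `CREATE TABLE t (v INT)`.
    /// let stmt = table_catalog.create_sql_ast_purified()?;
    /// println!("{}", stmt.try_to_string()?);
    /// ```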
    pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
        // Purification is only applicable to tables.
        if let TableType::Table = self.table_type() {
            let base = if self.definition.is_empty() {
                // Created by `CREATE TABLE AS`, create a skeleton `CREATE TABLE` statement.
                let name = ast::ObjectName(vec![self.name.as_str().into()]);
                ast::Statement::default_create_table(name)
            } else {
                self.create_sql_ast_from_persisted()?
            };

            match try_purify_table_source_create_sql_ast(
                base,
                self.columns(),
                self.row_id_index,
                &self.pk_column_ids(),
            ) {
                Ok(stmt) => return Ok(stmt),
                Err(e) => notice_to_user(format!(
                    "error occurred while purifying definition for table \"{}\", \
                     results may be inaccurate: {}",
                    self.name,
                    e.as_report()
                )),
            }
        }

        self.create_sql_ast_from_persisted()
    }
}

impl TableCatalog {
    /// Get a reference to the table catalog's table id.
    pub fn id(&self) -> TableId {
        self.id
    }

    pub fn with_id(mut self, id: TableId) -> Self {
        self.id = id;
        self
    }

    pub fn with_cleaned_by_watermark(mut self, cleaned_by_watermark: bool) -> Self {
        self.cleaned_by_watermark = cleaned_by_watermark;
        self
    }

    pub fn conflict_behavior(&self) -> ConflictBehavior {
        self.conflict_behavior
    }

    pub fn table_type(&self) -> TableType {
        self.table_type
    }

    pub fn engine(&self) -> Engine {
        self.engine
    }

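    /// Returns the name of the Iceberg source associated with this table (prefixed with
    /// `ICEBERG_SOURCE_PREFIX`), or `None` if the table does not use the Iceberg engine.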
    pub fn iceberg_source_name(&self) -> Option<String> {
        match self.engine {
            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name)),
            Engine::Hummock => None,
        }
    }

    pub fn iceberg_sink_name(&self) -> Option<String> {
        match self.engine {
            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SINK_PREFIX, self.name)),
            Engine::Hummock => None,
        }
    }

    pub fn is_user_table(&self) -> bool {
        self.table_type == TableType::Table
    }

    pub fn is_internal_table(&self) -> bool {
        self.table_type == TableType::Internal
    }

    pub fn is_mview(&self) -> bool {
        self.table_type == TableType::MaterializedView
    }

    pub fn is_index(&self) -> bool {
        self.table_type == TableType::Index
    }

    /// Returns the error to report when a `DROP` statement of the wrong kind is used on this
    /// table.
    #[must_use]
    pub fn bad_drop_error(&self) -> RwError {
        let msg = match self.table_type {
            TableType::MaterializedView => {
                "Use `DROP MATERIALIZED VIEW` to drop a materialized view."
            }
            TableType::Index => "Use `DROP INDEX` to drop an index.",
            TableType::Table => "Use `DROP TABLE` to drop a table.",
            TableType::Internal => "Internal tables cannot be dropped.",
        };

        ErrorCode::InvalidInputSyntax(msg.to_owned()).into()
    }

    /// Get the table catalog's associated source id.
    #[must_use]
    pub fn associated_source_id(&self) -> Option<TableId> {
        self.associated_source_id
    }

    pub fn has_associated_source(&self) -> bool {
        self.associated_source_id.is_some()
    }

    /// Get a reference to the table catalog's columns.
    pub fn columns(&self) -> &[ColumnCatalog] {
        &self.columns
    }

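    /// Returns all columns of the table except the hidden `_rw_timestamp` system column.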
    pub fn columns_without_rw_timestamp(&self) -> Vec<ColumnCatalog> {
        self.columns
            .iter()
            .filter(|c| !c.is_rw_timestamp_column())
            .cloned()
            .collect()
    }

    /// Get a reference to the table catalog's pk desc.
    pub fn pk(&self) -> &[ColumnOrder] {
        self.pk.as_ref()
    }

    /// Get the column IDs of the primary key.
    pub fn pk_column_ids(&self) -> Vec<ColumnId> {
        self.pk
            .iter()
            .map(|x| self.columns[x.column_index].column_id())
            .collect()
    }

    /// Get the column names of the primary key.
    pub fn pk_column_names(&self) -> Vec<&str> {
        self.pk
            .iter()
            .map(|x| self.columns[x.column_index].name())
            .collect()
    }

    /// Get a [`TableDesc`] of the table.
    ///
    /// Note: this must be called on existing tables, otherwise it will fail to get the vnode count
    /// (which is determined by the meta service) and panic.
    pub fn table_desc(&self) -> TableDesc {
        use risingwave_common::catalog::TableOption;

        let table_options = TableOption::new(self.retention_seconds);

        TableDesc {
            table_id: self.id,
            pk: self.pk.clone(),
            stream_key: self.stream_key.clone(),
            columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
            distribution_key: self.distribution_key.clone(),
            append_only: self.append_only,
            retention_seconds: table_options.retention_seconds,
            value_indices: self.value_indices.clone(),
            read_prefix_len_hint: self.read_prefix_len_hint,
            watermark_columns: self.watermark_columns.clone(),
            versioned: self.version.is_some(),
            vnode_col_index: self.vnode_col_index,
            vnode_count: self.vnode_count(),
        }
    }

    /// Get a reference to the table catalog's name.
    pub fn name(&self) -> &str {
        self.name.as_ref()
    }

    pub fn distribution_key(&self) -> &[usize] {
        self.distribution_key.as_ref()
    }

    pub fn to_internal_table_prost(&self) -> PbTable {
        self.to_prost()
    }

    /// Returns the SQL definition when the table was created.
    ///
    /// See [`Self::create_sql_ast`] for more details.
    pub fn create_sql(&self) -> String {
        self.create_sql_ast()
            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
            .unwrap_or_else(|_| self.definition.clone())
    }

    /// Returns the parsed SQL definition when the table was created.
    ///
    /// Re-creating the table with this statement may produce a different schema if the schema is
    /// derived from external systems (like a schema registry) or the table was created by
    /// `CREATE TABLE AS`. If this is not desired, use [`Self::create_sql_ast_purified`] instead.
    ///
    /// Returns an error if the persisted definition is invalid.
    pub fn create_sql_ast(&self) -> Result<ast::Statement> {
        if let TableType::Table = self.table_type()
            && self.definition.is_empty()
        {
            // Always fix definition for `CREATE TABLE AS`.
            self.create_sql_ast_purified()
        } else {
            // Directly parse the persisted definition.
            self.create_sql_ast_from_persisted()
        }
    }

    fn create_sql_ast_from_persisted(&self) -> Result<ast::Statement> {
        Ok(Parser::parse_exactly_one(&self.definition)?)
    }

    /// Get a reference to the table catalog's version.
    pub fn version(&self) -> Option<&TableVersion> {
        self.version.as_ref()
    }

    /// Get the table's version id. Returns `None` if the table has no version field.
    pub fn version_id(&self) -> Option<TableVersionId> {
        self.version().map(|v| v.version_id)
    }

    /// Get the total vnode count of the table.
    ///
    /// Panics if it's called on an incomplete (and not yet persisted) table catalog.
    pub fn vnode_count(&self) -> usize {
        self.vnode_count.value()
    }

    pub fn to_prost(&self) -> PbTable {
        PbTable {
            id: self.id.table_id,
            schema_id: self.schema_id,
            database_id: self.database_id,
            name: self.name.clone(),
            // ignore `_rw_timestamp` when serializing
            columns: self
                .columns_without_rw_timestamp()
                .iter()
                .map(|c| c.to_protobuf())
                .collect(),
            pk: self.pk.iter().map(|o| o.to_protobuf()).collect(),
            stream_key: self.stream_key.iter().map(|x| *x as _).collect(),
            optional_associated_source_id: self
                .associated_source_id
                .map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())),
            table_type: self.table_type.to_prost() as i32,
            distribution_key: self
                .distribution_key
                .iter()
                .map(|k| *k as i32)
                .collect_vec(),
            append_only: self.append_only,
            owner: self.owner,
            fragment_id: self.fragment_id,
            dml_fragment_id: self.dml_fragment_id,
            vnode_col_index: self.vnode_col_index.map(|i| i as _),
            row_id_index: self.row_id_index.map(|i| i as _),
            value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
            definition: self.definition.clone(),
            read_prefix_len_hint: self.read_prefix_len_hint as u32,
            version: self.version.as_ref().map(TableVersion::to_prost),
            watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
            dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
            handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
            version_column_index: self.version_column_index.map(|value| value as u32),
            cardinality: Some(self.cardinality.to_protobuf()),
            initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
            created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
            cleaned_by_watermark: self.cleaned_by_watermark,
            stream_job_status: self.stream_job_status.to_proto().into(),
            create_type: self.create_type.to_proto().into(),
            description: self.description.clone(),
            incoming_sinks: self.incoming_sinks.clone(),
            created_at_cluster_version: self.created_at_cluster_version.clone(),
            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
            retention_seconds: self.retention_seconds,
            cdc_table_id: self.cdc_table_id.clone(),
            maybe_vnode_count: self.vnode_count.to_protobuf(),
            webhook_info: self.webhook_info.clone(),
            job_id: self.job_id.map(|id| id.table_id),
            engine: Some(self.engine.to_protobuf().into()),
            clean_watermark_index_in_pk: self.clean_watermark_index_in_pk.map(|x| x as i32),
            refreshable: self.refreshable,
            vector_index_info: self.vector_index_info,
        }
    }

    /// Get columns excluding hidden columns and generated columns.
    pub fn columns_to_insert(&self) -> impl Iterator<Item = &ColumnCatalog> {
        self.columns
            .iter()
            .filter(|c| !c.is_hidden() && !c.is_generated())
    }

    pub fn generated_column_names(&self) -> impl Iterator<Item = &str> {
        self.columns
            .iter()
            .filter(|c| c.is_generated())
            .map(|c| c.name())
    }

    pub fn generated_col_idxes(&self) -> impl Iterator<Item = usize> + '_ {
        self.columns
            .iter()
            .enumerate()
            .filter(|(_, c)| c.is_generated())
            .map(|(i, _)| i)
    }

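    /// Returns the default-value expression of the column at `col_idx`. Columns without a default
    /// value (including generated columns) get a typed `NULL` literal.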
    pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
        if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) = self
            .columns[col_idx]
            .column_desc
            .generated_or_default_column
            .as_ref()
        {
            ExprImpl::from_expr_proto(expr.as_ref().unwrap())
                .expect("expr in default columns corrupted")
        } else {
            ExprImpl::literal_null(self.columns[col_idx].data_type().clone())
        }
    }

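    /// Returns the default-value expression for each of the given columns, in order; columns
    /// without a default value get a typed `NULL` literal.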
    pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
        columns
            .iter()
            .map(|c| {
                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                    expr,
                    ..
                })) = c.column_desc.generated_or_default_column.as_ref()
                {
                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
                        .expect("expr in default columns corrupted")
                } else {
                    ExprImpl::literal_null(c.data_type().clone())
                }
            })
            .collect()
    }

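    /// Returns `(column index, default expression)` pairs for all columns that have a default
    /// value.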
    pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
        self.columns.iter().enumerate().filter_map(|(i, c)| {
            if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                expr, ..
            })) = c.column_desc.generated_or_default_column.as_ref()
            {
                Some((
                    i,
                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
                        .expect("expr in default columns corrupted"),
                ))
            } else {
                None
            }
        })
    }

    pub fn has_generated_column(&self) -> bool {
        self.columns.iter().any(|c| c.is_generated())
    }

    pub fn has_rw_timestamp_column(&self) -> bool {
        self.columns.iter().any(|c| c.is_rw_timestamp_column())
    }

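    /// Builds a [`Schema`] from the table's columns.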
    pub fn column_schema(&self) -> Schema {
        Schema::new(
            self.columns
                .iter()
                .map(|c| Field::from(&c.column_desc))
                .collect(),
        )
    }

    pub fn is_created(&self) -> bool {
        self.stream_job_status == StreamJobStatus::Created
    }

    pub fn is_iceberg_engine_table(&self) -> bool {
        self.engine == Engine::Iceberg
    }

    pub fn order_column_indices(&self) -> impl Iterator<Item = usize> + '_ {
        self.pk.iter().map(|col| col.column_index)
    }

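    /// Returns a mapping from column id to column index, built from [`Self::columns`].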
    pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
        ColumnDesc::get_id_to_op_idx_mapping(&self.columns, None)
    }

    pub fn order_column_ids(&self) -> Vec<ColumnId> {
        self.pk
            .iter()
            .map(|col| self.columns[col.column_index].column_desc.column_id)
            .collect()
    }

    pub fn arrange_key_orders_protobuf(&self) -> Vec<PbColumnOrder> {
        // Set materialize key as arrange key + pk
        self.pk.iter().map(|x| x.to_protobuf()).collect()
    }
}

impl From<PbTable> for TableCatalog {
    fn from(tb: PbTable) -> Self {
        let id = tb.id;
        let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
        let tb_engine = tb
            .get_engine()
            .map(|engine| PbEngine::try_from(*engine).expect("Invalid engine"))
            .unwrap_or(PbEngine::Hummock);
        let table_type = tb.get_table_type().unwrap();
        let stream_job_status = tb
            .get_stream_job_status()
            .unwrap_or(PbStreamJobStatus::Created);
        let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground);
        let associated_source_id = tb.optional_associated_source_id.map(|id| match id {
            OptionalAssociatedSourceId::AssociatedSourceId(id) => id,
        });
        let name = tb.name.clone();

        let vnode_count = tb.vnode_count_inner();
        if let VnodeCount::Placeholder = vnode_count {
            // Only allow placeholder vnode count for creating tables.
            // After the table is created, an `Update` notification will be used to update the vnode count field.
            assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
        }

        let mut col_names = HashSet::new();
        let mut col_index: HashMap<i32, usize> = HashMap::new();

        let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
        let version_column_index = tb.version_column_index.map(|value| value as usize);
        let mut columns: Vec<ColumnCatalog> =
            tb.columns.into_iter().map(ColumnCatalog::from).collect();
        if columns.iter().all(|c| !c.is_rw_timestamp_column()) {
            // Add system column `_rw_timestamp` to every table, but notice that this column is never persisted.
            columns.push(ColumnCatalog::rw_timestamp_column());
        }
        for (idx, catalog) in columns.clone().into_iter().enumerate() {
            let col_name = catalog.name();
            if !col_names.insert(col_name.to_owned()) {
                panic!("duplicated column name {} in table {} ", col_name, tb.name)
            }

            let col_id = catalog.column_desc.column_id.get_id();
            col_index.insert(col_id, idx);
        }

        let pk = tb.pk.iter().map(ColumnOrder::from_protobuf).collect();
        let mut watermark_columns = FixedBitSet::with_capacity(columns.len());
        for idx in &tb.watermark_indices {
            watermark_columns.insert(*idx as _);
        }
        let engine = Engine::from_protobuf(&tb_engine);

        Self {
            id: id.into(),
            schema_id: tb.schema_id,
            database_id: tb.database_id,
            associated_source_id: associated_source_id.map(Into::into),
            name,
            pk,
            columns,
            table_type: TableType::from_prost(table_type),
            distribution_key: tb
                .distribution_key
                .iter()
                .map(|k| *k as usize)
                .collect_vec(),
            stream_key: tb.stream_key.iter().map(|x| *x as _).collect(),
            append_only: tb.append_only,
            owner: tb.owner,
            fragment_id: tb.fragment_id,
            dml_fragment_id: tb.dml_fragment_id,
            vnode_col_index: tb.vnode_col_index.map(|x| x as usize),
            row_id_index: tb.row_id_index.map(|x| x as usize),
            value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
            definition: tb.definition,
            conflict_behavior,
            version_column_index,
            read_prefix_len_hint: tb.read_prefix_len_hint as usize,
            version: tb.version.map(TableVersion::from_prost),
            watermark_columns,
            dist_key_in_pk: tb.dist_key_in_pk.iter().map(|x| *x as _).collect(),
            cardinality: tb
                .cardinality
                .map(|c| Cardinality::from_protobuf(&c))
                .unwrap_or_else(Cardinality::unknown),
            created_at_epoch: tb.created_at_epoch.map(Epoch::from),
            initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
            cleaned_by_watermark: tb.cleaned_by_watermark,
            create_type: CreateType::from_proto(create_type),
            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
            description: tb.description,
            incoming_sinks: tb.incoming_sinks.clone(),
            created_at_cluster_version: tb.created_at_cluster_version.clone(),
            initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
            retention_seconds: tb.retention_seconds,
            cdc_table_id: tb.cdc_table_id,
            vnode_count,
            webhook_info: tb.webhook_info,
            job_id: tb.job_id.map(TableId::from),
            engine,
            clean_watermark_index_in_pk: tb.clean_watermark_index_in_pk.map(|x| x as usize),
            refreshable: tb.refreshable,
            vector_index_info: tb.vector_index_info,
        }
    }
}

impl From<&PbTable> for TableCatalog {
    fn from(tb: &PbTable) -> Self {
        tb.clone().into()
    }
}

impl OwnedByUserCatalog for TableCatalog {
    fn owner(&self) -> UserId {
        self.owner
    }
}

#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::test_prelude::*;
    use risingwave_common::types::*;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_pb::catalog::table::PbEngine;
    use risingwave_pb::plan_common::{
        AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
    };

    use super::*;

    #[test]
    fn test_into_table_catalog() {
        let table: TableCatalog = PbTable {
            id: 0,
            schema_id: 0,
            database_id: 0,
            name: "test".to_owned(),
            table_type: PbTableType::Table as i32,
            columns: vec![
                ColumnCatalog::row_id_column().to_protobuf(),
                PbColumnCatalog {
                    column_desc: Some(PbColumnDesc::new(
                        DataType::from(StructType::new([
                            ("address", DataType::Varchar),
                            ("zipcode", DataType::Varchar),
                        ]))
                        .to_protobuf(),
                        "country",
                        1,
                    )),
                    is_hidden: false,
                },
            ],
            pk: vec![ColumnOrder::new(0, OrderType::ascending()).to_protobuf()],
            stream_key: vec![0],
            distribution_key: vec![0],
            optional_associated_source_id: OptionalAssociatedSourceId::AssociatedSourceId(233)
                .into(),
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0,
            dml_fragment_id: None,
            initialized_at_epoch: None,
            value_indices: vec![0],
            definition: "".into(),
            read_prefix_len_hint: 0,
            vnode_col_index: None,
            row_id_index: None,
            version: Some(PbTableVersion {
                version: 0,
                next_column_id: 2,
            }),
            watermark_indices: vec![],
            handle_pk_conflict_behavior: 3,
            dist_key_in_pk: vec![0],
            cardinality: None,
            created_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: Some("description".to_owned()),
            incoming_sinks: vec![],
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_index: None,
            cdc_table_id: None,
            maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            clean_watermark_index_in_pk: None,
            refreshable: false,
            vector_index_info: None,
        }
        .into();

        assert_eq!(
            table,
            TableCatalog {
                id: TableId::new(0),
                schema_id: 0,
                database_id: 0,
                associated_source_id: Some(TableId::new(233)),
                name: "test".to_owned(),
                table_type: TableType::Table,
                columns: vec![
                    ColumnCatalog::row_id_column(),
                    ColumnCatalog {
                        column_desc: ColumnDesc {
                            data_type: StructType::new(vec![
                                ("address", DataType::Varchar),
                                ("zipcode", DataType::Varchar)
                            ],)
                            .into(),
                            column_id: ColumnId::new(1),
                            name: "country".to_owned(),
                            description: None,
                            generated_or_default_column: None,
                            additional_column: AdditionalColumn { column_type: None },
                            version: ColumnDescVersion::LATEST,
                            system_column: None,
                            nullable: true,
                        },
                        is_hidden: false
                    },
                    ColumnCatalog::rw_timestamp_column(),
                ],
                stream_key: vec![0],
                pk: vec![ColumnOrder::new(0, OrderType::ascending())],
                distribution_key: vec![0],
                append_only: false,
                owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
                retention_seconds: Some(300),
                fragment_id: 0,
                dml_fragment_id: None,
                vnode_col_index: None,
                row_id_index: None,
                value_indices: vec![0],
                definition: "".into(),
                conflict_behavior: ConflictBehavior::NoCheck,
                read_prefix_len_hint: 0,
                version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))),
                watermark_columns: FixedBitSet::with_capacity(3),
                dist_key_in_pk: vec![0],
                cardinality: Cardinality::unknown(),
                created_at_epoch: None,
                initialized_at_epoch: None,
                cleaned_by_watermark: false,
                stream_job_status: StreamJobStatus::Created,
                create_type: CreateType::Foreground,
                description: Some("description".to_owned()),
                incoming_sinks: vec![],
                created_at_cluster_version: None,
                initialized_at_cluster_version: None,
                version_column_index: None,
                cdc_table_id: None,
                vnode_count: VnodeCount::set(233),
                webhook_info: None,
                job_id: None,
                engine: Engine::Hummock,
                clean_watermark_index_in_pk: None,
                refreshable: false,
                vector_index_info: None,
            }
        );
        assert_eq!(table, TableCatalog::from(table.to_prost()));
    }
}