risingwave_frontend/catalog/table_catalog.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::collections::{HashMap, HashSet};

use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::catalog::{
    ColumnCatalog, ConflictBehavior, CreateType, Engine, Field, Schema, StreamJobStatus, TableDesc,
    TableId, TableVersionId,
};
use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::table::{
    OptionalAssociatedSourceId, PbEngine, PbTableType, PbTableVersion,
};
use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable, PbWebhookSourceInfo};
use risingwave_pb::plan_common::DefaultColumnDesc;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_sqlparser::ast;
use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport as _;

use super::purify::try_purify_table_source_create_sql_ast;
use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId, SinkId};
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::ExprImpl;
use crate::optimizer::property::Cardinality;
use crate::session::current::notice_to_user;
use crate::user::UserId;

/// `TableCatalog` includes full information about a table.
///
/// Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed.
/// It is not the same as a user-side table created by `CREATE TABLE`.
///
/// Use [`Self::table_type()`] to determine the [`TableType`] of the table.
///
/// # Column ID & Column Index
///
/// [`ColumnId`](risingwave_common::catalog::ColumnId) (with type `i32`) is the unique identifier of
/// a column in a table. It is used to access storage.
///
/// Column index, or idx, (with type `usize`) is the relative position inside the `Vec` of columns.
///
/// A tip to avoid mistakes is to never cast between the two, i.e., `i32 as usize` or vice versa.
///
/// # Keys
///
/// All the keys are represented as column indices.
///
/// - **Primary Key** (pk): unique identifier of a row.
///
/// - **Order Key**: the primary key for storage, used to sort and access data.
///
///   For an MV, the columns in the `ORDER BY` clause will be put at the beginning of the order
///   key, and the remaining pk columns will follow.
///
///   If there's no `ORDER BY` clause, the order key will be the same as pk.
///
/// - **Distribution Key**: the columns used to partition the data. It must be a subset of the order
///   key.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub struct TableCatalog {
    pub id: TableId,

    pub schema_id: SchemaId,

    pub database_id: DatabaseId,

    pub associated_source_id: Option<TableId>, // TODO: use SourceId

    pub name: String,

    pub dependent_relations: Vec<TableId>,

    /// All columns in this table.
    pub columns: Vec<ColumnCatalog>,

    /// Key used as the materialize operator's storage key prefix, including MV order columns and `stream_key`.
    pub pk: Vec<ColumnOrder>,

    /// `pk_indices` of the corresponding materialize operator's output.
    pub stream_key: Vec<usize>,

    /// Type of the table. Used to distinguish user-created tables, materialized views, index
    /// tables, and internal tables.
    pub table_type: TableType,

    /// Distribution key column indices.
    pub distribution_key: Vec<usize>,

    /// The append-only attribute is derived from `StreamMaterialize`, and `StreamTableScan` relies
    /// on it to derive an append-only stream plan.
    pub append_only: bool,

    /// The cardinality of the table.
    pub cardinality: Cardinality,

    /// Owner of the table.
    pub owner: UserId,

    /// TTL of the records in the table. To ensure consistency with other tables in the streaming
    /// plan, it only applies to append-only tables.
    pub retention_seconds: Option<u32>,

    /// The fragment id of the `Materialize` operator for this table.
    pub fragment_id: FragmentId,

    /// The fragment id of the `DML` operator for this table.
    pub dml_fragment_id: Option<FragmentId>,

    /// An optional column index which is the vnode of each row computed by the table's consistent
    /// hash distribution.
    pub vnode_col_index: Option<usize>,

    /// An optional column index of row id. If the primary key is specified by users, this will be
    /// `None`.
    pub row_id_index: Option<usize>,

    /// The column indices which are stored in the state store's value with row-encoding.
    pub value_indices: Vec<usize>,

    /// The full `CREATE TABLE` or `CREATE MATERIALIZED VIEW` definition of the table.
    pub definition: String,

    /// The behavior of handling incoming pk conflicts from the source executor: conflicting rows
    /// can be overwritten or ignored. For the normal materialize executor and other executors,
    /// this field will be `NoCheck`.
    pub conflict_behavior: ConflictBehavior,

    pub version_column_index: Option<usize>,

    pub read_prefix_len_hint: usize,

    /// Per-table catalog version, used by schema change. `None` for internal tables and tests.
    pub version: Option<TableVersion>,

    /// The column indices which could receive watermarks.
    pub watermark_columns: FixedBitSet,

    /// Optional field that specifies the distribution key indices in pk.
    /// See <https://github.com/risingwavelabs/risingwave/issues/8377> for more information.
    pub dist_key_in_pk: Vec<usize>,

    pub created_at_epoch: Option<Epoch>,

    pub initialized_at_epoch: Option<Epoch>,

    /// Indicates whether to use the watermark cache for the state table.
    pub cleaned_by_watermark: bool,

    /// Indicates whether to create the table in the background or foreground.
    pub create_type: CreateType,

    /// Indicates the stream job status, i.e., whether it is created or still creating.
    /// If it is still creating, the table should be hidden.
    pub stream_job_status: StreamJobStatus,

    /// Description of the table, set by `COMMENT ON`.
    pub description: Option<String>,

    /// Incoming sinks, used for sink-into-table.
    pub incoming_sinks: Vec<SinkId>,

    pub created_at_cluster_version: Option<String>,

    pub initialized_at_cluster_version: Option<String>,

    pub cdc_table_id: Option<String>,

    /// Total vnode count of the table.
    ///
    /// Can be [`VnodeCount::Placeholder`] if the catalog is generated by the frontend and the
    /// corresponding job is still in `Creating` status, in which case calling [`Self::vnode_count`]
    /// will panic.
    ///
    /// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog
    /// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build
    pub vnode_count: VnodeCount,

    pub webhook_info: Option<PbWebhookSourceInfo>,

    pub job_id: Option<TableId>,

    pub engine: Engine,

    pub clean_watermark_index_in_pk: Option<usize>,
}
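// Illustrative sketch (test-only, hypothetical values): since `Default` is derived under
// `#[cfg(test)]`, unit tests can build a minimal catalog with struct-update syntax, e.g.
// `TableCatalog { name: "t".to_owned(), ..Default::default() }`, instead of spelling out
// every field as in the proto round-trip test at the bottom of this file.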

/// Name prefix of the hidden source object that backs an Iceberg-engine table.
/// See [`TableCatalog::iceberg_source_name`].
pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
/// Name prefix of the hidden sink object that backs an Iceberg-engine table.
/// See [`TableCatalog::iceberg_sink_name`].
pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_";

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum TableType {
    /// Tables created by `CREATE TABLE`.
    Table,
    /// Tables created by `CREATE MATERIALIZED VIEW`.
    MaterializedView,
    /// Tables serving as index for `TableType::Table` or `TableType::MaterializedView`.
    /// An index has both a `TableCatalog` and an `IndexCatalog`.
    Index,
    /// Internal tables for executors.
    Internal,
}

#[cfg(test)]
impl Default for TableType {
    fn default() -> Self {
        Self::Table
    }
}

impl TableType {
    fn from_prost(prost: PbTableType) -> Self {
        match prost {
            PbTableType::Table => Self::Table,
            PbTableType::MaterializedView => Self::MaterializedView,
            PbTableType::Index => Self::Index,
            PbTableType::Internal => Self::Internal,
            PbTableType::Unspecified => unreachable!(),
        }
    }

    pub(crate) fn to_prost(self) -> PbTableType {
        match self {
            Self::Table => PbTableType::Table,
            Self::MaterializedView => PbTableType::MaterializedView,
            Self::Index => PbTableType::Index,
            Self::Internal => PbTableType::Internal,
        }
    }
}

/// The version of a table, used by schema change. See [`PbTableVersion`] for more details.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TableVersion {
    pub version_id: TableVersionId,
    pub next_column_id: ColumnId,
}

impl TableVersion {
    /// Create an initial version for a table, with the given max column id.
    #[cfg(test)]
    pub fn new_initial_for_test(max_column_id: ColumnId) -> Self {
        use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;

        Self {
            version_id: INITIAL_TABLE_VERSION_ID,
            next_column_id: max_column_id.next(),
        }
    }

    pub fn from_prost(prost: PbTableVersion) -> Self {
        Self {
            version_id: prost.version,
            next_column_id: ColumnId::from(prost.next_column_id),
        }
    }

    pub fn to_prost(&self) -> PbTableVersion {
        PbTableVersion {
            version: self.version_id,
            next_column_id: self.next_column_id.into(),
        }
    }
}

impl TableCatalog {
    /// Returns the SQL definition when the table was created, purified with best effort
    /// if it's a table.
    ///
    /// See [`Self::create_sql_ast_purified`] for more details.
    pub fn create_sql_purified(&self) -> String {
        self.create_sql_ast_purified()
            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
            .unwrap_or_else(|_| self.create_sql())
    }

    /// Returns the parsed SQL definition when the table was created, purified with best effort
    /// if it's a table.
    ///
    /// Returns an error if the persisted definition is invalid.
    pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
        // Purification is only applicable to tables.
        if let TableType::Table = self.table_type() {
            let base = if self.definition.is_empty() {
                // Created by `CREATE TABLE AS`, create a skeleton `CREATE TABLE` statement.
                let name = ast::ObjectName(vec![self.name.as_str().into()]);
                ast::Statement::default_create_table(name)
            } else {
                self.create_sql_ast_from_persisted()?
            };

            match try_purify_table_source_create_sql_ast(
                base,
                self.columns(),
                self.row_id_index,
                &self.pk_column_ids(),
            ) {
                Ok(stmt) => return Ok(stmt),
                Err(e) => notice_to_user(format!(
                    "error occurred while purifying definition for table \"{}\", \
                     results may be inaccurate: {}",
                    self.name,
                    e.as_report()
                )),
            }
        }

        self.create_sql_ast_from_persisted()
    }
}
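// Illustrative usage sketch (hypothetical caller, not from the original source): when showing a
// table's definition, a caller would typically prefer the purified form:
//
//     let sql = table_catalog.create_sql_purified();
//
// `create_sql_purified` never returns an error: on purification or serialization failure it falls
// back to `create_sql()`, which in turn falls back to the raw persisted definition string.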

impl TableCatalog {
    /// Get a reference to the table catalog's table id.
    pub fn id(&self) -> TableId {
        self.id
    }

    pub fn with_id(mut self, id: TableId) -> Self {
        self.id = id;
        self
    }

    pub fn with_cleaned_by_watermark(mut self, cleaned_by_watermark: bool) -> Self {
        self.cleaned_by_watermark = cleaned_by_watermark;
        self
    }

    pub fn conflict_behavior(&self) -> ConflictBehavior {
        self.conflict_behavior
    }

    pub fn table_type(&self) -> TableType {
        self.table_type
    }

    pub fn engine(&self) -> Engine {
        self.engine
    }

    /// Returns the name of the hidden Iceberg source backing this table, or `None` if the table
    /// does not use the Iceberg engine.
    pub fn iceberg_source_name(&self) -> Option<String> {
        match self.engine {
            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name)),
            Engine::Hummock => None,
        }
    }

    /// Returns the name of the hidden Iceberg sink backing this table, or `None` if the table
    /// does not use the Iceberg engine.
    pub fn iceberg_sink_name(&self) -> Option<String> {
        match self.engine {
            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SINK_PREFIX, self.name)),
            Engine::Hummock => None,
        }
    }

    pub fn is_user_table(&self) -> bool {
        self.table_type == TableType::Table
    }

    pub fn is_internal_table(&self) -> bool {
        self.table_type == TableType::Internal
    }

    pub fn is_mview(&self) -> bool {
        self.table_type == TableType::MaterializedView
    }

    pub fn is_index(&self) -> bool {
        self.table_type == TableType::Index
    }

    /// Returns the error for using a `DROP` statement on the wrong type of table.
    #[must_use]
    pub fn bad_drop_error(&self) -> RwError {
        let msg = match self.table_type {
            TableType::MaterializedView => {
                "Use `DROP MATERIALIZED VIEW` to drop a materialized view."
            }
            TableType::Index => "Use `DROP INDEX` to drop an index.",
            TableType::Table => "Use `DROP TABLE` to drop a table.",
            TableType::Internal => "Internal tables cannot be dropped.",
        };

        ErrorCode::InvalidInputSyntax(msg.to_owned()).into()
    }

    /// Get the table catalog's associated source id.
    #[must_use]
    pub fn associated_source_id(&self) -> Option<TableId> {
        self.associated_source_id
    }

    pub fn has_associated_source(&self) -> bool {
        self.associated_source_id.is_some()
    }

    /// Get a reference to the table catalog's columns.
    pub fn columns(&self) -> &[ColumnCatalog] {
        &self.columns
    }

    pub fn columns_without_rw_timestamp(&self) -> Vec<ColumnCatalog> {
        self.columns
            .iter()
            .filter(|c| !c.is_rw_timestamp_column())
            .cloned()
            .collect()
    }

    /// Get a reference to the table catalog's pk desc.
    pub fn pk(&self) -> &[ColumnOrder] {
        self.pk.as_ref()
    }

    /// Get the column IDs of the primary key.
    pub fn pk_column_ids(&self) -> Vec<ColumnId> {
        self.pk
            .iter()
            .map(|x| self.columns[x.column_index].column_id())
            .collect()
    }

    /// Get the column names of the primary key.
    pub fn pk_column_names(&self) -> Vec<&str> {
        self.pk
            .iter()
            .map(|x| self.columns[x.column_index].name())
            .collect()
    }

    /// Get a [`TableDesc`] of the table.
    ///
    /// Note: this must be called on existing tables, otherwise it will fail to get the vnode count
    /// (which is determined by the meta service) and panic.
    pub fn table_desc(&self) -> TableDesc {
        use risingwave_common::catalog::TableOption;

        let table_options = TableOption::new(self.retention_seconds);

        TableDesc {
            table_id: self.id,
            pk: self.pk.clone(),
            stream_key: self.stream_key.clone(),
            columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
            distribution_key: self.distribution_key.clone(),
            append_only: self.append_only,
            retention_seconds: table_options.retention_seconds,
            value_indices: self.value_indices.clone(),
            read_prefix_len_hint: self.read_prefix_len_hint,
            watermark_columns: self.watermark_columns.clone(),
            versioned: self.version.is_some(),
            vnode_col_index: self.vnode_col_index,
            vnode_count: self.vnode_count(),
        }
    }

    /// Get a reference to the table catalog's name.
    pub fn name(&self) -> &str {
        self.name.as_ref()
    }

    pub fn distribution_key(&self) -> &[usize] {
        self.distribution_key.as_ref()
    }

    pub fn to_internal_table_prost(&self) -> PbTable {
        self.to_prost()
    }

    /// Returns the SQL definition when the table was created.
    ///
    /// See [`Self::create_sql_ast`] for more details.
    pub fn create_sql(&self) -> String {
        self.create_sql_ast()
            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
            .unwrap_or_else(|_| self.definition.clone())
    }

    /// Returns the parsed SQL definition when the table was created.
    ///
    /// Re-creating the table with this statement may yield a different schema if the schema is
    /// derived from external systems (like a schema registry) or the table was created by
    /// `CREATE TABLE AS`. If this is not desired, use [`Self::create_sql_ast_purified`] instead.
    ///
    /// Returns an error if the persisted definition is invalid.
    pub fn create_sql_ast(&self) -> Result<ast::Statement> {
        if let TableType::Table = self.table_type()
            && self.definition.is_empty()
        {
            // Always fix definition for `CREATE TABLE AS`.
            self.create_sql_ast_purified()
        } else {
            // Directly parse the persisted definition.
            self.create_sql_ast_from_persisted()
        }
    }

    fn create_sql_ast_from_persisted(&self) -> Result<ast::Statement> {
        Ok(Parser::parse_exactly_one(&self.definition)?)
    }

    /// Get a reference to the table catalog's version.
    pub fn version(&self) -> Option<&TableVersion> {
        self.version.as_ref()
    }

    /// Get the table's version id. Returns `None` if the table has no version field.
    pub fn version_id(&self) -> Option<TableVersionId> {
        self.version().map(|v| v.version_id)
    }

    /// Get the total vnode count of the table.
    ///
    /// Panics if it's called on an incomplete (and not yet persisted) table catalog.
    pub fn vnode_count(&self) -> usize {
        self.vnode_count.value()
    }

    pub fn to_prost(&self) -> PbTable {
        PbTable {
            id: self.id.table_id,
            schema_id: self.schema_id,
            database_id: self.database_id,
            name: self.name.clone(),
            // ignore `_rw_timestamp` when serializing
            columns: self
                .columns_without_rw_timestamp()
                .iter()
                .map(|c| c.to_protobuf())
                .collect(),
            pk: self.pk.iter().map(|o| o.to_protobuf()).collect(),
            stream_key: self.stream_key.iter().map(|x| *x as _).collect(),
            dependent_relations: (self.dependent_relations.iter())
                .map(|t| t.table_id)
                .collect(),
            optional_associated_source_id: self
                .associated_source_id
                .map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())),
            table_type: self.table_type.to_prost() as i32,
            distribution_key: self
                .distribution_key
                .iter()
                .map(|k| *k as i32)
                .collect_vec(),
            append_only: self.append_only,
            owner: self.owner,
            fragment_id: self.fragment_id,
            dml_fragment_id: self.dml_fragment_id,
            vnode_col_index: self.vnode_col_index.map(|i| i as _),
            row_id_index: self.row_id_index.map(|i| i as _),
            value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
            definition: self.definition.clone(),
            read_prefix_len_hint: self.read_prefix_len_hint as u32,
            version: self.version.as_ref().map(TableVersion::to_prost),
            watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
            dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
            handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
            version_column_index: self.version_column_index.map(|value| value as u32),
            cardinality: Some(self.cardinality.to_protobuf()),
            initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
            created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
            cleaned_by_watermark: self.cleaned_by_watermark,
            stream_job_status: self.stream_job_status.to_proto().into(),
            create_type: self.create_type.to_proto().into(),
            description: self.description.clone(),
            incoming_sinks: self.incoming_sinks.clone(),
            created_at_cluster_version: self.created_at_cluster_version.clone(),
            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
            retention_seconds: self.retention_seconds,
            cdc_table_id: self.cdc_table_id.clone(),
            maybe_vnode_count: self.vnode_count.to_protobuf(),
            webhook_info: self.webhook_info.clone(),
            job_id: self.job_id.map(|id| id.table_id),
            engine: Some(self.engine.to_protobuf().into()),
            clean_watermark_index_in_pk: self.clean_watermark_index_in_pk.map(|x| x as i32),
        }
    }

    /// Get columns excluding hidden columns and generated columns.
    pub fn columns_to_insert(&self) -> impl Iterator<Item = &ColumnCatalog> {
        self.columns
            .iter()
            .filter(|c| !c.is_hidden() && !c.is_generated())
    }

    pub fn generated_column_names(&self) -> impl Iterator<Item = &str> {
        self.columns
            .iter()
            .filter(|c| c.is_generated())
            .map(|c| c.name())
    }

    pub fn generated_col_idxes(&self) -> impl Iterator<Item = usize> + '_ {
        self.columns
            .iter()
            .enumerate()
            .filter(|(_, c)| c.is_generated())
            .map(|(i, _)| i)
    }

    /// Returns the default-value expression of the column at `col_idx`, falling back to a typed
    /// NULL literal if no default is specified.
    pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
        if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) = self
            .columns[col_idx]
            .column_desc
            .generated_or_default_column
            .as_ref()
        {
            ExprImpl::from_expr_proto(expr.as_ref().unwrap())
                .expect("expr in default columns corrupted")
        } else {
            ExprImpl::literal_null(self.columns[col_idx].data_type().clone())
        }
    }

    /// Returns the default-value expressions of the given columns, with typed NULL literals for
    /// columns that have no explicit default.
    pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
        columns
            .iter()
            .map(|c| {
                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                    expr,
                    ..
                })) = c.column_desc.generated_or_default_column.as_ref()
                {
                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
                        .expect("expr in default columns corrupted")
                } else {
                    ExprImpl::literal_null(c.data_type().clone())
                }
            })
            .collect()
    }

    /// Returns `(column index, default expression)` pairs for all columns that have an explicit
    /// default value.
    pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
        self.columns.iter().enumerate().filter_map(|(i, c)| {
            if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                expr, ..
            })) = c.column_desc.generated_or_default_column.as_ref()
            {
                Some((
                    i,
                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
                        .expect("expr in default columns corrupted"),
                ))
            } else {
                None
            }
        })
    }

    pub fn has_generated_column(&self) -> bool {
        self.columns.iter().any(|c| c.is_generated())
    }

    pub fn has_rw_timestamp_column(&self) -> bool {
        self.columns.iter().any(|c| c.is_rw_timestamp_column())
    }

    pub fn column_schema(&self) -> Schema {
        Schema::new(
            self.columns
                .iter()
                .map(|c| Field::from(&c.column_desc))
                .collect(),
        )
    }

    pub fn is_created(&self) -> bool {
        self.stream_job_status == StreamJobStatus::Created
    }

    pub fn is_iceberg_engine_table(&self) -> bool {
        self.engine == Engine::Iceberg
    }
}
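// Illustrative sketch (hypothetical caller, not from the original source): when planning an
// `INSERT` that omits some columns, a caller could combine the helpers above roughly like this,
// relying on the typed-NULL fallback they document:
//
//     let all_defaults = TableCatalog::default_column_exprs(table.columns());
//     for (idx, expr) in table.default_columns() {
//         // `expr` is the user-specified DEFAULT for the column at index `idx`.
//     }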

impl From<PbTable> for TableCatalog {
    fn from(tb: PbTable) -> Self {
        let id = tb.id;
        let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
        let tb_engine = tb
            .get_engine()
            .map(|engine| PbEngine::try_from(*engine).expect("Invalid engine"))
            .unwrap_or(PbEngine::Hummock);
        let table_type = tb.get_table_type().unwrap();
        let stream_job_status = tb
            .get_stream_job_status()
            .unwrap_or(PbStreamJobStatus::Created);
        let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground);
        let associated_source_id = tb.optional_associated_source_id.map(|id| match id {
            OptionalAssociatedSourceId::AssociatedSourceId(id) => id,
        });
        let name = tb.name.clone();

        let vnode_count = tb.vnode_count_inner();
        if let VnodeCount::Placeholder = vnode_count {
            // Only allow placeholder vnode count for creating tables.
            // After the table is created, an `Update` notification will be used to update the vnode count field.
            assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
        }

        let mut col_names = HashSet::new();
        let mut col_index: HashMap<i32, usize> = HashMap::new();

        let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
        let version_column_index = tb.version_column_index.map(|value| value as usize);
        let mut columns: Vec<ColumnCatalog> =
            tb.columns.into_iter().map(ColumnCatalog::from).collect();
        if columns.iter().all(|c| !c.is_rw_timestamp_column()) {
            // Add system column `_rw_timestamp` to every table, but notice that this column is never persisted.
            columns.push(ColumnCatalog::rw_timestamp_column());
        }
        for (idx, catalog) in columns.clone().into_iter().enumerate() {
            let col_name = catalog.name();
            if !col_names.insert(col_name.to_owned()) {
                panic!("duplicated column name {} in table {}", col_name, tb.name)
            }

            let col_id = catalog.column_desc.column_id.get_id();
            col_index.insert(col_id, idx);
        }

        let pk = tb.pk.iter().map(ColumnOrder::from_protobuf).collect();
        let mut watermark_columns = FixedBitSet::with_capacity(columns.len());
        for idx in &tb.watermark_indices {
            watermark_columns.insert(*idx as _);
        }
        let engine = Engine::from_protobuf(&tb_engine);

        Self {
            id: id.into(),
            schema_id: tb.schema_id,
            database_id: tb.database_id,
            associated_source_id: associated_source_id.map(Into::into),
            name,
            pk,
            columns,
            table_type: TableType::from_prost(table_type),
            distribution_key: tb
                .distribution_key
                .iter()
                .map(|k| *k as usize)
                .collect_vec(),
            stream_key: tb.stream_key.iter().map(|x| *x as _).collect(),
            append_only: tb.append_only,
            owner: tb.owner,
            fragment_id: tb.fragment_id,
            dml_fragment_id: tb.dml_fragment_id,
            vnode_col_index: tb.vnode_col_index.map(|x| x as usize),
            row_id_index: tb.row_id_index.map(|x| x as usize),
            value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
            definition: tb.definition,
            conflict_behavior,
            version_column_index,
            read_prefix_len_hint: tb.read_prefix_len_hint as usize,
            version: tb.version.map(TableVersion::from_prost),
            watermark_columns,
            dist_key_in_pk: tb.dist_key_in_pk.iter().map(|x| *x as _).collect(),
            cardinality: tb
                .cardinality
                .map(|c| Cardinality::from_protobuf(&c))
                .unwrap_or_else(Cardinality::unknown),
            created_at_epoch: tb.created_at_epoch.map(Epoch::from),
            initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
            cleaned_by_watermark: tb.cleaned_by_watermark,
            create_type: CreateType::from_proto(create_type),
            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
            description: tb.description,
            incoming_sinks: tb.incoming_sinks.clone(),
            created_at_cluster_version: tb.created_at_cluster_version.clone(),
            initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
            retention_seconds: tb.retention_seconds,
            dependent_relations: tb
                .dependent_relations
                .into_iter()
                .map(TableId::from)
                .collect_vec(),
            cdc_table_id: tb.cdc_table_id,
            vnode_count,
            webhook_info: tb.webhook_info,
            job_id: tb.job_id.map(TableId::from),
            engine,
            clean_watermark_index_in_pk: tb.clean_watermark_index_in_pk.map(|x| x as usize),
        }
    }
}

impl From<&PbTable> for TableCatalog {
    fn from(tb: &PbTable) -> Self {
        tb.clone().into()
    }
}

impl OwnedByUserCatalog for TableCatalog {
    fn owner(&self) -> UserId {
        self.owner
    }
}
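// The test below exercises the proto round-trip: `to_prost` drops the hidden `_rw_timestamp`
// system column before serializing, and `From<PbTable>` appends it back, so
// `TableCatalog::from(table.to_prost()) == table` is expected to hold.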

#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::test_prelude::*;
    use risingwave_common::types::*;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_pb::catalog::table::PbEngine;
    use risingwave_pb::plan_common::{
        AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
    };

    use super::*;

    #[test]
    fn test_into_table_catalog() {
        let table: TableCatalog = PbTable {
            id: 0,
            schema_id: 0,
            database_id: 0,
            name: "test".to_owned(),
            table_type: PbTableType::Table as i32,
            columns: vec![
                ColumnCatalog::row_id_column().to_protobuf(),
                PbColumnCatalog {
                    column_desc: Some(PbColumnDesc::new(
                        DataType::from(StructType::new([
                            ("address", DataType::Varchar),
                            ("zipcode", DataType::Varchar),
                        ]))
                        .to_protobuf(),
                        "country",
                        1,
                    )),
                    is_hidden: false,
                },
            ],
            pk: vec![ColumnOrder::new(0, OrderType::ascending()).to_protobuf()],
            stream_key: vec![0],
            dependent_relations: vec![114514],
            distribution_key: vec![0],
            optional_associated_source_id: OptionalAssociatedSourceId::AssociatedSourceId(233)
                .into(),
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0,
            dml_fragment_id: None,
            initialized_at_epoch: None,
            value_indices: vec![0],
            definition: "".into(),
            read_prefix_len_hint: 0,
            vnode_col_index: None,
            row_id_index: None,
            version: Some(PbTableVersion {
                version: 0,
                next_column_id: 2,
            }),
            watermark_indices: vec![],
            handle_pk_conflict_behavior: 3,
            dist_key_in_pk: vec![0],
            cardinality: None,
            created_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: Some("description".to_owned()),
            incoming_sinks: vec![],
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_index: None,
            cdc_table_id: None,
            maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            clean_watermark_index_in_pk: None,
        }
        .into();

        assert_eq!(
            table,
            TableCatalog {
                id: TableId::new(0),
                schema_id: 0,
                database_id: 0,
                associated_source_id: Some(TableId::new(233)),
                name: "test".to_owned(),
                table_type: TableType::Table,
                columns: vec![
                    ColumnCatalog::row_id_column(),
                    ColumnCatalog {
                        column_desc: ColumnDesc {
                            data_type: StructType::new(vec![
                                ("address", DataType::Varchar),
                                ("zipcode", DataType::Varchar)
                            ])
                            .into(),
                            column_id: ColumnId::new(1),
                            name: "country".to_owned(),
                            description: None,
                            generated_or_default_column: None,
                            additional_column: AdditionalColumn { column_type: None },
                            version: ColumnDescVersion::LATEST,
                            system_column: None,
                            nullable: true,
                        },
                        is_hidden: false
                    },
                    ColumnCatalog::rw_timestamp_column(),
                ],
                stream_key: vec![0],
                pk: vec![ColumnOrder::new(0, OrderType::ascending())],
                distribution_key: vec![0],
                append_only: false,
                owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
                retention_seconds: Some(300),
                fragment_id: 0,
                dml_fragment_id: None,
                vnode_col_index: None,
                row_id_index: None,
                value_indices: vec![0],
                definition: "".into(),
                conflict_behavior: ConflictBehavior::NoCheck,
                read_prefix_len_hint: 0,
                version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))),
                watermark_columns: FixedBitSet::with_capacity(3),
                dist_key_in_pk: vec![0],
                cardinality: Cardinality::unknown(),
                created_at_epoch: None,
                initialized_at_epoch: None,
                cleaned_by_watermark: false,
                stream_job_status: StreamJobStatus::Created,
                create_type: CreateType::Foreground,
                description: Some("description".to_owned()),
                incoming_sinks: vec![],
                created_at_cluster_version: None,
                initialized_at_cluster_version: None,
                dependent_relations: vec![114514.into()],
                version_column_index: None,
                cdc_table_id: None,
                vnode_count: VnodeCount::set(233),
                webhook_info: None,
                job_id: None,
                engine: Engine::Hummock,
                clean_watermark_index_in_pk: None,
            }
        );
        assert_eq!(table, TableCatalog::from(table.to_prost()));
    }
}