1use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17
18use fixedbitset::FixedBitSet;
19use itertools::Itertools;
20use risingwave_common::catalog::{
21    ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, Schema,
22    StreamJobStatus, TableDesc, TableId, TableVersionId,
23};
24use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
25use risingwave_common::util::epoch::Epoch;
26use risingwave_common::util::sort_util::ColumnOrder;
27use risingwave_connector::source::cdc::external::ExternalCdcTableType;
28use risingwave_pb::catalog::table::{
29    CdcTableType as PbCdcTableType, OptionalAssociatedSourceId, PbEngine, PbTableType,
30    PbTableVersion,
31};
32use risingwave_pb::catalog::{
33    PbCreateType, PbStreamJobStatus, PbTable, PbVectorIndexInfo, PbWebhookSourceInfo,
34};
35use risingwave_pb::common::PbColumnOrder;
36use risingwave_pb::plan_common::DefaultColumnDesc;
37use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
38use risingwave_sqlparser::ast;
39use risingwave_sqlparser::parser::Parser;
40use thiserror_ext::AsReport as _;
41
42use super::purify::try_purify_table_source_create_sql_ast;
43use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId};
44use crate::error::{ErrorCode, Result, RwError};
45use crate::expr::ExprImpl;
46use crate::optimizer::property::Cardinality;
47use crate::session::current::notice_to_user;
48use crate::user::UserId;
49
50#[derive(Clone, Debug, PartialEq, Eq, Hash)]
82#[cfg_attr(test, derive(Default))]
83pub struct TableCatalog {
84    pub id: TableId,
85
86    pub schema_id: SchemaId,
87
88    pub database_id: DatabaseId,
89
90    pub associated_source_id: Option<TableId>, pub name: String,
93
94    pub columns: Vec<ColumnCatalog>,
96
97    pub pk: Vec<ColumnOrder>,
99
100    pub stream_key: Vec<usize>,
104
105    pub table_type: TableType,
108
109    pub distribution_key: Vec<usize>,
111
112    pub append_only: bool,
115
116    pub cardinality: Cardinality,
118
119    pub owner: UserId,
121
122    pub retention_seconds: Option<u32>,
124
125    pub fragment_id: FragmentId,
127
128    pub dml_fragment_id: Option<FragmentId>,
130
131    pub vnode_col_index: Option<usize>,
134
135    pub row_id_index: Option<usize>,
138
139    pub value_indices: Vec<usize>,
141
142    pub definition: String,
144
145    pub conflict_behavior: ConflictBehavior,
149
150    pub version_column_indices: Vec<usize>,
151
152    pub read_prefix_len_hint: usize,
153
154    pub version: Option<TableVersion>,
156
157    pub watermark_columns: FixedBitSet,
159
160    pub dist_key_in_pk: Vec<usize>,
163
164    pub created_at_epoch: Option<Epoch>,
165
166    pub initialized_at_epoch: Option<Epoch>,
167
168    pub cleaned_by_watermark: bool,
170
171    pub create_type: CreateType,
173
174    pub stream_job_status: StreamJobStatus,
177
178    pub description: Option<String>,
180
181    pub created_at_cluster_version: Option<String>,
182
183    pub initialized_at_cluster_version: Option<String>,
184
185    pub cdc_table_id: Option<String>,
186
187    pub vnode_count: VnodeCount,
196
197    pub webhook_info: Option<PbWebhookSourceInfo>,
198
199    pub job_id: Option<TableId>,
200
201    pub engine: Engine,
202
203    pub clean_watermark_index_in_pk: Option<usize>,
204
205    pub refreshable: bool,
207
208    pub vector_index_info: Option<PbVectorIndexInfo>,
209
210    pub cdc_table_type: Option<ExternalCdcTableType>,
211}
212
213pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
214pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_";
215
216#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
217#[cfg_attr(test, derive(Default))]
218pub enum TableType {
219    #[cfg_attr(test, default)]
221    Table,
222    MaterializedView,
224    Index,
227    VectorIndex,
228    Internal,
230}
231
232impl TableType {
233    fn from_prost(prost: PbTableType) -> Self {
234        match prost {
235            PbTableType::Table => Self::Table,
236            PbTableType::MaterializedView => Self::MaterializedView,
237            PbTableType::Index => Self::Index,
238            PbTableType::Internal => Self::Internal,
239            PbTableType::VectorIndex => Self::VectorIndex,
240            PbTableType::Unspecified => unreachable!(),
241        }
242    }
243
244    pub(crate) fn to_prost(self) -> PbTableType {
245        match self {
246            Self::Table => PbTableType::Table,
247            Self::MaterializedView => PbTableType::MaterializedView,
248            Self::Index => PbTableType::Index,
249            Self::VectorIndex => PbTableType::VectorIndex,
250            Self::Internal => PbTableType::Internal,
251        }
252    }
253}
254
255#[derive(Clone, Debug, PartialEq, Eq, Hash)]
257pub struct TableVersion {
258    pub version_id: TableVersionId,
259    pub next_column_id: ColumnId,
260}
261
262impl TableVersion {
263    #[cfg(test)]
265    pub fn new_initial_for_test(max_column_id: ColumnId) -> Self {
266        use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;
267
268        Self {
269            version_id: INITIAL_TABLE_VERSION_ID,
270            next_column_id: max_column_id.next(),
271        }
272    }
273
274    pub fn from_prost(prost: PbTableVersion) -> Self {
275        Self {
276            version_id: prost.version,
277            next_column_id: ColumnId::from(prost.next_column_id),
278        }
279    }
280
281    pub fn to_prost(&self) -> PbTableVersion {
282        PbTableVersion {
283            version: self.version_id,
284            next_column_id: self.next_column_id.into(),
285        }
286    }
287}
288
289impl TableCatalog {
290    pub fn create_sql_purified(&self) -> String {
295        self.create_sql_ast_purified()
296            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
297            .unwrap_or_else(|_| self.create_sql())
298    }
299
300    pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
305        if let TableType::Table = self.table_type() {
307            let base = if self.definition.is_empty() {
308                let name = ast::ObjectName(vec![self.name.as_str().into()]);
310                ast::Statement::default_create_table(name)
311            } else {
312                self.create_sql_ast_from_persisted()?
313            };
314
315            match try_purify_table_source_create_sql_ast(
316                base,
317                self.columns(),
318                self.row_id_index,
319                &self.pk_column_ids(),
320            ) {
321                Ok(stmt) => return Ok(stmt),
322                Err(e) => notice_to_user(format!(
323                    "error occurred while purifying definition for table \"{}\", \
324                     results may be inaccurate: {}",
325                    self.name,
326                    e.as_report()
327                )),
328            }
329        }
330
331        self.create_sql_ast_from_persisted()
332    }
333}
334
335impl TableCatalog {
336    pub fn id(&self) -> TableId {
338        self.id
339    }
340
341    pub fn with_id(mut self, id: TableId) -> Self {
342        self.id = id;
343        self
344    }
345
346    pub fn with_cleaned_by_watermark(mut self, cleaned_by_watermark: bool) -> Self {
347        self.cleaned_by_watermark = cleaned_by_watermark;
348        self
349    }
350
351    pub fn conflict_behavior(&self) -> ConflictBehavior {
352        self.conflict_behavior
353    }
354
355    pub fn table_type(&self) -> TableType {
356        self.table_type
357    }
358
359    pub fn engine(&self) -> Engine {
360        self.engine
361    }
362
363    pub fn iceberg_source_name(&self) -> Option<String> {
364        match self.engine {
365            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name)),
366            Engine::Hummock => None,
367        }
368    }
369
370    pub fn iceberg_sink_name(&self) -> Option<String> {
371        match self.engine {
372            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SINK_PREFIX, self.name)),
373            Engine::Hummock => None,
374        }
375    }
376
377    pub fn is_user_table(&self) -> bool {
378        self.table_type == TableType::Table
379    }
380
381    pub fn is_internal_table(&self) -> bool {
382        self.table_type == TableType::Internal
383    }
384
385    pub fn is_mview(&self) -> bool {
386        self.table_type == TableType::MaterializedView
387    }
388
389    pub fn is_index(&self) -> bool {
390        self.table_type == TableType::Index
391    }
392
393    #[must_use]
395    pub fn bad_drop_error(&self) -> RwError {
396        let msg = match self.table_type {
397            TableType::MaterializedView => {
398                "Use `DROP MATERIALIZED VIEW` to drop a materialized view."
399            }
400            TableType::Index | TableType::VectorIndex => "Use `DROP INDEX` to drop an index.",
401            TableType::Table => "Use `DROP TABLE` to drop a table.",
402            TableType::Internal => "Internal tables cannot be dropped.",
403        };
404
405        ErrorCode::InvalidInputSyntax(msg.to_owned()).into()
406    }
407
408    #[must_use]
410    pub fn associated_source_id(&self) -> Option<TableId> {
411        self.associated_source_id
412    }
413
414    pub fn has_associated_source(&self) -> bool {
415        self.associated_source_id.is_some()
416    }
417
418    pub fn columns(&self) -> &[ColumnCatalog] {
420        &self.columns
421    }
422
423    pub fn columns_without_rw_timestamp(&self) -> Vec<ColumnCatalog> {
424        self.columns
425            .iter()
426            .filter(|c| !c.is_rw_timestamp_column())
427            .cloned()
428            .collect()
429    }
430
431    pub fn pk(&self) -> &[ColumnOrder] {
433        self.pk.as_ref()
434    }
435
436    pub fn pk_column_ids(&self) -> Vec<ColumnId> {
438        self.pk
439            .iter()
440            .map(|x| self.columns[x.column_index].column_id())
441            .collect()
442    }
443
444    pub fn pk_column_names(&self) -> Vec<&str> {
446        self.pk
447            .iter()
448            .map(|x| self.columns[x.column_index].name())
449            .collect()
450    }
451
452    pub fn stream_key(&self) -> Vec<usize> {
456        if self
458            .distribution_key
459            .iter()
460            .any(|dist_key| !self.stream_key.contains(dist_key))
461        {
462            let mut new_stream_key = self.distribution_key.clone();
463            let mut seen: HashSet<usize> = self.distribution_key.iter().copied().collect();
464            for &key in &self.stream_key {
465                if seen.insert(key) {
466                    new_stream_key.push(key);
467                }
468            }
469            new_stream_key
470        } else {
471            self.stream_key.clone()
472        }
473    }
474
475    pub fn table_desc(&self) -> TableDesc {
480        use risingwave_common::catalog::TableOption;
481
482        let table_options = TableOption::new(self.retention_seconds);
483
484        TableDesc {
485            table_id: self.id,
486            pk: self.pk.clone(),
487            stream_key: self.stream_key(),
488            columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
489            distribution_key: self.distribution_key.clone(),
490            append_only: self.append_only,
491            retention_seconds: table_options.retention_seconds,
492            value_indices: self.value_indices.clone(),
493            read_prefix_len_hint: self.read_prefix_len_hint,
494            watermark_columns: self.watermark_columns.clone(),
495            versioned: self.version.is_some(),
496            vnode_col_index: self.vnode_col_index,
497            vnode_count: self.vnode_count(),
498        }
499    }
500
501    pub fn name(&self) -> &str {
503        self.name.as_ref()
504    }
505
506    pub fn distribution_key(&self) -> &[usize] {
507        self.distribution_key.as_ref()
508    }
509
510    pub fn to_internal_table_prost(&self) -> PbTable {
511        self.to_prost()
512    }
513
514    pub fn create_sql(&self) -> String {
518        self.create_sql_ast()
519            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
520            .unwrap_or_else(|_| self.definition.clone())
521    }
522
523    pub fn create_sql_ast(&self) -> Result<ast::Statement> {
531        if let TableType::Table = self.table_type()
532            && self.definition.is_empty()
533        {
534            self.create_sql_ast_purified()
536        } else {
537            self.create_sql_ast_from_persisted()
539        }
540    }
541
542    fn create_sql_ast_from_persisted(&self) -> Result<ast::Statement> {
543        Ok(Parser::parse_exactly_one(&self.definition)?)
544    }
545
546    pub fn version(&self) -> Option<&TableVersion> {
548        self.version.as_ref()
549    }
550
551    pub fn version_id(&self) -> Option<TableVersionId> {
553        self.version().map(|v| v.version_id)
554    }
555
556    pub fn vnode_count(&self) -> usize {
558        if self.id().is_placeholder() {
559            0
560        } else {
561            self.vnode_count.value()
563        }
564    }
565
566    pub fn to_prost(&self) -> PbTable {
567        PbTable {
568            id: self.id.table_id,
569            schema_id: self.schema_id,
570            database_id: self.database_id,
571            name: self.name.clone(),
572            columns: self
574                .columns_without_rw_timestamp()
575                .iter()
576                .map(|c| c.to_protobuf())
577                .collect(),
578            pk: self.pk.iter().map(|o| o.to_protobuf()).collect(),
579            stream_key: self.stream_key().iter().map(|x| *x as _).collect(),
580            optional_associated_source_id: self
581                .associated_source_id
582                .map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())),
583            table_type: self.table_type.to_prost() as i32,
584            distribution_key: self
585                .distribution_key
586                .iter()
587                .map(|k| *k as i32)
588                .collect_vec(),
589            append_only: self.append_only,
590            owner: self.owner,
591            fragment_id: self.fragment_id,
592            dml_fragment_id: self.dml_fragment_id,
593            vnode_col_index: self.vnode_col_index.map(|i| i as _),
594            row_id_index: self.row_id_index.map(|i| i as _),
595            value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
596            definition: self.definition.clone(),
597            read_prefix_len_hint: self.read_prefix_len_hint as u32,
598            version: self.version.as_ref().map(TableVersion::to_prost),
599            watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
600            dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
601            handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
602            version_column_indices: self
603                .version_column_indices
604                .iter()
605                .map(|&idx| idx as u32)
606                .collect(),
607            cardinality: Some(self.cardinality.to_protobuf()),
608            initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
609            created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
610            cleaned_by_watermark: self.cleaned_by_watermark,
611            stream_job_status: self.stream_job_status.to_proto().into(),
612            create_type: self.create_type.to_proto().into(),
613            description: self.description.clone(),
614            #[expect(deprecated)]
615            incoming_sinks: vec![],
616            created_at_cluster_version: self.created_at_cluster_version.clone(),
617            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
618            retention_seconds: self.retention_seconds,
619            cdc_table_id: self.cdc_table_id.clone(),
620            maybe_vnode_count: self.vnode_count.to_protobuf(),
621            webhook_info: self.webhook_info.clone(),
622            job_id: self.job_id.map(|id| id.table_id),
623            engine: Some(self.engine.to_protobuf().into()),
624            clean_watermark_index_in_pk: self.clean_watermark_index_in_pk.map(|x| x as i32),
625            refreshable: self.refreshable,
626            vector_index_info: self.vector_index_info,
627            cdc_table_type: self
628                .cdc_table_type
629                .clone()
630                .map(|t| PbCdcTableType::from(t) as i32),
631            refresh_state: Some(risingwave_pb::catalog::RefreshState::Idle as i32),
632        }
633    }
634
635    pub fn columns_to_insert(&self) -> impl Iterator<Item = &ColumnCatalog> {
637        self.columns
638            .iter()
639            .filter(|c| !c.is_hidden() && !c.is_generated())
640    }
641
642    pub fn generated_column_names(&self) -> impl Iterator<Item = &str> {
643        self.columns
644            .iter()
645            .filter(|c| c.is_generated())
646            .map(|c| c.name())
647    }
648
649    pub fn generated_col_idxes(&self) -> impl Iterator<Item = usize> + '_ {
650        self.columns
651            .iter()
652            .enumerate()
653            .filter(|(_, c)| c.is_generated())
654            .map(|(i, _)| i)
655    }
656
657    pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
658        if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) = self
659            .columns[col_idx]
660            .column_desc
661            .generated_or_default_column
662            .as_ref()
663        {
664            ExprImpl::from_expr_proto(expr.as_ref().unwrap())
665                .expect("expr in default columns corrupted")
666        } else {
667            ExprImpl::literal_null(self.columns[col_idx].data_type().clone())
668        }
669    }
670
671    pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
672        columns
673            .iter()
674            .map(|c| {
675                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
676                    expr,
677                    ..
678                })) = c.column_desc.generated_or_default_column.as_ref()
679                {
680                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
681                        .expect("expr in default columns corrupted")
682                } else {
683                    ExprImpl::literal_null(c.data_type().clone())
684                }
685            })
686            .collect()
687    }
688
689    pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
690        self.columns.iter().enumerate().filter_map(|(i, c)| {
691            if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
692                expr, ..
693            })) = c.column_desc.generated_or_default_column.as_ref()
694            {
695                Some((
696                    i,
697                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
698                        .expect("expr in default columns corrupted"),
699                ))
700            } else {
701                None
702            }
703        })
704    }
705
706    pub fn has_generated_column(&self) -> bool {
707        self.columns.iter().any(|c| c.is_generated())
708    }
709
710    pub fn has_rw_timestamp_column(&self) -> bool {
711        self.columns.iter().any(|c| c.is_rw_timestamp_column())
712    }
713
714    pub fn column_schema(&self) -> Schema {
715        Schema::new(
716            self.columns
717                .iter()
718                .map(|c| Field::from(&c.column_desc))
719                .collect(),
720        )
721    }
722
723    pub fn is_created(&self) -> bool {
724        self.stream_job_status == StreamJobStatus::Created
725    }
726
727    pub fn is_iceberg_engine_table(&self) -> bool {
728        self.engine == Engine::Iceberg
729    }
730
731    pub fn order_column_indices(&self) -> impl Iterator<Item = usize> + '_ {
732        self.pk.iter().map(|col| col.column_index)
733    }
734
735    pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
736        ColumnDesc::get_id_to_op_idx_mapping(&self.columns, None)
737    }
738
739    pub fn order_column_ids(&self) -> Vec<ColumnId> {
740        self.pk
741            .iter()
742            .map(|col| self.columns[col.column_index].column_desc.column_id)
743            .collect()
744    }
745
746    pub fn arrange_key_orders_protobuf(&self) -> Vec<PbColumnOrder> {
747        self.pk.iter().map(|x| x.to_protobuf()).collect()
749    }
750}
751
752impl From<PbTable> for TableCatalog {
753    fn from(tb: PbTable) -> Self {
754        let id = tb.id;
755        let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
756        let tb_engine = tb
757            .get_engine()
758            .map(|engine| PbEngine::try_from(*engine).expect("Invalid engine"))
759            .unwrap_or(PbEngine::Hummock);
760        let table_type = tb.get_table_type().unwrap();
761        let stream_job_status = tb
762            .get_stream_job_status()
763            .unwrap_or(PbStreamJobStatus::Created);
764        let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground);
765        let associated_source_id = tb.optional_associated_source_id.map(|id| match id {
766            OptionalAssociatedSourceId::AssociatedSourceId(id) => id,
767        });
768        let name = tb.name.clone();
769
770        let vnode_count = tb.vnode_count_inner();
771        if let VnodeCount::Placeholder = vnode_count {
772            assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
775        }
776
777        let mut col_names = HashSet::new();
778        let mut col_index: HashMap<i32, usize> = HashMap::new();
779
780        let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
781        let version_column_indices: Vec<usize> = tb
782            .version_column_indices
783            .iter()
784            .map(|&idx| idx as usize)
785            .collect();
786        let mut columns: Vec<ColumnCatalog> =
787            tb.columns.into_iter().map(ColumnCatalog::from).collect();
788        if columns.iter().all(|c| !c.is_rw_timestamp_column()) {
789            columns.push(ColumnCatalog::rw_timestamp_column());
791        }
792        for (idx, catalog) in columns.clone().into_iter().enumerate() {
793            let col_name = catalog.name();
794            if !col_names.insert(col_name.to_owned()) {
795                panic!("duplicated column name {} in table {} ", col_name, tb.name)
796            }
797
798            let col_id = catalog.column_desc.column_id.get_id();
799            col_index.insert(col_id, idx);
800        }
801
802        let pk = tb.pk.iter().map(ColumnOrder::from_protobuf).collect();
803        let mut watermark_columns = FixedBitSet::with_capacity(columns.len());
804        for idx in &tb.watermark_indices {
805            watermark_columns.insert(*idx as _);
806        }
807        let engine = Engine::from_protobuf(&tb_engine);
808
809        Self {
810            id: id.into(),
811            schema_id: tb.schema_id,
812            database_id: tb.database_id,
813            associated_source_id: associated_source_id.map(Into::into),
814            name,
815            pk,
816            columns,
817            table_type: TableType::from_prost(table_type),
818            distribution_key: tb
819                .distribution_key
820                .iter()
821                .map(|k| *k as usize)
822                .collect_vec(),
823            stream_key: tb.stream_key.iter().map(|x| *x as _).collect(),
824            append_only: tb.append_only,
825            owner: tb.owner,
826            fragment_id: tb.fragment_id,
827            dml_fragment_id: tb.dml_fragment_id,
828            vnode_col_index: tb.vnode_col_index.map(|x| x as usize),
829            row_id_index: tb.row_id_index.map(|x| x as usize),
830            value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
831            definition: tb.definition,
832            conflict_behavior,
833            version_column_indices,
834            read_prefix_len_hint: tb.read_prefix_len_hint as usize,
835            version: tb.version.map(TableVersion::from_prost),
836            watermark_columns,
837            dist_key_in_pk: tb.dist_key_in_pk.iter().map(|x| *x as _).collect(),
838            cardinality: tb
839                .cardinality
840                .map(|c| Cardinality::from_protobuf(&c))
841                .unwrap_or_else(Cardinality::unknown),
842            created_at_epoch: tb.created_at_epoch.map(Epoch::from),
843            initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
844            cleaned_by_watermark: tb.cleaned_by_watermark,
845            create_type: CreateType::from_proto(create_type),
846            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
847            description: tb.description,
848            created_at_cluster_version: tb.created_at_cluster_version.clone(),
849            initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
850            retention_seconds: tb.retention_seconds,
851            cdc_table_id: tb.cdc_table_id,
852            vnode_count,
853            webhook_info: tb.webhook_info,
854            job_id: tb.job_id.map(TableId::from),
855            engine,
856            clean_watermark_index_in_pk: tb.clean_watermark_index_in_pk.map(|x| x as usize),
857
858            refreshable: tb.refreshable,
859            vector_index_info: tb.vector_index_info,
860            cdc_table_type: tb
861                .cdc_table_type
862                .and_then(|t| PbCdcTableType::try_from(t).ok())
863                .map(ExternalCdcTableType::from),
864        }
865    }
866}
867
868impl From<&PbTable> for TableCatalog {
869    fn from(tb: &PbTable) -> Self {
870        tb.clone().into()
871    }
872}
873
874impl OwnedByUserCatalog for TableCatalog {
875    fn owner(&self) -> UserId {
876        self.owner
877    }
878}
879
880#[cfg(test)]
881mod tests {
882    use risingwave_common::catalog::{ColumnDesc, ColumnId};
883    use risingwave_common::test_prelude::*;
884    use risingwave_common::types::*;
885    use risingwave_common::util::sort_util::OrderType;
886    use risingwave_pb::catalog::table::PbEngine;
887    use risingwave_pb::plan_common::{
888        AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
889    };
890
891    use super::*;
892
893    #[test]
894    fn test_into_table_catalog() {
895        let table: TableCatalog = PbTable {
896            id: 0,
897            schema_id: 0,
898            database_id: 0,
899            name: "test".to_owned(),
900            table_type: PbTableType::Table as i32,
901            columns: vec![
902                ColumnCatalog::row_id_column().to_protobuf(),
903                PbColumnCatalog {
904                    column_desc: Some(PbColumnDesc::new(
905                        DataType::from(StructType::new([
906                            ("address", DataType::Varchar),
907                            ("zipcode", DataType::Varchar),
908                        ]))
909                        .to_protobuf(),
910                        "country",
911                        1,
912                    )),
913                    is_hidden: false,
914                },
915            ],
916            pk: vec![ColumnOrder::new(0, OrderType::ascending()).to_protobuf()],
917            stream_key: vec![0],
918            distribution_key: vec![0],
919            optional_associated_source_id: OptionalAssociatedSourceId::AssociatedSourceId(233)
920                .into(),
921            append_only: false,
922            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
923            retention_seconds: Some(300),
924            fragment_id: 0,
925            dml_fragment_id: None,
926            initialized_at_epoch: None,
927            value_indices: vec![0],
928            definition: "".into(),
929            read_prefix_len_hint: 0,
930            vnode_col_index: None,
931            row_id_index: None,
932            version: Some(PbTableVersion {
933                version: 0,
934                next_column_id: 2,
935            }),
936            watermark_indices: vec![],
937            handle_pk_conflict_behavior: 3,
938            dist_key_in_pk: vec![0],
939            cardinality: None,
940            created_at_epoch: None,
941            cleaned_by_watermark: false,
942            stream_job_status: PbStreamJobStatus::Created.into(),
943            create_type: PbCreateType::Foreground.into(),
944            description: Some("description".to_owned()),
945            #[expect(deprecated)]
946            incoming_sinks: vec![],
947            created_at_cluster_version: None,
948            initialized_at_cluster_version: None,
949            version_column_indices: Vec::new(),
950            cdc_table_id: None,
951            maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
952            webhook_info: None,
953            job_id: None,
954            engine: Some(PbEngine::Hummock as i32),
955            clean_watermark_index_in_pk: None,
956
957            refreshable: false,
958            vector_index_info: None,
959            cdc_table_type: None,
960            refresh_state: Some(risingwave_pb::catalog::RefreshState::Idle as i32),
961        }
962        .into();
963
964        assert_eq!(
965            table,
966            TableCatalog {
967                id: TableId::new(0),
968                schema_id: 0,
969                database_id: 0,
970                associated_source_id: Some(TableId::new(233)),
971                name: "test".to_owned(),
972                table_type: TableType::Table,
973                columns: vec![
974                    ColumnCatalog::row_id_column(),
975                    ColumnCatalog {
976                        column_desc: ColumnDesc {
977                            data_type: StructType::new(vec![
978                                ("address", DataType::Varchar),
979                                ("zipcode", DataType::Varchar)
980                            ],)
981                            .into(),
982                            column_id: ColumnId::new(1),
983                            name: "country".to_owned(),
984                            description: None,
985                            generated_or_default_column: None,
986                            additional_column: AdditionalColumn { column_type: None },
987                            version: ColumnDescVersion::LATEST,
988                            system_column: None,
989                            nullable: true,
990                        },
991                        is_hidden: false
992                    },
993                    ColumnCatalog::rw_timestamp_column(),
994                ],
995                stream_key: vec![0],
996                pk: vec![ColumnOrder::new(0, OrderType::ascending())],
997                distribution_key: vec![0],
998                append_only: false,
999                owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
1000                retention_seconds: Some(300),
1001                fragment_id: 0,
1002                dml_fragment_id: None,
1003                vnode_col_index: None,
1004                row_id_index: None,
1005                value_indices: vec![0],
1006                definition: "".into(),
1007                conflict_behavior: ConflictBehavior::NoCheck,
1008                read_prefix_len_hint: 0,
1009                version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))),
1010                watermark_columns: FixedBitSet::with_capacity(3),
1011                dist_key_in_pk: vec![0],
1012                cardinality: Cardinality::unknown(),
1013                created_at_epoch: None,
1014                initialized_at_epoch: None,
1015                cleaned_by_watermark: false,
1016                stream_job_status: StreamJobStatus::Created,
1017                create_type: CreateType::Foreground,
1018                description: Some("description".to_owned()),
1019                created_at_cluster_version: None,
1020                initialized_at_cluster_version: None,
1021                version_column_indices: Vec::new(),
1022                cdc_table_id: None,
1023                vnode_count: VnodeCount::set(233),
1024                webhook_info: None,
1025                job_id: None,
1026                engine: Engine::Hummock,
1027                clean_watermark_index_in_pk: None,
1028
1029                refreshable: false,
1030                vector_index_info: None,
1031                cdc_table_type: None,
1032            }
1033        );
1034        assert_eq!(table, TableCatalog::from(table.to_prost()));
1035    }
1036}