1use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17
18use fixedbitset::FixedBitSet;
19use itertools::Itertools;
20use risingwave_common::catalog::{
21 ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, ICEBERG_SINK_PREFIX,
22 ICEBERG_SOURCE_PREFIX, Schema, StreamJobStatus, TableDesc, TableId, TableVersionId,
23};
24use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
25use risingwave_common::id::{JobId, SourceId};
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::sort_util::ColumnOrder;
28use risingwave_connector::source::cdc::external::ExternalCdcTableType;
29use risingwave_pb::catalog::table::{
30 CdcTableType as PbCdcTableType, PbEngine, PbTableType, PbTableVersion,
31};
32use risingwave_pb::catalog::{
33 PbCreateType, PbStreamJobStatus, PbTable, PbVectorIndexInfo, PbWebhookSourceInfo,
34};
35use risingwave_pb::common::PbColumnOrder;
36use risingwave_pb::plan_common::DefaultColumnDesc;
37use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
38use risingwave_sqlparser::ast;
39use risingwave_sqlparser::parser::Parser;
40use thiserror_ext::AsReport as _;
41
42use super::purify::try_purify_table_source_create_sql_ast;
43use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId};
44use crate::error::{ErrorCode, Result, RwError};
45use crate::expr::ExprImpl;
46use crate::optimizer::property::Cardinality;
47use crate::session::current::notice_to_user;
48use crate::user::UserId;
49
/// Frontend-side catalog entry describing a single table-like relation.
///
/// Instances are built from the protobuf `PbTable` via `From<PbTable>` and
/// converted back with `TableCatalog::to_prost`. Which kind of relation an
/// instance describes is recorded in `table_type`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub struct TableCatalog {
    /// Identifier of this table.
    pub id: TableId,

    /// Identifier of the schema this table belongs to.
    pub schema_id: SchemaId,

    /// Identifier of the database this table belongs to.
    pub database_id: DatabaseId,

    /// Source associated with this table, if any
    /// (see `has_associated_source`).
    pub associated_source_id: Option<SourceId>,

    /// Name of the table.
    pub name: String,

    /// All columns of the table. When built from protobuf, the rw-timestamp
    /// system column is appended if it is not already present; `to_prost`
    /// filters it out again.
    pub columns: Vec<ColumnCatalog>,

    /// Ordered key columns; each entry's `column_index` indexes into `columns`.
    pub pk: Vec<ColumnOrder>,

    /// Stream key as stored. The `stream_key()` accessor may return a
    /// different vector that is prefixed with the distribution key.
    pub stream_key: Vec<usize>,

    /// Kind of relation (table, materialized view, index, ...).
    pub table_type: TableType,

    /// Distribution key, as column indices.
    pub distribution_key: Vec<usize>,

    /// Append-only flag, copied verbatim to and from protobuf.
    pub append_only: bool,

    /// Cardinality of the table; `Cardinality::unknown()` when the protobuf
    /// carries none.
    pub cardinality: Cardinality,

    /// Owner of the table (returned by `OwnedByUserCatalog::owner`).
    pub owner: UserId,

    /// Retention in seconds; fed into `TableOption::new` by `table_desc`.
    pub retention_seconds: Option<u32>,

    /// Fragment id, copied verbatim to and from protobuf.
    pub fragment_id: FragmentId,

    /// DML fragment id, if any.
    pub dml_fragment_id: Option<FragmentId>,

    /// Index of the vnode column, if any.
    pub vnode_col_index: Option<usize>,

    /// Index of the row-id column, if any. Passed to the definition purifier.
    pub row_id_index: Option<usize>,

    /// Value indices, copied verbatim to and from protobuf.
    pub value_indices: Vec<usize>,

    /// Persisted SQL definition. May be empty, in which case
    /// `create_sql_ast_purified` starts from a skeleton `CREATE TABLE`.
    pub definition: String,

    /// Behavior on primary-key conflict.
    pub conflict_behavior: ConflictBehavior,

    /// Indices of the version columns.
    pub version_column_indices: Vec<usize>,

    /// Read-prefix length hint, copied verbatim to and from protobuf.
    pub read_prefix_len_hint: usize,

    /// Version information. `table_desc` reports the table as `versioned`
    /// when this is `Some`.
    pub version: Option<TableVersion>,

    /// Bitset over `columns` marking watermark columns; created with capacity
    /// `columns.len()` when built from protobuf.
    pub watermark_columns: FixedBitSet,

    /// NOTE(review): presumably the positions of the distribution key columns
    /// within `pk` — confirm against the producer of this field.
    pub dist_key_in_pk: Vec<usize>,

    /// Epoch at which the table was created, if recorded.
    pub created_at_epoch: Option<Epoch>,

    /// Epoch at which the table was initialized, if recorded.
    pub initialized_at_epoch: Option<Epoch>,

    /// Cleaned-by-watermark flag (settable via `with_cleaned_by_watermark`).
    pub cleaned_by_watermark: bool,

    /// Create type; `Foreground` is used when the protobuf value cannot be read.
    pub create_type: CreateType,

    /// Stream job status; `Created` is used when the protobuf value cannot be
    /// read. See `is_created`.
    pub stream_job_status: StreamJobStatus,

    /// Optional description of the table.
    pub description: Option<String>,

    /// Cluster version recorded at creation, if any.
    pub created_at_cluster_version: Option<String>,

    /// Cluster version recorded at initialization, if any.
    pub initialized_at_cluster_version: Option<String>,

    /// CDC table id, if any.
    pub cdc_table_id: Option<String>,

    /// Vnode count. A `VnodeCount::Placeholder` is only accepted by
    /// `From<PbTable>` while the stream job status is `Creating`.
    pub vnode_count: VnodeCount,

    /// Webhook source info, if any.
    pub webhook_info: Option<PbWebhookSourceInfo>,

    /// Job id, if any.
    pub job_id: Option<JobId>,

    /// Storage engine; `Hummock` is used when the protobuf engine cannot be read.
    pub engine: Engine,

    /// Counterpart of a protobuf field that is marked deprecated
    /// (conversions use `#[expect(deprecated)]`).
    pub clean_watermark_index_in_pk: Option<usize>,

    /// Clean-watermark indices, copied to and from protobuf.
    pub clean_watermark_indices: Vec<usize>,

    /// Refreshable flag, copied verbatim to and from protobuf.
    pub refreshable: bool,

    /// Vector index info, if any.
    pub vector_index_info: Option<PbVectorIndexInfo>,

    /// External CDC table type, if any.
    pub cdc_table_type: Option<ExternalCdcTableType>,
}
217
/// Kind of relation described by a [`TableCatalog`].
///
/// Mirrors `PbTableType` except that there is no `Unspecified` variant.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub enum TableType {
    /// A table, dropped with `DROP TABLE`. Default variant in test builds.
    #[cfg_attr(test, default)]
    Table,
    /// A materialized view, dropped with `DROP MATERIALIZED VIEW`.
    MaterializedView,
    /// An index, dropped with `DROP INDEX`.
    Index,
    /// A vector index, also dropped with `DROP INDEX`.
    VectorIndex,
    /// An internal table; cannot be dropped by the user.
    Internal,
}
233
234impl TableType {
235 fn from_prost(prost: PbTableType) -> Self {
236 match prost {
237 PbTableType::Table => Self::Table,
238 PbTableType::MaterializedView => Self::MaterializedView,
239 PbTableType::Index => Self::Index,
240 PbTableType::Internal => Self::Internal,
241 PbTableType::VectorIndex => Self::VectorIndex,
242 PbTableType::Unspecified => unreachable!(),
243 }
244 }
245
246 pub(crate) fn to_prost(self) -> PbTableType {
247 match self {
248 Self::Table => PbTableType::Table,
249 Self::MaterializedView => PbTableType::MaterializedView,
250 Self::Index => PbTableType::Index,
251 Self::VectorIndex => PbTableType::VectorIndex,
252 Self::Internal => PbTableType::Internal,
253 }
254 }
255}
256
/// Version information attached to a [`TableCatalog`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TableVersion {
    /// Version id of the table (`version` on the protobuf side).
    pub version_id: TableVersionId,
    /// Next column id. The test constructor sets it to `max_column_id.next()`.
    pub next_column_id: ColumnId,
}
263
264impl TableVersion {
265 #[cfg(test)]
267 pub fn new_initial_for_test(max_column_id: ColumnId) -> Self {
268 use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;
269
270 Self {
271 version_id: INITIAL_TABLE_VERSION_ID,
272 next_column_id: max_column_id.next(),
273 }
274 }
275
276 pub fn from_prost(prost: PbTableVersion) -> Self {
277 Self {
278 version_id: prost.version,
279 next_column_id: ColumnId::from(prost.next_column_id),
280 }
281 }
282
283 pub fn to_prost(&self) -> PbTableVersion {
284 PbTableVersion {
285 version: self.version_id,
286 next_column_id: self.next_column_id.into(),
287 }
288 }
289}
290
291impl TableCatalog {
292 pub fn create_sql_purified(&self) -> String {
297 self.create_sql_ast_purified()
298 .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
299 .unwrap_or_else(|_| self.create_sql())
300 }
301
302 pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
307 if let TableType::Table = self.table_type() {
309 let base = if self.definition.is_empty() {
310 let name = ast::ObjectName(vec![self.name.as_str().into()]);
312 ast::Statement::default_create_table(name)
313 } else {
314 self.create_sql_ast_from_persisted()?
315 };
316
317 match try_purify_table_source_create_sql_ast(
318 base,
319 self.columns(),
320 self.row_id_index,
321 &self.pk_column_ids(),
322 ) {
323 Ok(stmt) => return Ok(stmt),
324 Err(e) => notice_to_user(format!(
325 "error occurred while purifying definition for table \"{}\", \
326 results may be inaccurate: {}",
327 self.name,
328 e.as_report()
329 )),
330 }
331 }
332
333 self.create_sql_ast_from_persisted()
334 }
335}
336
337impl TableCatalog {
338 pub fn id(&self) -> TableId {
340 self.id
341 }
342
343 pub fn with_id(mut self, id: TableId) -> Self {
344 self.id = id;
345 self
346 }
347
348 pub fn with_cleaned_by_watermark(mut self, cleaned_by_watermark: bool) -> Self {
349 self.cleaned_by_watermark = cleaned_by_watermark;
350 self
351 }
352
353 pub fn conflict_behavior(&self) -> ConflictBehavior {
354 self.conflict_behavior
355 }
356
357 pub fn table_type(&self) -> TableType {
358 self.table_type
359 }
360
361 pub fn engine(&self) -> Engine {
362 self.engine
363 }
364
365 pub fn iceberg_source_name(&self) -> Option<String> {
366 match self.engine {
367 Engine::Iceberg => Some(format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name)),
368 Engine::Hummock => None,
369 }
370 }
371
372 pub fn iceberg_sink_name(&self) -> Option<String> {
373 match self.engine {
374 Engine::Iceberg => Some(format!("{}{}", ICEBERG_SINK_PREFIX, self.name)),
375 Engine::Hummock => None,
376 }
377 }
378
379 pub fn is_user_table(&self) -> bool {
380 self.table_type == TableType::Table
381 }
382
383 pub fn is_internal_table(&self) -> bool {
384 self.table_type == TableType::Internal
385 }
386
387 pub fn is_mview(&self) -> bool {
388 self.table_type == TableType::MaterializedView
389 }
390
391 pub fn is_index(&self) -> bool {
392 self.table_type == TableType::Index
393 }
394
395 #[must_use]
397 pub fn bad_drop_error(&self) -> RwError {
398 let msg = match self.table_type {
399 TableType::MaterializedView => {
400 "Use `DROP MATERIALIZED VIEW` to drop a materialized view."
401 }
402 TableType::Index | TableType::VectorIndex => "Use `DROP INDEX` to drop an index.",
403 TableType::Table => "Use `DROP TABLE` to drop a table.",
404 TableType::Internal => "Internal tables cannot be dropped.",
405 };
406
407 ErrorCode::InvalidInputSyntax(msg.to_owned()).into()
408 }
409
410 #[must_use]
412 pub fn associated_source_id(&self) -> Option<SourceId> {
413 self.associated_source_id
414 }
415
416 pub fn has_associated_source(&self) -> bool {
417 self.associated_source_id.is_some()
418 }
419
420 pub fn columns(&self) -> &[ColumnCatalog] {
422 &self.columns
423 }
424
425 pub fn columns_without_rw_timestamp(&self) -> Vec<ColumnCatalog> {
426 self.columns
427 .iter()
428 .filter(|c| !c.is_rw_timestamp_column())
429 .cloned()
430 .collect()
431 }
432
433 pub fn pk(&self) -> &[ColumnOrder] {
435 self.pk.as_ref()
436 }
437
438 pub fn pk_column_ids(&self) -> Vec<ColumnId> {
440 self.pk
441 .iter()
442 .map(|x| self.columns[x.column_index].column_id())
443 .collect()
444 }
445
446 pub fn pk_column_names(&self) -> Vec<&str> {
448 self.pk
449 .iter()
450 .map(|x| self.columns[x.column_index].name())
451 .collect()
452 }
453
454 pub fn stream_key(&self) -> Vec<usize> {
458 if self
460 .distribution_key
461 .iter()
462 .any(|dist_key| !self.stream_key.contains(dist_key))
463 {
464 let mut new_stream_key = self.distribution_key.clone();
465 let mut seen: HashSet<usize> = self.distribution_key.iter().copied().collect();
466 for &key in &self.stream_key {
467 if seen.insert(key) {
468 new_stream_key.push(key);
469 }
470 }
471 new_stream_key
472 } else {
473 self.stream_key.clone()
474 }
475 }
476
477 pub fn table_desc(&self) -> TableDesc {
482 use risingwave_common::catalog::TableOption;
483
484 let table_options = TableOption::new(self.retention_seconds);
485
486 TableDesc {
487 table_id: self.id,
488 pk: self.pk.clone(),
489 stream_key: self.stream_key(),
490 columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
491 distribution_key: self.distribution_key.clone(),
492 append_only: self.append_only,
493 retention_seconds: table_options.retention_seconds,
494 value_indices: self.value_indices.clone(),
495 read_prefix_len_hint: self.read_prefix_len_hint,
496 watermark_columns: self.watermark_columns.clone(),
497 versioned: self.version.is_some(),
498 vnode_col_index: self.vnode_col_index,
499 vnode_count: self.vnode_count(),
500 }
501 }
502
503 pub fn name(&self) -> &str {
505 self.name.as_ref()
506 }
507
508 pub fn distribution_key(&self) -> &[usize] {
509 self.distribution_key.as_ref()
510 }
511
512 pub fn to_internal_table_prost(&self) -> PbTable {
513 self.to_prost()
514 }
515
516 pub fn create_sql(&self) -> String {
520 self.create_sql_ast()
521 .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
522 .unwrap_or_else(|_| self.definition.clone())
523 }
524
525 pub fn create_sql_ast(&self) -> Result<ast::Statement> {
533 if let TableType::Table = self.table_type()
534 && self.definition.is_empty()
535 {
536 self.create_sql_ast_purified()
538 } else {
539 self.create_sql_ast_from_persisted()
541 }
542 }
543
544 fn create_sql_ast_from_persisted(&self) -> Result<ast::Statement> {
545 Ok(Parser::parse_exactly_one(&self.definition)?)
546 }
547
548 pub fn version(&self) -> Option<&TableVersion> {
550 self.version.as_ref()
551 }
552
553 pub fn version_id(&self) -> Option<TableVersionId> {
555 self.version().map(|v| v.version_id)
556 }
557
558 pub fn vnode_count(&self) -> usize {
560 if self.id().is_placeholder() {
561 0
562 } else {
563 self.vnode_count.value()
565 }
566 }
567
568 pub fn to_prost(&self) -> PbTable {
569 PbTable {
570 id: self.id,
571 schema_id: self.schema_id,
572 database_id: self.database_id,
573 name: self.name.clone(),
574 columns: self
576 .columns_without_rw_timestamp()
577 .iter()
578 .map(|c| c.to_protobuf())
579 .collect(),
580 pk: self.pk.iter().map(|o| o.to_protobuf()).collect(),
581 stream_key: self.stream_key().iter().map(|x| *x as _).collect(),
582 optional_associated_source_id: self.associated_source_id.map(Into::into),
583 table_type: self.table_type.to_prost() as i32,
584 distribution_key: self
585 .distribution_key
586 .iter()
587 .map(|k| *k as i32)
588 .collect_vec(),
589 append_only: self.append_only,
590 owner: self.owner,
591 fragment_id: self.fragment_id,
592 dml_fragment_id: self.dml_fragment_id,
593 vnode_col_index: self.vnode_col_index.map(|i| i as _),
594 row_id_index: self.row_id_index.map(|i| i as _),
595 value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
596 definition: self.definition.clone(),
597 read_prefix_len_hint: self.read_prefix_len_hint as u32,
598 version: self.version.as_ref().map(TableVersion::to_prost),
599 watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
600 dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
601 handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
602 version_column_indices: self
603 .version_column_indices
604 .iter()
605 .map(|&idx| idx as u32)
606 .collect(),
607 cardinality: Some(self.cardinality.to_protobuf()),
608 initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
609 created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
610 cleaned_by_watermark: self.cleaned_by_watermark,
611 stream_job_status: self.stream_job_status.to_proto().into(),
612 create_type: self.create_type.to_proto().into(),
613 description: self.description.clone(),
614 #[expect(deprecated)]
615 incoming_sinks: vec![],
616 created_at_cluster_version: self.created_at_cluster_version.clone(),
617 initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
618 retention_seconds: self.retention_seconds,
619 cdc_table_id: self.cdc_table_id.clone(),
620 maybe_vnode_count: self.vnode_count.to_protobuf(),
621 webhook_info: self.webhook_info.clone(),
622 job_id: self.job_id,
623 engine: Some(self.engine.to_protobuf().into()),
624 #[expect(deprecated)]
625 clean_watermark_index_in_pk: self.clean_watermark_index_in_pk.map(|x| x as i32),
626 clean_watermark_indices: self
627 .clean_watermark_indices
628 .iter()
629 .map(|&x| x as u32)
630 .collect(),
631 refreshable: self.refreshable,
632 vector_index_info: self.vector_index_info,
633 cdc_table_type: self
634 .cdc_table_type
635 .clone()
636 .map(|t| PbCdcTableType::from(t) as i32),
637 }
638 }
639
640 pub fn columns_to_insert(&self) -> impl Iterator<Item = &ColumnCatalog> {
642 self.columns
643 .iter()
644 .filter(|c| !c.is_hidden() && !c.is_generated())
645 }
646
647 pub fn generated_column_names(&self) -> impl Iterator<Item = &str> {
648 self.columns
649 .iter()
650 .filter(|c| c.is_generated())
651 .map(|c| c.name())
652 }
653
654 pub fn generated_col_idxes(&self) -> impl Iterator<Item = usize> + '_ {
655 self.columns
656 .iter()
657 .enumerate()
658 .filter(|(_, c)| c.is_generated())
659 .map(|(i, _)| i)
660 }
661
662 pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
663 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) = self
664 .columns[col_idx]
665 .column_desc
666 .generated_or_default_column
667 .as_ref()
668 {
669 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
670 .expect("expr in default columns corrupted")
671 } else {
672 ExprImpl::literal_null(self.columns[col_idx].data_type().clone())
673 }
674 }
675
676 pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
677 columns
678 .iter()
679 .map(|c| {
680 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
681 expr,
682 ..
683 })) = c.column_desc.generated_or_default_column.as_ref()
684 {
685 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
686 .expect("expr in default columns corrupted")
687 } else {
688 ExprImpl::literal_null(c.data_type().clone())
689 }
690 })
691 .collect()
692 }
693
694 pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
695 self.columns.iter().enumerate().filter_map(|(i, c)| {
696 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
697 expr, ..
698 })) = c.column_desc.generated_or_default_column.as_ref()
699 {
700 Some((
701 i,
702 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
703 .expect("expr in default columns corrupted"),
704 ))
705 } else {
706 None
707 }
708 })
709 }
710
711 pub fn has_generated_column(&self) -> bool {
712 self.columns.iter().any(|c| c.is_generated())
713 }
714
715 pub fn has_rw_timestamp_column(&self) -> bool {
716 self.columns.iter().any(|c| c.is_rw_timestamp_column())
717 }
718
719 pub fn column_schema(&self) -> Schema {
720 Schema::new(
721 self.columns
722 .iter()
723 .map(|c| Field::from(&c.column_desc))
724 .collect(),
725 )
726 }
727
728 pub fn is_created(&self) -> bool {
729 self.stream_job_status == StreamJobStatus::Created
730 }
731
732 pub fn is_iceberg_engine_table(&self) -> bool {
733 self.engine == Engine::Iceberg
734 }
735
736 pub fn order_column_indices(&self) -> impl Iterator<Item = usize> + '_ {
737 self.pk.iter().map(|col| col.column_index)
738 }
739
740 pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
741 ColumnDesc::get_id_to_op_idx_mapping(&self.columns, None)
742 }
743
744 pub fn order_column_ids(&self) -> Vec<ColumnId> {
745 self.pk
746 .iter()
747 .map(|col| self.columns[col.column_index].column_desc.column_id)
748 .collect()
749 }
750
751 pub fn arrange_key_orders_protobuf(&self) -> Vec<PbColumnOrder> {
752 self.pk.iter().map(|x| x.to_protobuf()).collect()
754 }
755}
756
757impl From<PbTable> for TableCatalog {
758 fn from(tb: PbTable) -> Self {
759 let id = tb.id;
760 let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
761 let tb_engine = tb
762 .get_engine()
763 .map(|engine| PbEngine::try_from(*engine).expect("Invalid engine"))
764 .unwrap_or(PbEngine::Hummock);
765 let table_type = tb.get_table_type().unwrap();
766 let stream_job_status = tb
767 .get_stream_job_status()
768 .unwrap_or(PbStreamJobStatus::Created);
769 let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground);
770 let associated_source_id = tb.optional_associated_source_id.map(Into::into);
771 let name = tb.name.clone();
772
773 let vnode_count = tb.vnode_count_inner();
774 if let VnodeCount::Placeholder = vnode_count {
775 assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
778 }
779
780 let mut col_names = HashSet::new();
781 let mut col_index: HashMap<i32, usize> = HashMap::new();
782
783 let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
784 let version_column_indices: Vec<usize> = tb
785 .version_column_indices
786 .iter()
787 .map(|&idx| idx as usize)
788 .collect();
789 let mut columns: Vec<ColumnCatalog> =
790 tb.columns.into_iter().map(ColumnCatalog::from).collect();
791 if columns.iter().all(|c| !c.is_rw_timestamp_column()) {
792 columns.push(ColumnCatalog::rw_timestamp_column());
794 }
795 for (idx, catalog) in columns.clone().into_iter().enumerate() {
796 let col_name = catalog.name();
797 if !col_names.insert(col_name.to_owned()) {
798 panic!("duplicated column name {} in table {} ", col_name, tb.name)
799 }
800
801 let col_id = catalog.column_desc.column_id.get_id();
802 col_index.insert(col_id, idx);
803 }
804
805 let pk = tb.pk.iter().map(ColumnOrder::from_protobuf).collect();
806 let mut watermark_columns = FixedBitSet::with_capacity(columns.len());
807 for idx in &tb.watermark_indices {
808 watermark_columns.insert(*idx as _);
809 }
810 let engine = Engine::from_protobuf(&tb_engine);
811
812 Self {
813 id,
814 schema_id: tb.schema_id,
815 database_id: tb.database_id,
816 associated_source_id,
817 name,
818 pk,
819 columns,
820 table_type: TableType::from_prost(table_type),
821 distribution_key: tb
822 .distribution_key
823 .iter()
824 .map(|k| *k as usize)
825 .collect_vec(),
826 stream_key: tb.stream_key.iter().map(|x| *x as _).collect(),
827 append_only: tb.append_only,
828 owner: tb.owner,
829 fragment_id: tb.fragment_id,
830 dml_fragment_id: tb.dml_fragment_id,
831 vnode_col_index: tb.vnode_col_index.map(|x| x as usize),
832 row_id_index: tb.row_id_index.map(|x| x as usize),
833 value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
834 definition: tb.definition,
835 conflict_behavior,
836 version_column_indices,
837 read_prefix_len_hint: tb.read_prefix_len_hint as usize,
838 version: tb.version.map(TableVersion::from_prost),
839 watermark_columns,
840 dist_key_in_pk: tb.dist_key_in_pk.iter().map(|x| *x as _).collect(),
841 cardinality: tb
842 .cardinality
843 .map(|c| Cardinality::from_protobuf(&c))
844 .unwrap_or_else(Cardinality::unknown),
845 created_at_epoch: tb.created_at_epoch.map(Epoch::from),
846 initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
847 cleaned_by_watermark: tb.cleaned_by_watermark,
848 create_type: CreateType::from_proto(create_type),
849 stream_job_status: StreamJobStatus::from_proto(stream_job_status),
850 description: tb.description,
851 created_at_cluster_version: tb.created_at_cluster_version.clone(),
852 initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
853 retention_seconds: tb.retention_seconds,
854 cdc_table_id: tb.cdc_table_id,
855 vnode_count,
856 webhook_info: tb.webhook_info,
857 job_id: tb.job_id,
858 engine,
859 #[expect(deprecated)]
860 clean_watermark_index_in_pk: tb.clean_watermark_index_in_pk.map(|x| x as usize),
861 clean_watermark_indices: tb
862 .clean_watermark_indices
863 .iter()
864 .map(|&x| x as usize)
865 .collect(),
866
867 refreshable: tb.refreshable,
868 vector_index_info: tb.vector_index_info,
869 cdc_table_type: tb
870 .cdc_table_type
871 .and_then(|t| PbCdcTableType::try_from(t).ok())
872 .map(ExternalCdcTableType::from),
873 }
874 }
875}
876
877impl From<&PbTable> for TableCatalog {
878 fn from(tb: &PbTable) -> Self {
879 tb.clone().into()
880 }
881}
882
impl OwnedByUserCatalog for TableCatalog {
    /// Returns the user id stored in the `owner` field.
    fn owner(&self) -> UserId {
        self.owner
    }
}
888
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::test_prelude::*;
    use risingwave_common::types::*;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_pb::catalog::table::PbEngine;
    use risingwave_pb::plan_common::{
        AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
    };

    use super::*;

    /// Converts a hand-written `PbTable` into a `TableCatalog`, compares it
    /// field by field against the expected catalog, and then checks that a
    /// `to_prost` / `from` round trip reproduces the same catalog.
    #[test]
    fn test_into_table_catalog() {
        // Protobuf input: a row-id column plus one struct-typed column.
        let table: TableCatalog = PbTable {
            id: 0.into(),
            schema_id: 0.into(),
            database_id: 0.into(),
            name: "test".to_owned(),
            table_type: PbTableType::Table as i32,
            columns: vec![
                ColumnCatalog::row_id_column().to_protobuf(),
                PbColumnCatalog {
                    column_desc: Some(PbColumnDesc::new(
                        DataType::from(StructType::new([
                            ("address", DataType::Varchar),
                            ("zipcode", DataType::Varchar),
                        ]))
                        .to_protobuf(),
                        "country",
                        1,
                    )),
                    is_hidden: false,
                },
            ],
            pk: vec![ColumnOrder::new(0, OrderType::ascending()).to_protobuf()],
            stream_key: vec![0],
            distribution_key: vec![0],
            optional_associated_source_id: Some(SourceId::new(233).into()),
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0.into(),
            dml_fragment_id: None,
            initialized_at_epoch: None,
            value_indices: vec![0],
            definition: "".into(),
            read_prefix_len_hint: 0,
            vnode_col_index: None,
            row_id_index: None,
            version: Some(PbTableVersion {
                version: 0,
                next_column_id: 2,
            }),
            watermark_indices: vec![],
            // Expected below to decode as `ConflictBehavior::NoCheck`.
            handle_pk_conflict_behavior: 3,
            dist_key_in_pk: vec![0],
            cardinality: None,
            created_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: Some("description".to_owned()),
            #[expect(deprecated)]
            incoming_sinks: vec![],
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_indices: Vec::new(),
            cdc_table_id: None,
            maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            #[expect(deprecated)]
            clean_watermark_index_in_pk: None,
            clean_watermark_indices: vec![],

            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
        }
        .into();

        assert_eq!(
            table,
            TableCatalog {
                id: TableId::new(0),
                schema_id: 0.into(),
                database_id: 0.into(),
                associated_source_id: Some(SourceId::new(233)),
                name: "test".to_owned(),
                table_type: TableType::Table,
                columns: vec![
                    ColumnCatalog::row_id_column(),
                    ColumnCatalog {
                        column_desc: ColumnDesc {
                            data_type: StructType::new(vec![
                                ("address", DataType::Varchar),
                                ("zipcode", DataType::Varchar)
                            ],)
                            .into(),
                            column_id: ColumnId::new(1),
                            name: "country".to_owned(),
                            description: None,
                            generated_or_default_column: None,
                            additional_column: AdditionalColumn { column_type: None },
                            version: ColumnDescVersion::LATEST,
                            system_column: None,
                            nullable: true,
                        },
                        is_hidden: false
                    },
                    // Not present in the protobuf input; appended by the conversion.
                    ColumnCatalog::rw_timestamp_column(),
                ],
                stream_key: vec![0],
                pk: vec![ColumnOrder::new(0, OrderType::ascending())],
                distribution_key: vec![0],
                append_only: false,
                owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
                retention_seconds: Some(300),
                fragment_id: 0.into(),
                dml_fragment_id: None,
                vnode_col_index: None,
                row_id_index: None,
                value_indices: vec![0],
                definition: "".into(),
                conflict_behavior: ConflictBehavior::NoCheck,
                read_prefix_len_hint: 0,
                version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))),
                // Capacity 3: the two input columns plus the appended rw-timestamp column.
                watermark_columns: FixedBitSet::with_capacity(3),
                dist_key_in_pk: vec![0],
                // `cardinality: None` in the input maps to the unknown cardinality.
                cardinality: Cardinality::unknown(),
                created_at_epoch: None,
                initialized_at_epoch: None,
                cleaned_by_watermark: false,
                stream_job_status: StreamJobStatus::Created,
                create_type: CreateType::Foreground,
                description: Some("description".to_owned()),
                created_at_cluster_version: None,
                initialized_at_cluster_version: None,
                version_column_indices: Vec::new(),
                cdc_table_id: None,
                vnode_count: VnodeCount::set(233),
                webhook_info: None,
                job_id: None,
                engine: Engine::Hummock,
                clean_watermark_index_in_pk: None,
                clean_watermark_indices: vec![],

                refreshable: false,
                vector_index_info: None,
                cdc_table_type: None,
            }
        );
        // Round trip: serializing and converting back must give the same catalog.
        assert_eq!(table, TableCatalog::from(table.to_prost()));
    }
}