use std::assert_matches::assert_matches;
use std::collections::{HashMap, HashSet};

use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::catalog::{
    ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, ICEBERG_SINK_PREFIX,
    ICEBERG_SOURCE_PREFIX, Schema, StreamJobStatus, TableDesc, TableId, TableVersionId,
};
use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
use risingwave_common::id::{JobId, SourceId};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_connector::source::cdc::external::ExternalCdcTableType;
use risingwave_pb::catalog::table::{
    CdcTableType as PbCdcTableType, PbEngine, PbTableType, PbTableVersion,
};
use risingwave_pb::catalog::{
    PbCreateType, PbStreamJobStatus, PbTable, PbVectorIndexInfo, PbWebhookSourceInfo,
};
use risingwave_pb::common::PbColumnOrder;
use risingwave_pb::plan_common::DefaultColumnDesc;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_sqlparser::ast;
use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport as _;

use super::purify::try_purify_table_source_create_sql_ast;
use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId};
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::ExprImpl;
use crate::optimizer::property::Cardinality;
use crate::session::current::notice_to_user;
use crate::user::UserId;

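/// Frontend-side, in-memory representation of a table-like catalog entry, lowered from a
/// [`PbTable`] received from the meta service. Depending on [`TableType`], this may describe a
/// user table, a materialized view, an index, or an internal state table of a streaming job.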
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub struct TableCatalog {
    pub id: TableId,

    pub schema_id: SchemaId,

    pub database_id: DatabaseId,

    /// The id of the associated source, if this table is created with a connector.
    pub associated_source_id: Option<SourceId>,

    pub name: String,

    /// All columns of the table, including hidden and generated columns.
    pub columns: Vec<ColumnCatalog>,

    /// Primary key columns of the state table, with their order types.
    pub pk: Vec<ColumnOrder>,

    /// Indices of the columns that form the stream key.
    pub stream_key: Vec<usize>,

    pub table_type: TableType,

    /// Indices of the columns used to distribute rows across vnodes.
    pub distribution_key: Vec<usize>,

    pub append_only: bool,

    pub cardinality: Cardinality,

    pub owner: UserId,

    pub retention_seconds: Option<u32>,

    pub fragment_id: FragmentId,

    pub dml_fragment_id: Option<FragmentId>,

    /// Index of the vnode column, if any.
    pub vnode_col_index: Option<usize>,

    /// Index of the hidden row id column, if the table has no user-defined primary key.
    pub row_id_index: Option<usize>,

    /// Indices of the columns stored in the value part of the state table.
    pub value_indices: Vec<usize>,

    /// The persisted SQL (`CREATE`) definition of this table.
    pub definition: String,

    /// How writes with conflicting primary keys are handled.
    pub conflict_behavior: ConflictBehavior,

    pub version_column_indices: Vec<usize>,

    pub read_prefix_len_hint: usize,

    /// Schema version of the table, present for tables whose schema can be altered.
    pub version: Option<TableVersion>,

    /// Bitset marking which columns carry watermarks.
    pub watermark_columns: FixedBitSet,

    /// Positions of the distribution key columns within the primary key.
    pub dist_key_in_pk: Vec<usize>,

    pub created_at_epoch: Option<Epoch>,

    pub initialized_at_epoch: Option<Epoch>,

    pub cleaned_by_watermark: bool,

    pub create_type: CreateType,

    pub stream_job_status: StreamJobStatus,

    pub description: Option<String>,

    pub created_at_cluster_version: Option<String>,

    pub initialized_at_cluster_version: Option<String>,

    pub cdc_table_id: Option<String>,

    /// Number of virtual nodes (vnodes) the table's data is distributed over.
    pub vnode_count: VnodeCount,

    pub webhook_info: Option<PbWebhookSourceInfo>,

    pub job_id: Option<JobId>,

    pub engine: Engine,

    pub clean_watermark_index_in_pk: Option<usize>,

    pub refreshable: bool,

    pub vector_index_info: Option<PbVectorIndexInfo>,

    pub cdc_table_type: Option<ExternalCdcTableType>,
}

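/// The kind of catalog entry that a [`TableCatalog`] represents.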
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub enum TableType {
    /// A user table created by `CREATE TABLE`.
    #[cfg_attr(test, default)]
    Table,
    /// A materialized view created by `CREATE MATERIALIZED VIEW`.
    MaterializedView,
    /// An index created by `CREATE INDEX`.
    Index,
    /// A vector index.
    VectorIndex,
    /// An internal state table of a streaming job.
    Internal,
}

impl TableType {
    fn from_prost(prost: PbTableType) -> Self {
        match prost {
            PbTableType::Table => Self::Table,
            PbTableType::MaterializedView => Self::MaterializedView,
            PbTableType::Index => Self::Index,
            PbTableType::Internal => Self::Internal,
            PbTableType::VectorIndex => Self::VectorIndex,
            PbTableType::Unspecified => unreachable!(),
        }
    }

    pub(crate) fn to_prost(self) -> PbTableType {
        match self {
            Self::Table => PbTableType::Table,
            Self::MaterializedView => PbTableType::MaterializedView,
            Self::Index => PbTableType::Index,
            Self::VectorIndex => PbTableType::VectorIndex,
            Self::Internal => PbTableType::Internal,
        }
    }
}

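/// Schema version of a table, used when the table schema is changed (e.g. by `ALTER TABLE`).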
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TableVersion {
    /// Strictly increasing version number, bumped on each schema change.
    pub version_id: TableVersionId,
    /// The column id to assign to the next column added to the table.
    pub next_column_id: ColumnId,
}

impl TableVersion {
    #[cfg(test)]
    pub fn new_initial_for_test(max_column_id: ColumnId) -> Self {
        use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;

        Self {
            version_id: INITIAL_TABLE_VERSION_ID,
            next_column_id: max_column_id.next(),
        }
    }

    pub fn from_prost(prost: PbTableVersion) -> Self {
        Self {
            version_id: prost.version,
            next_column_id: ColumnId::from(prost.next_column_id),
        }
    }

    pub fn to_prost(&self) -> PbTableVersion {
        PbTableVersion {
            version: self.version_id,
            next_column_id: self.next_column_id.into(),
        }
    }
}

impl TableCatalog {
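    /// Returns the purified SQL definition of the table as a string, falling back to
    /// [`Self::create_sql`] if purification fails.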
    pub fn create_sql_purified(&self) -> String {
        self.create_sql_ast_purified()
            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
            .unwrap_or_else(|_| self.create_sql())
    }

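    /// Returns the purified AST of the `CREATE` statement. For user tables, the column and key
    /// definitions are reconstructed from the catalog instead of being taken verbatim from the
    /// persisted definition; if purification fails, a notice is issued and the persisted
    /// definition is parsed as-is.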
    pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
        if let TableType::Table = self.table_type() {
            let base = if self.definition.is_empty() {
                let name = ast::ObjectName(vec![self.name.as_str().into()]);
                ast::Statement::default_create_table(name)
            } else {
                self.create_sql_ast_from_persisted()?
            };

            match try_purify_table_source_create_sql_ast(
                base,
                self.columns(),
                self.row_id_index,
                &self.pk_column_ids(),
            ) {
                Ok(stmt) => return Ok(stmt),
                Err(e) => notice_to_user(format!(
                    "error occurred while purifying definition for table \"{}\", \
                     results may be inaccurate: {}",
                    self.name,
                    e.as_report()
                )),
            }
        }

        self.create_sql_ast_from_persisted()
    }
}

impl TableCatalog {
    pub fn id(&self) -> TableId {
        self.id
    }

    pub fn with_id(mut self, id: TableId) -> Self {
        self.id = id;
        self
    }

    pub fn with_cleaned_by_watermark(mut self, cleaned_by_watermark: bool) -> Self {
        self.cleaned_by_watermark = cleaned_by_watermark;
        self
    }

    pub fn conflict_behavior(&self) -> ConflictBehavior {
        self.conflict_behavior
    }

    pub fn table_type(&self) -> TableType {
        self.table_type
    }

    pub fn engine(&self) -> Engine {
        self.engine
    }

    pub fn iceberg_source_name(&self) -> Option<String> {
        match self.engine {
            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name)),
            Engine::Hummock => None,
        }
    }

    pub fn iceberg_sink_name(&self) -> Option<String> {
        match self.engine {
            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SINK_PREFIX, self.name)),
            Engine::Hummock => None,
        }
    }

    pub fn is_user_table(&self) -> bool {
        self.table_type == TableType::Table
    }

    pub fn is_internal_table(&self) -> bool {
        self.table_type == TableType::Internal
    }

    pub fn is_mview(&self) -> bool {
        self.table_type == TableType::MaterializedView
    }

    pub fn is_index(&self) -> bool {
        self.table_type == TableType::Index
    }

    #[must_use]
    pub fn bad_drop_error(&self) -> RwError {
        let msg = match self.table_type {
            TableType::MaterializedView => {
                "Use `DROP MATERIALIZED VIEW` to drop a materialized view."
            }
            TableType::Index | TableType::VectorIndex => "Use `DROP INDEX` to drop an index.",
            TableType::Table => "Use `DROP TABLE` to drop a table.",
            TableType::Internal => "Internal tables cannot be dropped.",
        };

        ErrorCode::InvalidInputSyntax(msg.to_owned()).into()
    }

    #[must_use]
    pub fn associated_source_id(&self) -> Option<SourceId> {
        self.associated_source_id
    }

    pub fn has_associated_source(&self) -> bool {
        self.associated_source_id.is_some()
    }

    pub fn columns(&self) -> &[ColumnCatalog] {
        &self.columns
    }

    pub fn columns_without_rw_timestamp(&self) -> Vec<ColumnCatalog> {
        self.columns
            .iter()
            .filter(|c| !c.is_rw_timestamp_column())
            .cloned()
            .collect()
    }

    pub fn pk(&self) -> &[ColumnOrder] {
        self.pk.as_ref()
    }

    pub fn pk_column_ids(&self) -> Vec<ColumnId> {
        self.pk
            .iter()
            .map(|x| self.columns[x.column_index].column_id())
            .collect()
    }

    pub fn pk_column_names(&self) -> Vec<&str> {
        self.pk
            .iter()
            .map(|x| self.columns[x.column_index].name())
            .collect()
    }

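    /// Returns the stream key. If any distribution key column is missing from the persisted
    /// stream key, the distribution key columns are prepended so that the result always covers
    /// the distribution key.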
    pub fn stream_key(&self) -> Vec<usize> {
        if self
            .distribution_key
            .iter()
            .any(|dist_key| !self.stream_key.contains(dist_key))
        {
            let mut new_stream_key = self.distribution_key.clone();
            let mut seen: HashSet<usize> = self.distribution_key.iter().copied().collect();
            for &key in &self.stream_key {
                if seen.insert(key) {
                    new_stream_key.push(key);
                }
            }
            new_stream_key
        } else {
            self.stream_key.clone()
        }
    }

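    /// Builds a [`TableDesc`] from this catalog, carrying the fields needed to scan the table.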
    pub fn table_desc(&self) -> TableDesc {
        use risingwave_common::catalog::TableOption;

        let table_options = TableOption::new(self.retention_seconds);

        TableDesc {
            table_id: self.id,
            pk: self.pk.clone(),
            stream_key: self.stream_key(),
            columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
            distribution_key: self.distribution_key.clone(),
            append_only: self.append_only,
            retention_seconds: table_options.retention_seconds,
            value_indices: self.value_indices.clone(),
            read_prefix_len_hint: self.read_prefix_len_hint,
            watermark_columns: self.watermark_columns.clone(),
            versioned: self.version.is_some(),
            vnode_col_index: self.vnode_col_index,
            vnode_count: self.vnode_count(),
        }
    }

    pub fn name(&self) -> &str {
        self.name.as_ref()
    }

    pub fn distribution_key(&self) -> &[usize] {
        self.distribution_key.as_ref()
    }

    pub fn to_internal_table_prost(&self) -> PbTable {
        self.to_prost()
    }

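    /// Returns the SQL definition of the table as a string, falling back to the raw persisted
    /// definition if the AST cannot be produced or printed.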
    pub fn create_sql(&self) -> String {
        self.create_sql_ast()
            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
            .unwrap_or_else(|_| self.definition.clone())
    }

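    /// Returns the parsed AST of the SQL definition. For a user table whose persisted
    /// definition is empty, a purified definition is reconstructed from the catalog instead.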
    pub fn create_sql_ast(&self) -> Result<ast::Statement> {
        if let TableType::Table = self.table_type()
            && self.definition.is_empty()
        {
            self.create_sql_ast_purified()
        } else {
            self.create_sql_ast_from_persisted()
        }
    }

    fn create_sql_ast_from_persisted(&self) -> Result<ast::Statement> {
        Ok(Parser::parse_exactly_one(&self.definition)?)
    }

    pub fn version(&self) -> Option<&TableVersion> {
        self.version.as_ref()
    }

    pub fn version_id(&self) -> Option<TableVersionId> {
        self.version().map(|v| v.version_id)
    }

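    /// Returns the vnode count of the table, or `0` if the table id is a placeholder.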
    pub fn vnode_count(&self) -> usize {
        if self.id().is_placeholder() {
            0
        } else {
            self.vnode_count.value()
        }
    }

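    /// Converts this catalog into its protobuf representation ([`PbTable`]). The hidden
    /// `_rw_timestamp` column is excluded here and added back when converting from [`PbTable`].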
    pub fn to_prost(&self) -> PbTable {
        PbTable {
            id: self.id,
            schema_id: self.schema_id,
            database_id: self.database_id,
            name: self.name.clone(),
            columns: self
                .columns_without_rw_timestamp()
                .iter()
                .map(|c| c.to_protobuf())
                .collect(),
            pk: self.pk.iter().map(|o| o.to_protobuf()).collect(),
            stream_key: self.stream_key().iter().map(|x| *x as _).collect(),
            optional_associated_source_id: self.associated_source_id.map(Into::into),
            table_type: self.table_type.to_prost() as i32,
            distribution_key: self
                .distribution_key
                .iter()
                .map(|k| *k as i32)
                .collect_vec(),
            append_only: self.append_only,
            owner: self.owner,
            fragment_id: self.fragment_id,
            dml_fragment_id: self.dml_fragment_id,
            vnode_col_index: self.vnode_col_index.map(|i| i as _),
            row_id_index: self.row_id_index.map(|i| i as _),
            value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
            definition: self.definition.clone(),
            read_prefix_len_hint: self.read_prefix_len_hint as u32,
            version: self.version.as_ref().map(TableVersion::to_prost),
            watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
            dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
            handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
            version_column_indices: self
                .version_column_indices
                .iter()
                .map(|&idx| idx as u32)
                .collect(),
            cardinality: Some(self.cardinality.to_protobuf()),
            initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
            created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
            cleaned_by_watermark: self.cleaned_by_watermark,
            stream_job_status: self.stream_job_status.to_proto().into(),
            create_type: self.create_type.to_proto().into(),
            description: self.description.clone(),
            #[expect(deprecated)]
            incoming_sinks: vec![],
            created_at_cluster_version: self.created_at_cluster_version.clone(),
            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
            retention_seconds: self.retention_seconds,
            cdc_table_id: self.cdc_table_id.clone(),
            maybe_vnode_count: self.vnode_count.to_protobuf(),
            webhook_info: self.webhook_info.clone(),
            job_id: self.job_id,
            engine: Some(self.engine.to_protobuf().into()),
            clean_watermark_index_in_pk: self.clean_watermark_index_in_pk.map(|x| x as i32),
            refreshable: self.refreshable,
            vector_index_info: self.vector_index_info,
            cdc_table_type: self
                .cdc_table_type
                .clone()
                .map(|t| PbCdcTableType::from(t) as i32),
            refresh_state: Some(risingwave_pb::catalog::RefreshState::Idle as i32),
        }
    }

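    /// Returns the columns that `INSERT` statements should provide values for, i.e. all columns
    /// that are neither hidden nor generated.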
    pub fn columns_to_insert(&self) -> impl Iterator<Item = &ColumnCatalog> {
        self.columns
            .iter()
            .filter(|c| !c.is_hidden() && !c.is_generated())
    }

    pub fn generated_column_names(&self) -> impl Iterator<Item = &str> {
        self.columns
            .iter()
            .filter(|c| c.is_generated())
            .map(|c| c.name())
    }

    pub fn generated_col_idxes(&self) -> impl Iterator<Item = usize> + '_ {
        self.columns
            .iter()
            .enumerate()
            .filter(|(_, c)| c.is_generated())
            .map(|(i, _)| i)
    }

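    /// Returns the default expression of the column at `col_idx`, or a `NULL` literal of the
    /// column's data type if no default is specified.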
    pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
        if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) = self
            .columns[col_idx]
            .column_desc
            .generated_or_default_column
            .as_ref()
        {
            ExprImpl::from_expr_proto(expr.as_ref().unwrap())
                .expect("expr in default columns corrupted")
        } else {
            ExprImpl::literal_null(self.columns[col_idx].data_type().clone())
        }
    }

    pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
        columns
            .iter()
            .map(|c| {
                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                    expr,
                    ..
                })) = c.column_desc.generated_or_default_column.as_ref()
                {
                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
                        .expect("expr in default columns corrupted")
                } else {
                    ExprImpl::literal_null(c.data_type().clone())
                }
            })
            .collect()
    }

    pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
        self.columns.iter().enumerate().filter_map(|(i, c)| {
            if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                expr, ..
            })) = c.column_desc.generated_or_default_column.as_ref()
            {
                Some((
                    i,
                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
                        .expect("expr in default columns corrupted"),
                ))
            } else {
                None
            }
        })
    }

    pub fn has_generated_column(&self) -> bool {
        self.columns.iter().any(|c| c.is_generated())
    }

    pub fn has_rw_timestamp_column(&self) -> bool {
        self.columns.iter().any(|c| c.is_rw_timestamp_column())
    }

    pub fn column_schema(&self) -> Schema {
        Schema::new(
            self.columns
                .iter()
                .map(|c| Field::from(&c.column_desc))
                .collect(),
        )
    }

    pub fn is_created(&self) -> bool {
        self.stream_job_status == StreamJobStatus::Created
    }

    pub fn is_iceberg_engine_table(&self) -> bool {
        self.engine == Engine::Iceberg
    }

    pub fn order_column_indices(&self) -> impl Iterator<Item = usize> + '_ {
        self.pk.iter().map(|col| col.column_index)
    }

    pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
        ColumnDesc::get_id_to_op_idx_mapping(&self.columns, None)
    }

    pub fn order_column_ids(&self) -> Vec<ColumnId> {
        self.pk
            .iter()
            .map(|col| self.columns[col.column_index].column_desc.column_id)
            .collect()
    }

    pub fn arrange_key_orders_protobuf(&self) -> Vec<PbColumnOrder> {
        self.pk.iter().map(|x| x.to_protobuf()).collect()
    }
}

impl From<PbTable> for TableCatalog {
    fn from(tb: PbTable) -> Self {
        let id = tb.id;
        let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
        let tb_engine = tb
            .get_engine()
            .map(|engine| PbEngine::try_from(*engine).expect("Invalid engine"))
            .unwrap_or(PbEngine::Hummock);
        let table_type = tb.get_table_type().unwrap();
        let stream_job_status = tb
            .get_stream_job_status()
            .unwrap_or(PbStreamJobStatus::Created);
        let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground);
        let associated_source_id = tb.optional_associated_source_id.map(Into::into);
        let name = tb.name.clone();

        let vnode_count = tb.vnode_count_inner();
        if let VnodeCount::Placeholder = vnode_count {
            assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
        }

        let mut col_names = HashSet::new();
        let mut col_index: HashMap<i32, usize> = HashMap::new();

        let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
        let version_column_indices: Vec<usize> = tb
            .version_column_indices
            .iter()
            .map(|&idx| idx as usize)
            .collect();
        let mut columns: Vec<ColumnCatalog> =
            tb.columns.into_iter().map(ColumnCatalog::from).collect();
        if columns.iter().all(|c| !c.is_rw_timestamp_column()) {
            columns.push(ColumnCatalog::rw_timestamp_column());
        }
        for (idx, catalog) in columns.clone().into_iter().enumerate() {
            let col_name = catalog.name();
            if !col_names.insert(col_name.to_owned()) {
                panic!("duplicated column name {} in table {} ", col_name, tb.name)
            }

            let col_id = catalog.column_desc.column_id.get_id();
            col_index.insert(col_id, idx);
        }

        let pk = tb.pk.iter().map(ColumnOrder::from_protobuf).collect();
        let mut watermark_columns = FixedBitSet::with_capacity(columns.len());
        for idx in &tb.watermark_indices {
            watermark_columns.insert(*idx as _);
        }
        let engine = Engine::from_protobuf(&tb_engine);

        Self {
            id,
            schema_id: tb.schema_id,
            database_id: tb.database_id,
            associated_source_id,
            name,
            pk,
            columns,
            table_type: TableType::from_prost(table_type),
            distribution_key: tb
                .distribution_key
                .iter()
                .map(|k| *k as usize)
                .collect_vec(),
            stream_key: tb.stream_key.iter().map(|x| *x as _).collect(),
            append_only: tb.append_only,
            owner: tb.owner,
            fragment_id: tb.fragment_id,
            dml_fragment_id: tb.dml_fragment_id,
            vnode_col_index: tb.vnode_col_index.map(|x| x as usize),
            row_id_index: tb.row_id_index.map(|x| x as usize),
            value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
            definition: tb.definition,
            conflict_behavior,
            version_column_indices,
            read_prefix_len_hint: tb.read_prefix_len_hint as usize,
            version: tb.version.map(TableVersion::from_prost),
            watermark_columns,
            dist_key_in_pk: tb.dist_key_in_pk.iter().map(|x| *x as _).collect(),
            cardinality: tb
                .cardinality
                .map(|c| Cardinality::from_protobuf(&c))
                .unwrap_or_else(Cardinality::unknown),
            created_at_epoch: tb.created_at_epoch.map(Epoch::from),
            initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
            cleaned_by_watermark: tb.cleaned_by_watermark,
            create_type: CreateType::from_proto(create_type),
            stream_job_status: StreamJobStatus::from_proto(stream_job_status),
            description: tb.description,
            created_at_cluster_version: tb.created_at_cluster_version.clone(),
            initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
            retention_seconds: tb.retention_seconds,
            cdc_table_id: tb.cdc_table_id,
            vnode_count,
            webhook_info: tb.webhook_info,
            job_id: tb.job_id,
            engine,
            clean_watermark_index_in_pk: tb.clean_watermark_index_in_pk.map(|x| x as usize),

            refreshable: tb.refreshable,
            vector_index_info: tb.vector_index_info,
            cdc_table_type: tb
                .cdc_table_type
                .and_then(|t| PbCdcTableType::try_from(t).ok())
                .map(ExternalCdcTableType::from),
        }
    }
}

impl From<&PbTable> for TableCatalog {
    fn from(tb: &PbTable) -> Self {
        tb.clone().into()
    }
}

impl OwnedByUserCatalog for TableCatalog {
    fn owner(&self) -> UserId {
        self.owner
    }
}

#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::test_prelude::*;
    use risingwave_common::types::*;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_pb::catalog::table::PbEngine;
    use risingwave_pb::plan_common::{
        AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
    };

    use super::*;

    #[test]
    fn test_into_table_catalog() {
        let table: TableCatalog = PbTable {
            id: 0.into(),
            schema_id: 0.into(),
            database_id: 0.into(),
            name: "test".to_owned(),
            table_type: PbTableType::Table as i32,
            columns: vec![
                ColumnCatalog::row_id_column().to_protobuf(),
                PbColumnCatalog {
                    column_desc: Some(PbColumnDesc::new(
                        DataType::from(StructType::new([
                            ("address", DataType::Varchar),
                            ("zipcode", DataType::Varchar),
                        ]))
                        .to_protobuf(),
                        "country",
                        1,
                    )),
                    is_hidden: false,
                },
            ],
            pk: vec![ColumnOrder::new(0, OrderType::ascending()).to_protobuf()],
            stream_key: vec![0],
            distribution_key: vec![0],
            optional_associated_source_id: Some(SourceId::new(233).into()),
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0.into(),
            dml_fragment_id: None,
            initialized_at_epoch: None,
            value_indices: vec![0],
            definition: "".into(),
            read_prefix_len_hint: 0,
            vnode_col_index: None,
            row_id_index: None,
            version: Some(PbTableVersion {
                version: 0,
                next_column_id: 2,
            }),
            watermark_indices: vec![],
            handle_pk_conflict_behavior: 3,
            dist_key_in_pk: vec![0],
            cardinality: None,
            created_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: Some("description".to_owned()),
            #[expect(deprecated)]
            incoming_sinks: vec![],
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_indices: Vec::new(),
            cdc_table_id: None,
            maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            clean_watermark_index_in_pk: None,

            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
            refresh_state: Some(risingwave_pb::catalog::RefreshState::Idle as i32),
        }
        .into();

        assert_eq!(
            table,
            TableCatalog {
                id: TableId::new(0),
                schema_id: 0.into(),
                database_id: 0.into(),
                associated_source_id: Some(SourceId::new(233)),
                name: "test".to_owned(),
                table_type: TableType::Table,
                columns: vec![
                    ColumnCatalog::row_id_column(),
                    ColumnCatalog {
                        column_desc: ColumnDesc {
                            data_type: StructType::new(vec![
                                ("address", DataType::Varchar),
                                ("zipcode", DataType::Varchar)
                            ],)
                            .into(),
                            column_id: ColumnId::new(1),
                            name: "country".to_owned(),
                            description: None,
                            generated_or_default_column: None,
                            additional_column: AdditionalColumn { column_type: None },
                            version: ColumnDescVersion::LATEST,
                            system_column: None,
                            nullable: true,
                        },
                        is_hidden: false
                    },
                    ColumnCatalog::rw_timestamp_column(),
                ],
                stream_key: vec![0],
                pk: vec![ColumnOrder::new(0, OrderType::ascending())],
                distribution_key: vec![0],
                append_only: false,
                owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
                retention_seconds: Some(300),
                fragment_id: 0.into(),
                dml_fragment_id: None,
                vnode_col_index: None,
                row_id_index: None,
                value_indices: vec![0],
                definition: "".into(),
                conflict_behavior: ConflictBehavior::NoCheck,
                read_prefix_len_hint: 0,
                version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))),
                watermark_columns: FixedBitSet::with_capacity(3),
                dist_key_in_pk: vec![0],
                cardinality: Cardinality::unknown(),
                created_at_epoch: None,
                initialized_at_epoch: None,
                cleaned_by_watermark: false,
                stream_job_status: StreamJobStatus::Created,
                create_type: CreateType::Foreground,
                description: Some("description".to_owned()),
                created_at_cluster_version: None,
                initialized_at_cluster_version: None,
                version_column_indices: Vec::new(),
                cdc_table_id: None,
                vnode_count: VnodeCount::set(233),
                webhook_info: None,
                job_id: None,
                engine: Engine::Hummock,
                clean_watermark_index_in_pk: None,

                refreshable: false,
                vector_index_info: None,
                cdc_table_type: None,
            }
        );
        assert_eq!(table, TableCatalog::from(table.to_prost()));
    }
}
1028}