1use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17
18use fixedbitset::FixedBitSet;
19use itertools::Itertools;
20use risingwave_common::catalog::{
21 ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, Schema,
22 StreamJobStatus, TableDesc, TableId, TableVersionId,
23};
24use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
25use risingwave_common::util::epoch::Epoch;
26use risingwave_common::util::sort_util::ColumnOrder;
27use risingwave_connector::source::cdc::external::ExternalCdcTableType;
28use risingwave_pb::catalog::table::{
29 CdcTableType as PbCdcTableType, OptionalAssociatedSourceId, PbEngine, PbTableType,
30 PbTableVersion,
31};
32use risingwave_pb::catalog::{
33 PbCreateType, PbStreamJobStatus, PbTable, PbVectorIndexInfo, PbWebhookSourceInfo,
34};
35use risingwave_pb::common::PbColumnOrder;
36use risingwave_pb::id::JobId;
37use risingwave_pb::plan_common::DefaultColumnDesc;
38use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
39use risingwave_sqlparser::ast;
40use risingwave_sqlparser::parser::Parser;
41use thiserror_ext::AsReport as _;
42
43use super::purify::try_purify_table_source_create_sql_ast;
44use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId};
45use crate::error::{ErrorCode, Result, RwError};
46use crate::expr::ExprImpl;
47use crate::optimizer::property::Cardinality;
48use crate::session::current::notice_to_user;
49use crate::user::UserId;
50
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub struct TableCatalog {
    /// Id of this table.
    pub id: TableId,

    /// Id of the schema this table belongs to.
    pub schema_id: SchemaId,

    /// Id of the database this table belongs to.
    pub database_id: DatabaseId,

    /// If `Some`, this table is associated with a source with the given id
    /// (i.e. a table with connector).
    pub associated_source_id: Option<TableId>,

    /// Name of this table.
    pub name: String,

    /// All columns of this table, including hidden and system ones.
    pub columns: Vec<ColumnCatalog>,

    /// Primary key: column orders (index + order type) into `columns`.
    pub pk: Vec<ColumnOrder>,

    /// Column indices forming the stream key. See [`TableCatalog::stream_key`]
    /// for how this interacts with the distribution key.
    pub stream_key: Vec<usize>,

    /// Kind of this table: user table, materialized view, index, vector index,
    /// or internal state table.
    pub table_type: TableType,

    /// Column indices used for data distribution.
    pub distribution_key: Vec<usize>,

    /// Whether this table only receives inserts (no updates/deletes).
    pub append_only: bool,

    /// Optimizer cardinality estimate of this table.
    pub cardinality: Cardinality,

    /// Owner user id of this table.
    pub owner: UserId,

    /// Data retention in seconds, if a TTL is configured.
    pub retention_seconds: Option<u32>,

    /// Id of the fragment that materializes this table.
    pub fragment_id: FragmentId,

    /// Id of the DML fragment, if this table accepts DML.
    pub dml_fragment_id: Option<FragmentId>,

    /// Index of the hidden vnode column, if any.
    pub vnode_col_index: Option<usize>,

    /// Index of the hidden `_row_id` column, if the table has no user-defined
    /// primary key.
    pub row_id_index: Option<usize>,

    /// Column indices stored as the value part in the state store.
    pub value_indices: Vec<usize>,

    /// Persisted `CREATE` SQL definition of this table.
    pub definition: String,

    /// Behavior on primary-key conflict (e.g. overwrite, ignore, no check).
    pub conflict_behavior: ConflictBehavior,

    /// Column indices used as the version column for conflict resolution.
    pub version_column_indices: Vec<usize>,

    /// Number of leading pk columns to use as the read prefix hint.
    pub read_prefix_len_hint: usize,

    /// Schema version info; `Some` iff the table supports `ALTER ... COLUMN`.
    pub version: Option<TableVersion>,

    /// Bitset over `columns` marking watermark columns.
    pub watermark_columns: FixedBitSet,

    /// Positions of the distribution key columns within the pk.
    pub dist_key_in_pk: Vec<usize>,

    /// Epoch at which the table was created, if known.
    pub created_at_epoch: Option<Epoch>,

    /// Epoch at which the table was initialized, if known.
    pub initialized_at_epoch: Option<Epoch>,

    /// Whether state is cleaned by watermark.
    pub cleaned_by_watermark: bool,

    /// Whether the streaming job creating this table is foreground or background.
    pub create_type: CreateType,

    /// Status of the streaming job creating this table.
    pub stream_job_status: StreamJobStatus,

    /// Optional `COMMENT` on the table.
    pub description: Option<String>,

    /// Cluster version at creation time, if recorded.
    pub created_at_cluster_version: Option<String>,

    /// Cluster version at initialization time, if recorded.
    pub initialized_at_cluster_version: Option<String>,

    /// CDC table id, for tables created from a CDC source.
    pub cdc_table_id: Option<String>,

    /// Virtual-node count of this table (may be a placeholder while creating).
    pub vnode_count: VnodeCount,

    /// Webhook source configuration, for webhook tables.
    pub webhook_info: Option<PbWebhookSourceInfo>,

    /// Id of the streaming job that creates/maintains this table, if any.
    pub job_id: Option<JobId>,

    /// Storage engine of this table (Hummock or Iceberg).
    pub engine: Engine,

    /// Position of the watermark-cleaned column within the pk, if any.
    pub clean_watermark_index_in_pk: Option<usize>,

    /// Whether this table supports `REFRESH`.
    pub refreshable: bool,

    /// Vector index metadata, for vector index tables.
    pub vector_index_info: Option<PbVectorIndexInfo>,

    /// External CDC table type (e.g. upstream database kind), if applicable.
    pub cdc_table_type: Option<ExternalCdcTableType>,
}
213
/// Name prefix of the hidden source paired with an Iceberg-engine table.
pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
/// Name prefix of the hidden sink paired with an Iceberg-engine table.
pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_";
216
/// The kind of a [`TableCatalog`] entry.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub enum TableType {
    /// A user-created table (possibly with an associated source).
    #[cfg_attr(test, default)]
    Table,
    /// A materialized view.
    MaterializedView,
    /// An index backing table.
    Index,
    /// A vector index backing table.
    VectorIndex,
    /// An internal state table of a streaming job.
    Internal,
}
232
233impl TableType {
234 fn from_prost(prost: PbTableType) -> Self {
235 match prost {
236 PbTableType::Table => Self::Table,
237 PbTableType::MaterializedView => Self::MaterializedView,
238 PbTableType::Index => Self::Index,
239 PbTableType::Internal => Self::Internal,
240 PbTableType::VectorIndex => Self::VectorIndex,
241 PbTableType::Unspecified => unreachable!(),
242 }
243 }
244
245 pub(crate) fn to_prost(self) -> PbTableType {
246 match self {
247 Self::Table => PbTableType::Table,
248 Self::MaterializedView => PbTableType::MaterializedView,
249 Self::Index => PbTableType::Index,
250 Self::VectorIndex => PbTableType::VectorIndex,
251 Self::Internal => PbTableType::Internal,
252 }
253 }
254}
255
/// Schema version of a table, tracked for `ALTER TABLE ... COLUMN` support.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TableVersion {
    /// Monotonically increasing version id of the table schema.
    pub version_id: TableVersionId,
    /// The column id to assign to the next newly-added column.
    pub next_column_id: ColumnId,
}
262
263impl TableVersion {
264 #[cfg(test)]
266 pub fn new_initial_for_test(max_column_id: ColumnId) -> Self {
267 use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;
268
269 Self {
270 version_id: INITIAL_TABLE_VERSION_ID,
271 next_column_id: max_column_id.next(),
272 }
273 }
274
275 pub fn from_prost(prost: PbTableVersion) -> Self {
276 Self {
277 version_id: prost.version,
278 next_column_id: ColumnId::from(prost.next_column_id),
279 }
280 }
281
282 pub fn to_prost(&self) -> PbTableVersion {
283 PbTableVersion {
284 version: self.version_id,
285 next_column_id: self.next_column_id.into(),
286 }
287 }
288}
289
290impl TableCatalog {
291 pub fn create_sql_purified(&self) -> String {
296 self.create_sql_ast_purified()
297 .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
298 .unwrap_or_else(|_| self.create_sql())
299 }
300
301 pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
306 if let TableType::Table = self.table_type() {
308 let base = if self.definition.is_empty() {
309 let name = ast::ObjectName(vec![self.name.as_str().into()]);
311 ast::Statement::default_create_table(name)
312 } else {
313 self.create_sql_ast_from_persisted()?
314 };
315
316 match try_purify_table_source_create_sql_ast(
317 base,
318 self.columns(),
319 self.row_id_index,
320 &self.pk_column_ids(),
321 ) {
322 Ok(stmt) => return Ok(stmt),
323 Err(e) => notice_to_user(format!(
324 "error occurred while purifying definition for table \"{}\", \
325 results may be inaccurate: {}",
326 self.name,
327 e.as_report()
328 )),
329 }
330 }
331
332 self.create_sql_ast_from_persisted()
333 }
334}
335
impl TableCatalog {
    /// Returns the id of this table.
    pub fn id(&self) -> TableId {
        self.id
    }

    /// Returns `self` with the id replaced (builder-style).
    pub fn with_id(mut self, id: TableId) -> Self {
        self.id = id;
        self
    }

    /// Returns `self` with `cleaned_by_watermark` replaced (builder-style).
    pub fn with_cleaned_by_watermark(mut self, cleaned_by_watermark: bool) -> Self {
        self.cleaned_by_watermark = cleaned_by_watermark;
        self
    }

    /// Returns the behavior on primary-key conflict.
    pub fn conflict_behavior(&self) -> ConflictBehavior {
        self.conflict_behavior
    }

    /// Returns the kind of this table.
    pub fn table_type(&self) -> TableType {
        self.table_type
    }

    /// Returns the storage engine of this table.
    pub fn engine(&self) -> Engine {
        self.engine
    }

    /// Name of the hidden source paired with this table when using the Iceberg
    /// engine; `None` for Hummock tables.
    pub fn iceberg_source_name(&self) -> Option<String> {
        match self.engine {
            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name)),
            Engine::Hummock => None,
        }
    }

    /// Name of the hidden sink paired with this table when using the Iceberg
    /// engine; `None` for Hummock tables.
    pub fn iceberg_sink_name(&self) -> Option<String> {
        match self.engine {
            Engine::Iceberg => Some(format!("{}{}", ICEBERG_SINK_PREFIX, self.name)),
            Engine::Hummock => None,
        }
    }

    /// Whether this is a user-created table.
    pub fn is_user_table(&self) -> bool {
        self.table_type == TableType::Table
    }

    /// Whether this is an internal state table of a streaming job.
    pub fn is_internal_table(&self) -> bool {
        self.table_type == TableType::Internal
    }

    /// Whether this is a materialized view.
    pub fn is_mview(&self) -> bool {
        self.table_type == TableType::MaterializedView
    }

    /// Whether this is an index backing table.
    pub fn is_index(&self) -> bool {
        self.table_type == TableType::Index
    }

    /// Returns the error to raise when a `DROP` statement of the wrong kind is
    /// used against this table (e.g. `DROP TABLE` on a materialized view).
    #[must_use]
    pub fn bad_drop_error(&self) -> RwError {
        let msg = match self.table_type {
            TableType::MaterializedView => {
                "Use `DROP MATERIALIZED VIEW` to drop a materialized view."
            }
            TableType::Index | TableType::VectorIndex => "Use `DROP INDEX` to drop an index.",
            TableType::Table => "Use `DROP TABLE` to drop a table.",
            TableType::Internal => "Internal tables cannot be dropped.",
        };

        ErrorCode::InvalidInputSyntax(msg.to_owned()).into()
    }

    /// Id of the associated source, if this is a table with connector.
    #[must_use]
    pub fn associated_source_id(&self) -> Option<TableId> {
        self.associated_source_id
    }

    /// Whether this table has an associated source.
    pub fn has_associated_source(&self) -> bool {
        self.associated_source_id.is_some()
    }

    /// All columns of this table, including hidden and system ones.
    pub fn columns(&self) -> &[ColumnCatalog] {
        &self.columns
    }

    /// All columns except the system `_rw_timestamp` column.
    pub fn columns_without_rw_timestamp(&self) -> Vec<ColumnCatalog> {
        self.columns
            .iter()
            .filter(|c| !c.is_rw_timestamp_column())
            .cloned()
            .collect()
    }

    /// Primary key column orders.
    pub fn pk(&self) -> &[ColumnOrder] {
        self.pk.as_ref()
    }

    /// Column ids of the primary key columns, in pk order.
    pub fn pk_column_ids(&self) -> Vec<ColumnId> {
        self.pk
            .iter()
            .map(|x| self.columns[x.column_index].column_id())
            .collect()
    }

    /// Column names of the primary key columns, in pk order.
    pub fn pk_column_names(&self) -> Vec<&str> {
        self.pk
            .iter()
            .map(|x| self.columns[x.column_index].name())
            .collect()
    }

    /// Returns the stream key, adjusted so that every distribution key column
    /// is included: if some distribution key is missing from the stored stream
    /// key, the result starts with the distribution key columns followed by
    /// the remaining stream key columns (deduplicated). Otherwise the stored
    /// stream key is returned unchanged.
    pub fn stream_key(&self) -> Vec<usize> {
        if self
            .distribution_key
            .iter()
            .any(|dist_key| !self.stream_key.contains(dist_key))
        {
            // Prepend all distribution keys, then append the original stream
            // key columns that are not already present.
            let mut new_stream_key = self.distribution_key.clone();
            let mut seen: HashSet<usize> = self.distribution_key.iter().copied().collect();
            for &key in &self.stream_key {
                if seen.insert(key) {
                    new_stream_key.push(key);
                }
            }
            new_stream_key
        } else {
            self.stream_key.clone()
        }
    }

    /// Builds a [`TableDesc`] (the storage-level description) from this catalog.
    pub fn table_desc(&self) -> TableDesc {
        use risingwave_common::catalog::TableOption;

        let table_options = TableOption::new(self.retention_seconds);

        TableDesc {
            table_id: self.id,
            pk: self.pk.clone(),
            stream_key: self.stream_key(),
            columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
            distribution_key: self.distribution_key.clone(),
            append_only: self.append_only,
            retention_seconds: table_options.retention_seconds,
            value_indices: self.value_indices.clone(),
            read_prefix_len_hint: self.read_prefix_len_hint,
            watermark_columns: self.watermark_columns.clone(),
            versioned: self.version.is_some(),
            vnode_col_index: self.vnode_col_index,
            vnode_count: self.vnode_count(),
        }
    }

    /// Name of this table.
    pub fn name(&self) -> &str {
        self.name.as_ref()
    }

    /// Column indices used for data distribution.
    pub fn distribution_key(&self) -> &[usize] {
        self.distribution_key.as_ref()
    }

    /// Converts to protobuf; alias of [`Self::to_prost`] for internal tables.
    pub fn to_internal_table_prost(&self) -> PbTable {
        self.to_prost()
    }

    /// Returns the `CREATE` SQL as a string, falling back to the raw persisted
    /// definition if the AST cannot be produced or printed.
    pub fn create_sql(&self) -> String {
        self.create_sql_ast()
            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
            .unwrap_or_else(|_| self.definition.clone())
    }

    /// Returns the `CREATE` AST. For a user table without a persisted
    /// definition, synthesizes a purified one; otherwise parses the persisted
    /// definition.
    pub fn create_sql_ast(&self) -> Result<ast::Statement> {
        if let TableType::Table = self.table_type()
            && self.definition.is_empty()
        {
            self.create_sql_ast_purified()
        } else {
            self.create_sql_ast_from_persisted()
        }
    }

    /// Parses the persisted definition into exactly one statement.
    fn create_sql_ast_from_persisted(&self) -> Result<ast::Statement> {
        Ok(Parser::parse_exactly_one(&self.definition)?)
    }

    /// Schema version info, if the table supports schema changes.
    pub fn version(&self) -> Option<&TableVersion> {
        self.version.as_ref()
    }

    /// Current schema version id, if any.
    pub fn version_id(&self) -> Option<TableVersionId> {
        self.version().map(|v| v.version_id)
    }

    /// Virtual-node count of this table; `0` for placeholder (not-yet-created)
    /// table ids.
    pub fn vnode_count(&self) -> usize {
        if self.id().is_placeholder() {
            0
        } else {
            self.vnode_count.value()
        }
    }

    /// Converts this catalog into its protobuf representation.
    pub fn to_prost(&self) -> PbTable {
        PbTable {
            id: self.id,
            schema_id: self.schema_id,
            database_id: self.database_id,
            name: self.name.clone(),
            // The system `_rw_timestamp` column is never persisted.
            columns: self
                .columns_without_rw_timestamp()
                .iter()
                .map(|c| c.to_protobuf())
                .collect(),
            pk: self.pk.iter().map(|o| o.to_protobuf()).collect(),
            stream_key: self.stream_key().iter().map(|x| *x as _).collect(),
            optional_associated_source_id: self.associated_source_id.map(|source_id| {
                OptionalAssociatedSourceId::AssociatedSourceId(source_id.as_raw_id())
            }),
            table_type: self.table_type.to_prost() as i32,
            distribution_key: self
                .distribution_key
                .iter()
                .map(|k| *k as i32)
                .collect_vec(),
            append_only: self.append_only,
            owner: self.owner,
            fragment_id: self.fragment_id,
            dml_fragment_id: self.dml_fragment_id,
            vnode_col_index: self.vnode_col_index.map(|i| i as _),
            row_id_index: self.row_id_index.map(|i| i as _),
            value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
            definition: self.definition.clone(),
            read_prefix_len_hint: self.read_prefix_len_hint as u32,
            version: self.version.as_ref().map(TableVersion::to_prost),
            watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
            dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
            handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
            version_column_indices: self
                .version_column_indices
                .iter()
                .map(|&idx| idx as u32)
                .collect(),
            cardinality: Some(self.cardinality.to_protobuf()),
            initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
            created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
            cleaned_by_watermark: self.cleaned_by_watermark,
            stream_job_status: self.stream_job_status.to_proto().into(),
            create_type: self.create_type.to_proto().into(),
            description: self.description.clone(),
            // Deprecated field; always emitted empty.
            #[expect(deprecated)]
            incoming_sinks: vec![],
            created_at_cluster_version: self.created_at_cluster_version.clone(),
            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
            retention_seconds: self.retention_seconds,
            cdc_table_id: self.cdc_table_id.clone(),
            maybe_vnode_count: self.vnode_count.to_protobuf(),
            webhook_info: self.webhook_info.clone(),
            job_id: self.job_id,
            engine: Some(self.engine.to_protobuf().into()),
            clean_watermark_index_in_pk: self.clean_watermark_index_in_pk.map(|x| x as i32),
            refreshable: self.refreshable,
            vector_index_info: self.vector_index_info,
            cdc_table_type: self
                .cdc_table_type
                .clone()
                .map(|t| PbCdcTableType::from(t) as i32),
            // Refresh state is not tracked in this catalog; always report Idle.
            refresh_state: Some(risingwave_pb::catalog::RefreshState::Idle as i32),
        }
    }

    /// Columns that accept values in `INSERT` statements: neither hidden nor
    /// generated.
    pub fn columns_to_insert(&self) -> impl Iterator<Item = &ColumnCatalog> {
        self.columns
            .iter()
            .filter(|c| !c.is_hidden() && !c.is_generated())
    }

    /// Names of all generated columns.
    pub fn generated_column_names(&self) -> impl Iterator<Item = &str> {
        self.columns
            .iter()
            .filter(|c| c.is_generated())
            .map(|c| c.name())
    }

    /// Indices of all generated columns.
    pub fn generated_col_idxes(&self) -> impl Iterator<Item = usize> + '_ {
        self.columns
            .iter()
            .enumerate()
            .filter(|(_, c)| c.is_generated())
            .map(|(i, _)| i)
    }

    /// Default-value expression of the column at `col_idx`: the declared
    /// default if present, otherwise a typed NULL literal.
    pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
        if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) = self
            .columns[col_idx]
            .column_desc
            .generated_or_default_column
            .as_ref()
        {
            ExprImpl::from_expr_proto(expr.as_ref().unwrap())
                .expect("expr in default columns corrupted")
        } else {
            ExprImpl::literal_null(self.columns[col_idx].data_type().clone())
        }
    }

    /// Default-value expressions for every column in `columns` (typed NULL for
    /// columns without a declared default).
    pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
        columns
            .iter()
            .map(|c| {
                if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                    expr,
                    ..
                })) = c.column_desc.generated_or_default_column.as_ref()
                {
                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
                        .expect("expr in default columns corrupted")
                } else {
                    ExprImpl::literal_null(c.data_type().clone())
                }
            })
            .collect()
    }

    /// `(index, default expression)` pairs for columns that have a declared
    /// default value.
    pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
        self.columns.iter().enumerate().filter_map(|(i, c)| {
            if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
                expr, ..
            })) = c.column_desc.generated_or_default_column.as_ref()
            {
                Some((
                    i,
                    ExprImpl::from_expr_proto(expr.as_ref().unwrap())
                        .expect("expr in default columns corrupted"),
                ))
            } else {
                None
            }
        })
    }

    /// Whether any column is generated.
    pub fn has_generated_column(&self) -> bool {
        self.columns.iter().any(|c| c.is_generated())
    }

    /// Whether the system `_rw_timestamp` column is present.
    pub fn has_rw_timestamp_column(&self) -> bool {
        self.columns.iter().any(|c| c.is_rw_timestamp_column())
    }

    /// Builds a [`Schema`] (field list) from all columns.
    pub fn column_schema(&self) -> Schema {
        Schema::new(
            self.columns
                .iter()
                .map(|c| Field::from(&c.column_desc))
                .collect(),
        )
    }

    /// Whether the creating streaming job has finished.
    pub fn is_created(&self) -> bool {
        self.stream_job_status == StreamJobStatus::Created
    }

    /// Whether this table uses the Iceberg engine.
    pub fn is_iceberg_engine_table(&self) -> bool {
        self.engine == Engine::Iceberg
    }

    /// Column indices of the primary key, in pk order.
    pub fn order_column_indices(&self) -> impl Iterator<Item = usize> + '_ {
        self.pk.iter().map(|col| col.column_index)
    }

    /// Mapping from column id to its position in `columns`.
    pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
        ColumnDesc::get_id_to_op_idx_mapping(&self.columns, None)
    }

    /// Column ids of the primary key, in pk order.
    pub fn order_column_ids(&self) -> Vec<ColumnId> {
        self.pk
            .iter()
            .map(|col| self.columns[col.column_index].column_desc.column_id)
            .collect()
    }

    /// Primary key orders in protobuf form (used as arrangement keys).
    pub fn arrange_key_orders_protobuf(&self) -> Vec<PbColumnOrder> {
        self.pk.iter().map(|x| x.to_protobuf()).collect()
    }
}
752
753impl From<PbTable> for TableCatalog {
754 fn from(tb: PbTable) -> Self {
755 let id = tb.id;
756 let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
757 let tb_engine = tb
758 .get_engine()
759 .map(|engine| PbEngine::try_from(*engine).expect("Invalid engine"))
760 .unwrap_or(PbEngine::Hummock);
761 let table_type = tb.get_table_type().unwrap();
762 let stream_job_status = tb
763 .get_stream_job_status()
764 .unwrap_or(PbStreamJobStatus::Created);
765 let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground);
766 let associated_source_id = tb.optional_associated_source_id.map(|id| match id {
767 OptionalAssociatedSourceId::AssociatedSourceId(id) => id,
768 });
769 let name = tb.name.clone();
770
771 let vnode_count = tb.vnode_count_inner();
772 if let VnodeCount::Placeholder = vnode_count {
773 assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
776 }
777
778 let mut col_names = HashSet::new();
779 let mut col_index: HashMap<i32, usize> = HashMap::new();
780
781 let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
782 let version_column_indices: Vec<usize> = tb
783 .version_column_indices
784 .iter()
785 .map(|&idx| idx as usize)
786 .collect();
787 let mut columns: Vec<ColumnCatalog> =
788 tb.columns.into_iter().map(ColumnCatalog::from).collect();
789 if columns.iter().all(|c| !c.is_rw_timestamp_column()) {
790 columns.push(ColumnCatalog::rw_timestamp_column());
792 }
793 for (idx, catalog) in columns.clone().into_iter().enumerate() {
794 let col_name = catalog.name();
795 if !col_names.insert(col_name.to_owned()) {
796 panic!("duplicated column name {} in table {} ", col_name, tb.name)
797 }
798
799 let col_id = catalog.column_desc.column_id.get_id();
800 col_index.insert(col_id, idx);
801 }
802
803 let pk = tb.pk.iter().map(ColumnOrder::from_protobuf).collect();
804 let mut watermark_columns = FixedBitSet::with_capacity(columns.len());
805 for idx in &tb.watermark_indices {
806 watermark_columns.insert(*idx as _);
807 }
808 let engine = Engine::from_protobuf(&tb_engine);
809
810 Self {
811 id,
812 schema_id: tb.schema_id,
813 database_id: tb.database_id,
814 associated_source_id: associated_source_id.map(Into::into),
815 name,
816 pk,
817 columns,
818 table_type: TableType::from_prost(table_type),
819 distribution_key: tb
820 .distribution_key
821 .iter()
822 .map(|k| *k as usize)
823 .collect_vec(),
824 stream_key: tb.stream_key.iter().map(|x| *x as _).collect(),
825 append_only: tb.append_only,
826 owner: tb.owner,
827 fragment_id: tb.fragment_id,
828 dml_fragment_id: tb.dml_fragment_id,
829 vnode_col_index: tb.vnode_col_index.map(|x| x as usize),
830 row_id_index: tb.row_id_index.map(|x| x as usize),
831 value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
832 definition: tb.definition,
833 conflict_behavior,
834 version_column_indices,
835 read_prefix_len_hint: tb.read_prefix_len_hint as usize,
836 version: tb.version.map(TableVersion::from_prost),
837 watermark_columns,
838 dist_key_in_pk: tb.dist_key_in_pk.iter().map(|x| *x as _).collect(),
839 cardinality: tb
840 .cardinality
841 .map(|c| Cardinality::from_protobuf(&c))
842 .unwrap_or_else(Cardinality::unknown),
843 created_at_epoch: tb.created_at_epoch.map(Epoch::from),
844 initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
845 cleaned_by_watermark: tb.cleaned_by_watermark,
846 create_type: CreateType::from_proto(create_type),
847 stream_job_status: StreamJobStatus::from_proto(stream_job_status),
848 description: tb.description,
849 created_at_cluster_version: tb.created_at_cluster_version.clone(),
850 initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
851 retention_seconds: tb.retention_seconds,
852 cdc_table_id: tb.cdc_table_id,
853 vnode_count,
854 webhook_info: tb.webhook_info,
855 job_id: tb.job_id,
856 engine,
857 clean_watermark_index_in_pk: tb.clean_watermark_index_in_pk.map(|x| x as usize),
858
859 refreshable: tb.refreshable,
860 vector_index_info: tb.vector_index_info,
861 cdc_table_type: tb
862 .cdc_table_type
863 .and_then(|t| PbCdcTableType::try_from(t).ok())
864 .map(ExternalCdcTableType::from),
865 }
866 }
867}
868
869impl From<&PbTable> for TableCatalog {
870 fn from(tb: &PbTable) -> Self {
871 tb.clone().into()
872 }
873}
874
impl OwnedByUserCatalog for TableCatalog {
    /// Owner user id of this table.
    fn owner(&self) -> UserId {
        self.owner
    }
}
880
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::test_prelude::*;
    use risingwave_common::types::*;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_pb::catalog::table::PbEngine;
    use risingwave_pb::plan_common::{
        AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
    };

    use super::*;

    /// Round-trip test: `PbTable -> TableCatalog` produces the expected
    /// catalog (including the appended `_rw_timestamp` column), and
    /// `to_prost` followed by `from` is the identity.
    #[test]
    fn test_into_table_catalog() {
        let table: TableCatalog = PbTable {
            id: 0.into(),
            schema_id: 0.into(),
            database_id: 0.into(),
            name: "test".to_owned(),
            table_type: PbTableType::Table as i32,
            columns: vec![
                ColumnCatalog::row_id_column().to_protobuf(),
                PbColumnCatalog {
                    column_desc: Some(PbColumnDesc::new(
                        DataType::from(StructType::new([
                            ("address", DataType::Varchar),
                            ("zipcode", DataType::Varchar),
                        ]))
                        .to_protobuf(),
                        "country",
                        1,
                    )),
                    is_hidden: false,
                },
            ],
            pk: vec![ColumnOrder::new(0, OrderType::ascending()).to_protobuf()],
            stream_key: vec![0],
            distribution_key: vec![0],
            optional_associated_source_id: OptionalAssociatedSourceId::AssociatedSourceId(233)
                .into(),
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0,
            dml_fragment_id: None,
            initialized_at_epoch: None,
            value_indices: vec![0],
            definition: "".into(),
            read_prefix_len_hint: 0,
            vnode_col_index: None,
            row_id_index: None,
            version: Some(PbTableVersion {
                version: 0,
                next_column_id: 2,
            }),
            watermark_indices: vec![],
            handle_pk_conflict_behavior: 3,
            dist_key_in_pk: vec![0],
            cardinality: None,
            created_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: Some("description".to_owned()),
            #[expect(deprecated)]
            incoming_sinks: vec![],
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_indices: Vec::new(),
            cdc_table_id: None,
            maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            clean_watermark_index_in_pk: None,

            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
            refresh_state: Some(risingwave_pb::catalog::RefreshState::Idle as i32),
        }
        .into();

        assert_eq!(
            table,
            TableCatalog {
                id: TableId::new(0),
                schema_id: 0.into(),
                database_id: 0.into(),
                associated_source_id: Some(TableId::new(233)),
                name: "test".to_owned(),
                table_type: TableType::Table,
                columns: vec![
                    ColumnCatalog::row_id_column(),
                    ColumnCatalog {
                        column_desc: ColumnDesc {
                            data_type: StructType::new(vec![
                                ("address", DataType::Varchar),
                                ("zipcode", DataType::Varchar)
                            ],)
                            .into(),
                            column_id: ColumnId::new(1),
                            name: "country".to_owned(),
                            description: None,
                            generated_or_default_column: None,
                            additional_column: AdditionalColumn { column_type: None },
                            version: ColumnDescVersion::LATEST,
                            system_column: None,
                            nullable: true,
                        },
                        is_hidden: false
                    },
                    // Appended by the conversion since it is never persisted.
                    ColumnCatalog::rw_timestamp_column(),
                ],
                stream_key: vec![0],
                pk: vec![ColumnOrder::new(0, OrderType::ascending())],
                distribution_key: vec![0],
                append_only: false,
                owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
                retention_seconds: Some(300),
                fragment_id: 0,
                dml_fragment_id: None,
                vnode_col_index: None,
                row_id_index: None,
                value_indices: vec![0],
                definition: "".into(),
                conflict_behavior: ConflictBehavior::NoCheck,
                read_prefix_len_hint: 0,
                version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))),
                watermark_columns: FixedBitSet::with_capacity(3),
                dist_key_in_pk: vec![0],
                cardinality: Cardinality::unknown(),
                created_at_epoch: None,
                initialized_at_epoch: None,
                cleaned_by_watermark: false,
                stream_job_status: StreamJobStatus::Created,
                create_type: CreateType::Foreground,
                description: Some("description".to_owned()),
                created_at_cluster_version: None,
                initialized_at_cluster_version: None,
                version_column_indices: Vec::new(),
                cdc_table_id: None,
                vnode_count: VnodeCount::set(233),
                webhook_info: None,
                job_id: None,
                engine: Engine::Hummock,
                clean_watermark_index_in_pk: None,

                refreshable: false,
                vector_index_info: None,
                cdc_table_type: None,
            }
        );
        // Serializing back to protobuf and converting again must be lossless.
        assert_eq!(table, TableCatalog::from(table.to_prost()));
    }
}