1use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17
18use fixedbitset::FixedBitSet;
19use itertools::Itertools;
20use risingwave_common::catalog::{
21 ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, ICEBERG_SINK_PREFIX,
22 ICEBERG_SOURCE_PREFIX, Schema, StreamJobStatus, TableDesc, TableId, TableVersionId,
23};
24use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
25use risingwave_common::id::{JobId, SourceId};
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::sort_util::ColumnOrder;
28use risingwave_connector::source::cdc::external::ExternalCdcTableType;
29use risingwave_pb::catalog::table::{
30 CdcTableType as PbCdcTableType, PbEngine, PbTableType, PbTableVersion,
31};
32use risingwave_pb::catalog::{
33 PbCreateType, PbStreamJobStatus, PbTable, PbVectorIndexInfo, PbWebhookSourceInfo,
34};
35use risingwave_pb::common::PbColumnOrder;
36use risingwave_pb::plan_common::DefaultColumnDesc;
37use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
38use risingwave_sqlparser::ast;
39use risingwave_sqlparser::parser::Parser;
40use thiserror_ext::AsReport as _;
41
42use super::purify::try_purify_table_source_create_sql_ast;
43use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId};
44use crate::error::{ErrorCode, Result, RwError};
45use crate::expr::ExprImpl;
46use crate::optimizer::property::Cardinality;
47use crate::session::current::notice_to_user;
48use crate::user::UserId;
49
/// Catalog entry for a table-like object: a user table, materialized view, index, vector
/// index or internal table (see [`TableType`]).
///
/// It is built from the protobuf `PbTable` via `From<PbTable>` and converted back with
/// [`TableCatalog::to_prost`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub struct TableCatalog {
    /// Id of this table.
    pub id: TableId,

    /// Mirrors `PbTable::schema_id`.
    pub schema_id: SchemaId,

    /// Mirrors `PbTable::database_id`.
    pub database_id: DatabaseId,

    /// Id of the associated source, if there is one (see `has_associated_source`).
    pub associated_source_id: Option<SourceId>,

    /// Name of the table.
    pub name: String,

    /// All columns. `From<PbTable>` appends `ColumnCatalog::rw_timestamp_column()` when the
    /// protobuf carries no such column, and `to_prost` leaves that column out again.
    pub columns: Vec<ColumnCatalog>,

    /// Key columns with their ordering; each `column_index` is an index into `columns`.
    pub pk: Vec<ColumnOrder>,

    /// Stream key as stored. The `stream_key()` accessor returns the effective key, which
    /// is extended with any distribution key entry missing from this list.
    pub stream_key: Vec<usize>,

    /// Kind of object this catalog describes.
    pub table_type: TableType,

    /// Distribution key, in the same index space as `stream_key`.
    pub distribution_key: Vec<usize>,

    /// Mirrors `PbTable::append_only`.
    pub append_only: bool,

    /// Cardinality of the table; `Cardinality::unknown()` when the protobuf carries none.
    pub cardinality: Cardinality,

    /// Owner of the table (returned by `OwnedByUserCatalog::owner`).
    pub owner: UserId,

    /// Retention in seconds, if set; passed through `TableOption` into `TableDesc`.
    pub retention_seconds: Option<u32>,

    /// Mirrors `PbTable::fragment_id`.
    pub fragment_id: FragmentId,

    /// Mirrors `PbTable::dml_fragment_id`.
    pub dml_fragment_id: Option<FragmentId>,

    /// Mirrors `PbTable::vnode_col_index`.
    pub vnode_col_index: Option<usize>,

    /// Index of the row id column in `columns`, if any.
    pub row_id_index: Option<usize>,

    /// Mirrors `PbTable::value_indices`.
    pub value_indices: Vec<usize>,

    /// Persisted SQL definition. It may be empty; for a `TableType::Table` the SQL is then
    /// rebuilt from a default `CREATE TABLE` skeleton (see `create_sql_ast_purified`).
    pub definition: String,

    /// Conflict behavior; stored in `PbTable::handle_pk_conflict_behavior`.
    pub conflict_behavior: ConflictBehavior,

    /// Mirrors `PbTable::version_column_indices`.
    pub version_column_indices: Vec<usize>,

    /// Mirrors `PbTable::read_prefix_len_hint`.
    pub read_prefix_len_hint: usize,

    /// Version information, if present. `TableDesc::versioned` is `version.is_some()`.
    pub version: Option<TableVersion>,

    /// Bit set sized to `columns.len()` with one bit set per `PbTable::watermark_indices`
    /// entry.
    pub watermark_columns: FixedBitSet,

    /// Mirrors `PbTable::dist_key_in_pk`.
    pub dist_key_in_pk: Vec<usize>,

    /// Mirrors `PbTable::created_at_epoch`.
    pub created_at_epoch: Option<Epoch>,

    /// Mirrors `PbTable::initialized_at_epoch`.
    pub initialized_at_epoch: Option<Epoch>,

    /// Create type; `CreateType::Foreground` when the protobuf value cannot be read.
    pub create_type: CreateType,

    /// Status of the stream job; `is_created()` compares it with `StreamJobStatus::Created`.
    pub stream_job_status: StreamJobStatus,

    /// Mirrors `PbTable::description`.
    pub description: Option<String>,

    /// Mirrors `PbTable::created_at_cluster_version`.
    pub created_at_cluster_version: Option<String>,

    /// Mirrors `PbTable::initialized_at_cluster_version`.
    pub initialized_at_cluster_version: Option<String>,

    /// Mirrors `PbTable::cdc_table_id`.
    pub cdc_table_id: Option<String>,

    /// Vnode count. `From<PbTable>` asserts that a `VnodeCount::Placeholder` only occurs
    /// while the stream job status is `Creating`. Read it through `vnode_count()`.
    pub vnode_count: VnodeCount,

    /// Mirrors `PbTable::webhook_info`.
    pub webhook_info: Option<PbWebhookSourceInfo>,

    /// Mirrors `PbTable::job_id`.
    pub job_id: Option<JobId>,

    /// Engine of the table; `Engine::Hummock` when the protobuf does not specify one.
    pub engine: Engine,

    /// Counterpart of `PbTable::clean_watermark_index_in_pk`, which is marked deprecated in
    /// the protobuf.
    pub clean_watermark_index_in_pk: Option<usize>,

    /// Mirrors `PbTable::clean_watermark_indices`.
    pub clean_watermark_indices: Vec<usize>,

    /// Mirrors `PbTable::refreshable`.
    pub refreshable: bool,

    /// Mirrors `PbTable::vector_index_info`.
    pub vector_index_info: Option<PbVectorIndexInfo>,

    /// CDC table type. `None` also when the protobuf value is not a known
    /// `PbCdcTableType`.
    pub cdc_table_type: Option<ExternalCdcTableType>,
}
214
/// Kind of object a [`TableCatalog`] describes.
///
/// Maps one-to-one onto `PbTableType`, except that `PbTableType::Unspecified` has no
/// counterpart here.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub enum TableType {
    /// A table; dropped with `DROP TABLE`. This is the `Default` in test builds.
    #[cfg_attr(test, default)]
    Table,
    /// A materialized view; dropped with `DROP MATERIALIZED VIEW`.
    MaterializedView,
    /// An index; dropped with `DROP INDEX`.
    Index,
    /// A vector index; dropped with `DROP INDEX`.
    VectorIndex,
    /// An internal table; cannot be dropped by the user.
    Internal,
}
230
231impl TableType {
232 fn from_prost(prost: PbTableType) -> Self {
233 match prost {
234 PbTableType::Table => Self::Table,
235 PbTableType::MaterializedView => Self::MaterializedView,
236 PbTableType::Index => Self::Index,
237 PbTableType::Internal => Self::Internal,
238 PbTableType::VectorIndex => Self::VectorIndex,
239 PbTableType::Unspecified => unreachable!(),
240 }
241 }
242
243 pub(crate) fn to_prost(self) -> PbTableType {
244 match self {
245 Self::Table => PbTableType::Table,
246 Self::MaterializedView => PbTableType::MaterializedView,
247 Self::Index => PbTableType::Index,
248 Self::VectorIndex => PbTableType::VectorIndex,
249 Self::Internal => PbTableType::Internal,
250 }
251 }
252}
253
/// Version information of a table; the catalog counterpart of `PbTableVersion`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TableVersion {
    /// Id of this version (`PbTableVersion::version`).
    pub version_id: TableVersionId,
    /// Mirrors `PbTableVersion::next_column_id`.
    // NOTE(review): presumably the id to hand out to the next added column — confirm.
    pub next_column_id: ColumnId,
}
260
261impl TableVersion {
262 #[cfg(test)]
264 pub fn new_initial_for_test(max_column_id: ColumnId) -> Self {
265 use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;
266
267 Self {
268 version_id: INITIAL_TABLE_VERSION_ID,
269 next_column_id: max_column_id.next(),
270 }
271 }
272
273 pub fn from_prost(prost: PbTableVersion) -> Self {
274 Self {
275 version_id: prost.version,
276 next_column_id: ColumnId::from(prost.next_column_id),
277 }
278 }
279
280 pub fn to_prost(&self) -> PbTableVersion {
281 PbTableVersion {
282 version: self.version_id,
283 next_column_id: self.next_column_id.into(),
284 }
285 }
286}
287
288impl TableCatalog {
289 pub fn create_sql_purified(&self) -> String {
294 self.create_sql_ast_purified()
295 .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
296 .unwrap_or_else(|_| self.create_sql())
297 }
298
299 pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
304 if let TableType::Table = self.table_type() {
306 let base = if self.definition.is_empty() {
307 let name = ast::ObjectName(vec![self.name.as_str().into()]);
309 ast::Statement::default_create_table(name)
310 } else {
311 self.create_sql_ast_from_persisted()?
312 };
313
314 match try_purify_table_source_create_sql_ast(
315 base,
316 self.columns(),
317 self.row_id_index,
318 &self.pk_column_ids(),
319 ) {
320 Ok(stmt) => return Ok(stmt),
321 Err(e) => notice_to_user(format!(
322 "error occurred while purifying definition for table \"{}\", \
323 results may be inaccurate: {}",
324 self.name,
325 e.as_report()
326 )),
327 }
328 }
329
330 self.create_sql_ast_from_persisted()
331 }
332}
333
334impl TableCatalog {
    /// Returns the id of this table.
    pub fn id(&self) -> TableId {
        self.id
    }
339
340 pub fn with_id(mut self, id: TableId) -> Self {
341 self.id = id;
342 self
343 }
344
    /// Returns the conflict behavior of this table.
    pub fn conflict_behavior(&self) -> ConflictBehavior {
        self.conflict_behavior
    }
348
    /// Returns the kind of object this catalog describes.
    pub fn table_type(&self) -> TableType {
        self.table_type
    }
352
    /// Returns the engine of this table.
    pub fn engine(&self) -> Engine {
        self.engine
    }
356
357 pub fn iceberg_source_name(&self) -> Option<String> {
358 match self.engine {
359 Engine::Iceberg => Some(format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name)),
360 Engine::Hummock => None,
361 }
362 }
363
364 pub fn iceberg_sink_name(&self) -> Option<String> {
365 match self.engine {
366 Engine::Iceberg => Some(format!("{}{}", ICEBERG_SINK_PREFIX, self.name)),
367 Engine::Hummock => None,
368 }
369 }
370
371 pub fn is_user_table(&self) -> bool {
372 self.table_type == TableType::Table
373 }
374
375 pub fn is_internal_table(&self) -> bool {
376 self.table_type == TableType::Internal
377 }
378
379 pub fn is_mview(&self) -> bool {
380 self.table_type == TableType::MaterializedView
381 }
382
383 pub fn is_index(&self) -> bool {
384 self.table_type == TableType::Index
385 }
386
387 #[must_use]
389 pub fn bad_drop_error(&self) -> RwError {
390 let msg = match self.table_type {
391 TableType::MaterializedView => {
392 "Use `DROP MATERIALIZED VIEW` to drop a materialized view."
393 }
394 TableType::Index | TableType::VectorIndex => "Use `DROP INDEX` to drop an index.",
395 TableType::Table => "Use `DROP TABLE` to drop a table.",
396 TableType::Internal => "Internal tables cannot be dropped.",
397 };
398
399 ErrorCode::InvalidInputSyntax(msg.to_owned()).into()
400 }
401
    /// Returns the id of the source associated with this table, if there is one.
    #[must_use]
    pub fn associated_source_id(&self) -> Option<SourceId> {
        self.associated_source_id
    }
407
    /// Returns `true` if `associated_source_id` is set.
    pub fn has_associated_source(&self) -> bool {
        self.associated_source_id.is_some()
    }
411
412 pub fn columns(&self) -> &[ColumnCatalog] {
414 &self.columns
415 }
416
417 pub fn columns_without_rw_timestamp(&self) -> Vec<ColumnCatalog> {
418 self.columns
419 .iter()
420 .filter(|c| !c.is_rw_timestamp_column())
421 .cloned()
422 .collect()
423 }
424
425 pub fn pk(&self) -> &[ColumnOrder] {
427 self.pk.as_ref()
428 }
429
430 pub fn pk_column_ids(&self) -> Vec<ColumnId> {
432 self.pk
433 .iter()
434 .map(|x| self.columns[x.column_index].column_id())
435 .collect()
436 }
437
438 pub fn pk_column_names(&self) -> Vec<&str> {
440 self.pk
441 .iter()
442 .map(|x| self.columns[x.column_index].name())
443 .collect()
444 }
445
446 pub fn stream_key(&self) -> Vec<usize> {
450 if self
452 .distribution_key
453 .iter()
454 .any(|dist_key| !self.stream_key.contains(dist_key))
455 {
456 let mut new_stream_key = self.distribution_key.clone();
457 let mut seen: HashSet<usize> = self.distribution_key.iter().copied().collect();
458 for &key in &self.stream_key {
459 if seen.insert(key) {
460 new_stream_key.push(key);
461 }
462 }
463 new_stream_key
464 } else {
465 self.stream_key.clone()
466 }
467 }
468
469 pub fn table_desc(&self) -> TableDesc {
474 use risingwave_common::catalog::TableOption;
475
476 let table_options = TableOption::new(self.retention_seconds);
477
478 TableDesc {
479 table_id: self.id,
480 pk: self.pk.clone(),
481 stream_key: self.stream_key(),
482 columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
483 distribution_key: self.distribution_key.clone(),
484 append_only: self.append_only,
485 retention_seconds: table_options.retention_seconds,
486 value_indices: self.value_indices.clone(),
487 read_prefix_len_hint: self.read_prefix_len_hint,
488 watermark_columns: self.watermark_columns.clone(),
489 versioned: self.version.is_some(),
490 vnode_col_index: self.vnode_col_index,
491 vnode_count: self.vnode_count(),
492 }
493 }
494
495 pub fn name(&self) -> &str {
497 self.name.as_ref()
498 }
499
500 pub fn distribution_key(&self) -> &[usize] {
501 self.distribution_key.as_ref()
502 }
503
    /// Converts this catalog into a `PbTable`; currently identical to [`Self::to_prost`].
    pub fn to_internal_table_prost(&self) -> PbTable {
        self.to_prost()
    }
507
508 pub fn create_sql(&self) -> String {
512 self.create_sql_ast()
513 .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
514 .unwrap_or_else(|_| self.definition.clone())
515 }
516
517 pub fn create_sql_ast(&self) -> Result<ast::Statement> {
525 if let TableType::Table = self.table_type()
526 && self.definition.is_empty()
527 {
528 self.create_sql_ast_purified()
530 } else {
531 self.create_sql_ast_from_persisted()
533 }
534 }
535
536 fn create_sql_ast_from_persisted(&self) -> Result<ast::Statement> {
537 Ok(Parser::parse_exactly_one(&self.definition)?)
538 }
539
    /// Returns the version information of this table, if present.
    pub fn version(&self) -> Option<&TableVersion> {
        self.version.as_ref()
    }
544
545 pub fn version_id(&self) -> Option<TableVersionId> {
547 self.version().map(|v| v.version_id)
548 }
549
550 pub fn vnode_count(&self) -> usize {
552 if self.id().is_placeholder() {
553 0
554 } else {
555 self.vnode_count.value()
557 }
558 }
559
560 pub fn to_prost(&self) -> PbTable {
561 PbTable {
562 id: self.id,
563 schema_id: self.schema_id,
564 database_id: self.database_id,
565 name: self.name.clone(),
566 columns: self
568 .columns_without_rw_timestamp()
569 .iter()
570 .map(|c| c.to_protobuf())
571 .collect(),
572 pk: self.pk.iter().map(|o| o.to_protobuf()).collect(),
573 stream_key: self.stream_key().iter().map(|x| *x as _).collect(),
574 optional_associated_source_id: self.associated_source_id.map(Into::into),
575 table_type: self.table_type.to_prost() as i32,
576 distribution_key: self
577 .distribution_key
578 .iter()
579 .map(|k| *k as i32)
580 .collect_vec(),
581 append_only: self.append_only,
582 owner: self.owner,
583 fragment_id: self.fragment_id,
584 dml_fragment_id: self.dml_fragment_id,
585 vnode_col_index: self.vnode_col_index.map(|i| i as _),
586 row_id_index: self.row_id_index.map(|i| i as _),
587 value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
588 definition: self.definition.clone(),
589 read_prefix_len_hint: self.read_prefix_len_hint as u32,
590 version: self.version.as_ref().map(TableVersion::to_prost),
591 watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
592 dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
593 handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
594 version_column_indices: self
595 .version_column_indices
596 .iter()
597 .map(|&idx| idx as u32)
598 .collect(),
599 cardinality: Some(self.cardinality.to_protobuf()),
600 initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
601 created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
602 #[expect(deprecated)]
603 cleaned_by_watermark: false,
604 stream_job_status: self.stream_job_status.to_proto().into(),
605 create_type: self.create_type.to_proto().into(),
606 description: self.description.clone(),
607 #[expect(deprecated)]
608 incoming_sinks: vec![],
609 created_at_cluster_version: self.created_at_cluster_version.clone(),
610 initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
611 retention_seconds: self.retention_seconds,
612 cdc_table_id: self.cdc_table_id.clone(),
613 maybe_vnode_count: self.vnode_count.to_protobuf(),
614 webhook_info: self.webhook_info.clone(),
615 job_id: self.job_id,
616 engine: Some(self.engine.to_protobuf().into()),
617 #[expect(deprecated)]
618 clean_watermark_index_in_pk: self.clean_watermark_index_in_pk.map(|x| x as i32),
619 clean_watermark_indices: self
620 .clean_watermark_indices
621 .iter()
622 .map(|&x| x as u32)
623 .collect(),
624 refreshable: self.refreshable,
625 vector_index_info: self.vector_index_info,
626 cdc_table_type: self
627 .cdc_table_type
628 .clone()
629 .map(|t| PbCdcTableType::from(t) as i32),
630 }
631 }
632
633 pub fn columns_to_insert(
639 &self,
640 ) -> (
641 impl Iterator<Item = (&ColumnCatalog, bool)> + '_,
642 Option<usize>,
643 ) {
644 let pk_column_indices: HashSet<_> =
645 self.pk.iter().map(|column| column.column_index).collect();
646 let columns = self
647 .columns
648 .iter()
649 .enumerate()
650 .filter(|(_, c)| !c.is_hidden() && !c.is_generated())
651 .map(move |(idx, column)| (column, pk_column_indices.contains(&idx)));
652 let row_id_index = self.row_id_index.map(|row_id_index| {
653 let generated_column_count = self
654 .columns()
655 .iter()
656 .take(row_id_index + 1)
657 .filter(|column| column.is_generated())
658 .count();
659 row_id_index - generated_column_count
660 });
661 (columns, row_id_index)
662 }
663
664 pub fn generated_column_names(&self) -> impl Iterator<Item = &str> {
665 self.columns
666 .iter()
667 .filter(|c| c.is_generated())
668 .map(|c| c.name())
669 }
670
671 pub fn first_generated_column(&self) -> Option<(usize, &str)> {
672 self.columns
673 .iter()
674 .filter(|c| !c.is_hidden())
675 .enumerate()
676 .find(|(_, c)| c.is_generated())
677 .map(|(idx, c)| (idx, c.name()))
678 }
679
680 pub fn generated_col_idxes(&self) -> impl Iterator<Item = usize> + '_ {
681 self.columns
682 .iter()
683 .enumerate()
684 .filter(|(_, c)| c.is_generated())
685 .map(|(i, _)| i)
686 }
687
688 pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
689 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) = self
690 .columns[col_idx]
691 .column_desc
692 .generated_or_default_column
693 .as_ref()
694 {
695 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
696 .expect("expr in default columns corrupted")
697 } else {
698 ExprImpl::literal_null(self.columns[col_idx].data_type().clone())
699 }
700 }
701
702 pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
703 columns
704 .iter()
705 .map(|c| {
706 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
707 expr,
708 ..
709 })) = c.column_desc.generated_or_default_column.as_ref()
710 {
711 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
712 .expect("expr in default columns corrupted")
713 } else {
714 ExprImpl::literal_null(c.data_type().clone())
715 }
716 })
717 .collect()
718 }
719
720 pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
721 self.columns.iter().enumerate().filter_map(|(i, c)| {
722 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
723 expr, ..
724 })) = c.column_desc.generated_or_default_column.as_ref()
725 {
726 Some((
727 i,
728 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
729 .expect("expr in default columns corrupted"),
730 ))
731 } else {
732 None
733 }
734 })
735 }
736
    /// Returns `true` if at least one column is a generated column.
    pub fn has_generated_column(&self) -> bool {
        self.columns.iter().any(|c| c.is_generated())
    }
740
    /// Returns `true` if at least one column reports `is_rw_timestamp_column()`.
    pub fn has_rw_timestamp_column(&self) -> bool {
        self.columns.iter().any(|c| c.is_rw_timestamp_column())
    }
744
745 pub fn column_schema(&self) -> Schema {
746 Schema::new(
747 self.columns
748 .iter()
749 .map(|c| Field::from(&c.column_desc))
750 .collect(),
751 )
752 }
753
754 pub fn is_created(&self) -> bool {
755 self.stream_job_status == StreamJobStatus::Created
756 }
757
758 pub fn is_iceberg_engine_table(&self) -> bool {
759 self.engine == Engine::Iceberg
760 }
761
    /// Iterates over the `column_index` of every `pk` entry, in `pk` order.
    pub fn order_column_indices(&self) -> impl Iterator<Item = usize> + '_ {
        self.pk.iter().map(|col| col.column_index)
    }
765
    /// Returns a map from column id to an index, as computed by
    /// `ColumnDesc::get_id_to_op_idx_mapping` over this table's columns with `None` as
    /// its second argument.
    // NOTE(review): the index is presumably the column's position in `columns` — confirm
    // against `ColumnDesc::get_id_to_op_idx_mapping`.
    pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
        ColumnDesc::get_id_to_op_idx_mapping(&self.columns, None)
    }
769
770 pub fn order_column_ids(&self) -> Vec<ColumnId> {
771 self.pk
772 .iter()
773 .map(|col| self.columns[col.column_index].column_desc.column_id)
774 .collect()
775 }
776
777 pub fn arrange_key_orders_protobuf(&self) -> Vec<PbColumnOrder> {
778 self.pk.iter().map(|x| x.to_protobuf()).collect()
780 }
781}
782
783impl From<PbTable> for TableCatalog {
784 fn from(tb: PbTable) -> Self {
785 let id = tb.id;
786 let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
787 let tb_engine = tb
788 .get_engine()
789 .map(|engine| PbEngine::try_from(*engine).expect("Invalid engine"))
790 .unwrap_or(PbEngine::Hummock);
791 let table_type = tb.get_table_type().unwrap();
792 let stream_job_status = tb
793 .get_stream_job_status()
794 .unwrap_or(PbStreamJobStatus::Created);
795 let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground);
796 let associated_source_id = tb.optional_associated_source_id.map(Into::into);
797 let name = tb.name.clone();
798
799 let vnode_count = tb.vnode_count_inner();
800 if let VnodeCount::Placeholder = vnode_count {
801 assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
804 }
805
806 let mut col_names = HashSet::new();
807 let mut col_index: HashMap<i32, usize> = HashMap::new();
808
809 let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
810 let version_column_indices: Vec<usize> = tb
811 .version_column_indices
812 .iter()
813 .map(|&idx| idx as usize)
814 .collect();
815 let mut columns: Vec<ColumnCatalog> =
816 tb.columns.into_iter().map(ColumnCatalog::from).collect();
817 if columns.iter().all(|c| !c.is_rw_timestamp_column()) {
818 columns.push(ColumnCatalog::rw_timestamp_column());
820 }
821 for (idx, catalog) in columns.clone().into_iter().enumerate() {
822 let col_name = catalog.name();
823 if !col_names.insert(col_name.to_owned()) {
824 panic!("duplicated column name {} in table {} ", col_name, tb.name)
825 }
826
827 let col_id = catalog.column_desc.column_id.get_id();
828 col_index.insert(col_id, idx);
829 }
830
831 let pk = tb.pk.iter().map(ColumnOrder::from_protobuf).collect();
832 let mut watermark_columns = FixedBitSet::with_capacity(columns.len());
833 for idx in &tb.watermark_indices {
834 watermark_columns.insert(*idx as _);
835 }
836 let engine = Engine::from_protobuf(&tb_engine);
837
838 Self {
839 id,
840 schema_id: tb.schema_id,
841 database_id: tb.database_id,
842 associated_source_id,
843 name,
844 pk,
845 columns,
846 table_type: TableType::from_prost(table_type),
847 distribution_key: tb
848 .distribution_key
849 .iter()
850 .map(|k| *k as usize)
851 .collect_vec(),
852 stream_key: tb.stream_key.iter().map(|x| *x as _).collect(),
853 append_only: tb.append_only,
854 owner: tb.owner,
855 fragment_id: tb.fragment_id,
856 dml_fragment_id: tb.dml_fragment_id,
857 vnode_col_index: tb.vnode_col_index.map(|x| x as usize),
858 row_id_index: tb.row_id_index.map(|x| x as usize),
859 value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
860 definition: tb.definition,
861 conflict_behavior,
862 version_column_indices,
863 read_prefix_len_hint: tb.read_prefix_len_hint as usize,
864 version: tb.version.map(TableVersion::from_prost),
865 watermark_columns,
866 dist_key_in_pk: tb.dist_key_in_pk.iter().map(|x| *x as _).collect(),
867 cardinality: tb
868 .cardinality
869 .map(|c| Cardinality::from_protobuf(&c))
870 .unwrap_or_else(Cardinality::unknown),
871 created_at_epoch: tb.created_at_epoch.map(Epoch::from),
872 initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
873 create_type: CreateType::from_proto(create_type),
874 stream_job_status: StreamJobStatus::from_proto(stream_job_status),
875 description: tb.description,
876 created_at_cluster_version: tb.created_at_cluster_version.clone(),
877 initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
878 retention_seconds: tb.retention_seconds,
879 cdc_table_id: tb.cdc_table_id,
880 vnode_count,
881 webhook_info: tb.webhook_info,
882 job_id: tb.job_id,
883 engine,
884 #[expect(deprecated)]
885 clean_watermark_index_in_pk: tb.clean_watermark_index_in_pk.map(|x| x as usize),
886 clean_watermark_indices: tb
887 .clean_watermark_indices
888 .iter()
889 .map(|&x| x as usize)
890 .collect(),
891
892 refreshable: tb.refreshable,
893 vector_index_info: tb.vector_index_info,
894 cdc_table_type: tb
895 .cdc_table_type
896 .and_then(|t| PbCdcTableType::try_from(t).ok())
897 .map(ExternalCdcTableType::from),
898 }
899 }
900}
901
902impl From<&PbTable> for TableCatalog {
903 fn from(tb: &PbTable) -> Self {
904 tb.clone().into()
905 }
906}
907
impl OwnedByUserCatalog for TableCatalog {
    /// Returns the owner of this table.
    fn owner(&self) -> UserId {
        self.owner
    }
}
913
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::test_prelude::*;
    use risingwave_common::types::*;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_pb::catalog::table::PbEngine;
    use risingwave_pb::plan_common::{
        AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
    };

    use super::*;

    /// Converts a hand-written `PbTable` into a `TableCatalog`, compares it field by
    /// field with the expected catalog, and checks that a round trip through `to_prost`
    /// yields an equal catalog.
    #[test]
    fn test_into_table_catalog() {
        // Protobuf input: a row id column plus one struct-typed column named `country`.
        let pb_table = PbTable {
            id: 0.into(),
            schema_id: 0.into(),
            database_id: 0.into(),
            name: "test".to_owned(),
            table_type: PbTableType::Table as i32,
            columns: vec![
                ColumnCatalog::row_id_column().to_protobuf(),
                PbColumnCatalog {
                    column_desc: Some(PbColumnDesc::new(
                        DataType::from(StructType::new([
                            ("address", DataType::Varchar),
                            ("zipcode", DataType::Varchar),
                        ]))
                        .to_protobuf(),
                        "country",
                        1,
                    )),
                    is_hidden: false,
                },
            ],
            pk: vec![ColumnOrder::new(0, OrderType::ascending()).to_protobuf()],
            stream_key: vec![0],
            distribution_key: vec![0],
            optional_associated_source_id: Some(SourceId::new(233).into()),
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0.into(),
            dml_fragment_id: None,
            initialized_at_epoch: None,
            value_indices: vec![0],
            definition: "".into(),
            read_prefix_len_hint: 0,
            vnode_col_index: None,
            row_id_index: None,
            version: Some(PbTableVersion {
                version: 0,
                next_column_id: 2,
            }),
            watermark_indices: vec![],
            handle_pk_conflict_behavior: 3,
            dist_key_in_pk: vec![0],
            cardinality: None,
            created_at_epoch: None,
            #[expect(deprecated)]
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: Some("description".to_owned()),
            #[expect(deprecated)]
            incoming_sinks: vec![],
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_indices: Vec::new(),
            cdc_table_id: None,
            maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            #[expect(deprecated)]
            clean_watermark_index_in_pk: None,
            clean_watermark_indices: vec![],

            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
        };

        let table: TableCatalog = pb_table.into();

        // The conversion appends the rw timestamp column, so three columns are expected.
        let expected = TableCatalog {
            id: TableId::new(0),
            schema_id: 0.into(),
            database_id: 0.into(),
            associated_source_id: Some(SourceId::new(233)),
            name: "test".to_owned(),
            table_type: TableType::Table,
            columns: vec![
                ColumnCatalog::row_id_column(),
                ColumnCatalog {
                    column_desc: ColumnDesc {
                        data_type: StructType::new(vec![
                            ("address", DataType::Varchar),
                            ("zipcode", DataType::Varchar),
                        ])
                        .into(),
                        column_id: ColumnId::new(1),
                        name: "country".to_owned(),
                        description: None,
                        generated_or_default_column: None,
                        additional_column: AdditionalColumn { column_type: None },
                        version: ColumnDescVersion::LATEST,
                        system_column: None,
                        nullable: true,
                    },
                    is_hidden: false,
                },
                ColumnCatalog::rw_timestamp_column(),
            ],
            stream_key: vec![0],
            pk: vec![ColumnOrder::new(0, OrderType::ascending())],
            distribution_key: vec![0],
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0.into(),
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index: None,
            value_indices: vec![0],
            definition: "".into(),
            conflict_behavior: ConflictBehavior::NoCheck,
            read_prefix_len_hint: 0,
            version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))),
            watermark_columns: FixedBitSet::with_capacity(3),
            dist_key_in_pk: vec![0],
            cardinality: Cardinality::unknown(),
            created_at_epoch: None,
            initialized_at_epoch: None,
            stream_job_status: StreamJobStatus::Created,
            create_type: CreateType::Foreground,
            description: Some("description".to_owned()),
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_indices: Vec::new(),
            cdc_table_id: None,
            vnode_count: VnodeCount::set(233),
            webhook_info: None,
            job_id: None,
            engine: Engine::Hummock,
            clean_watermark_index_in_pk: None,
            clean_watermark_indices: vec![],

            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
        };

        assert_eq!(table, expected);

        // Round trip through the protobuf representation.
        assert_eq!(table, TableCatalog::from(table.to_prost()));
    }
}
1071}