1use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17
18use fixedbitset::FixedBitSet;
19use itertools::Itertools;
20use risingwave_common::catalog::{
21 ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, Schema,
22 StreamJobStatus, TableDesc, TableId, TableVersionId,
23};
24use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
25use risingwave_common::util::epoch::Epoch;
26use risingwave_common::util::sort_util::ColumnOrder;
27use risingwave_connector::source::cdc::external::ExternalCdcTableType;
28use risingwave_pb::catalog::table::{
29 CdcTableType as PbCdcTableType, OptionalAssociatedSourceId, PbEngine, PbTableType,
30 PbTableVersion,
31};
32use risingwave_pb::catalog::{
33 PbCreateType, PbStreamJobStatus, PbTable, PbVectorIndexInfo, PbWebhookSourceInfo,
34};
35use risingwave_pb::common::PbColumnOrder;
36use risingwave_pb::plan_common::DefaultColumnDesc;
37use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
38use risingwave_sqlparser::ast;
39use risingwave_sqlparser::parser::Parser;
40use thiserror_ext::AsReport as _;
41
42use super::purify::try_purify_table_source_create_sql_ast;
43use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId};
44use crate::error::{ErrorCode, Result, RwError};
45use crate::expr::ExprImpl;
46use crate::optimizer::property::Cardinality;
47use crate::session::current::notice_to_user;
48use crate::user::UserId;
49
50#[derive(Clone, Debug, PartialEq, Eq, Hash)]
82#[cfg_attr(test, derive(Default))]
83pub struct TableCatalog {
84 pub id: TableId,
85
86 pub schema_id: SchemaId,
87
88 pub database_id: DatabaseId,
89
90 pub associated_source_id: Option<TableId>, pub name: String,
93
94 pub columns: Vec<ColumnCatalog>,
96
97 pub pk: Vec<ColumnOrder>,
99
100 pub stream_key: Vec<usize>,
102
103 pub table_type: TableType,
106
107 pub distribution_key: Vec<usize>,
109
110 pub append_only: bool,
113
114 pub cardinality: Cardinality,
116
117 pub owner: UserId,
119
120 pub retention_seconds: Option<u32>,
122
123 pub fragment_id: FragmentId,
125
126 pub dml_fragment_id: Option<FragmentId>,
128
129 pub vnode_col_index: Option<usize>,
132
133 pub row_id_index: Option<usize>,
136
137 pub value_indices: Vec<usize>,
139
140 pub definition: String,
142
143 pub conflict_behavior: ConflictBehavior,
147
148 pub version_column_indices: Vec<usize>,
149
150 pub read_prefix_len_hint: usize,
151
152 pub version: Option<TableVersion>,
154
155 pub watermark_columns: FixedBitSet,
157
158 pub dist_key_in_pk: Vec<usize>,
161
162 pub created_at_epoch: Option<Epoch>,
163
164 pub initialized_at_epoch: Option<Epoch>,
165
166 pub cleaned_by_watermark: bool,
168
169 pub create_type: CreateType,
171
172 pub stream_job_status: StreamJobStatus,
175
176 pub description: Option<String>,
178
179 pub created_at_cluster_version: Option<String>,
180
181 pub initialized_at_cluster_version: Option<String>,
182
183 pub cdc_table_id: Option<String>,
184
185 pub vnode_count: VnodeCount,
194
195 pub webhook_info: Option<PbWebhookSourceInfo>,
196
197 pub job_id: Option<TableId>,
198
199 pub engine: Engine,
200
201 pub clean_watermark_index_in_pk: Option<usize>,
202
203 pub refreshable: bool,
205
206 pub vector_index_info: Option<PbVectorIndexInfo>,
207
208 pub cdc_table_type: Option<ExternalCdcTableType>,
209}
210
/// Prefix that `TableCatalog::iceberg_source_name` puts in front of the table name.
pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
/// Prefix that `TableCatalog::iceberg_sink_name` puts in front of the table name.
pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_";
213
/// Kind of relation a `TableCatalog` describes; mirrors the specified variants of
/// `PbTableType`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum TableType {
    /// A table; dropped with `DROP TABLE`.
    Table,
    /// A materialized view; dropped with `DROP MATERIALIZED VIEW`.
    MaterializedView,
    /// An index; dropped with `DROP INDEX`.
    Index,
    /// A vector index; also dropped with `DROP INDEX`.
    VectorIndex,
    /// An internal table; cannot be dropped by users.
    Internal,
}
227
/// Test-only default so that `TableCatalog` can derive `Default` under `cfg(test)`.
#[cfg(test)]
impl Default for TableType {
    /// Returns [`TableType::Table`].
    fn default() -> Self {
        TableType::Table
    }
}
234
235impl TableType {
236 fn from_prost(prost: PbTableType) -> Self {
237 match prost {
238 PbTableType::Table => Self::Table,
239 PbTableType::MaterializedView => Self::MaterializedView,
240 PbTableType::Index => Self::Index,
241 PbTableType::Internal => Self::Internal,
242 PbTableType::VectorIndex => Self::VectorIndex,
243 PbTableType::Unspecified => unreachable!(),
244 }
245 }
246
247 pub(crate) fn to_prost(self) -> PbTableType {
248 match self {
249 Self::Table => PbTableType::Table,
250 Self::MaterializedView => PbTableType::MaterializedView,
251 Self::Index => PbTableType::Index,
252 Self::VectorIndex => PbTableType::VectorIndex,
253 Self::Internal => PbTableType::Internal,
254 }
255 }
256}
257
258#[derive(Clone, Debug, PartialEq, Eq, Hash)]
260pub struct TableVersion {
261 pub version_id: TableVersionId,
262 pub next_column_id: ColumnId,
263}
264
265impl TableVersion {
266 #[cfg(test)]
268 pub fn new_initial_for_test(max_column_id: ColumnId) -> Self {
269 use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;
270
271 Self {
272 version_id: INITIAL_TABLE_VERSION_ID,
273 next_column_id: max_column_id.next(),
274 }
275 }
276
277 pub fn from_prost(prost: PbTableVersion) -> Self {
278 Self {
279 version_id: prost.version,
280 next_column_id: ColumnId::from(prost.next_column_id),
281 }
282 }
283
284 pub fn to_prost(&self) -> PbTableVersion {
285 PbTableVersion {
286 version: self.version_id,
287 next_column_id: self.next_column_id.into(),
288 }
289 }
290}
291
292impl TableCatalog {
293 pub fn create_sql_purified(&self) -> String {
298 self.create_sql_ast_purified()
299 .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
300 .unwrap_or_else(|_| self.create_sql())
301 }
302
303 pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
308 if let TableType::Table = self.table_type() {
310 let base = if self.definition.is_empty() {
311 let name = ast::ObjectName(vec![self.name.as_str().into()]);
313 ast::Statement::default_create_table(name)
314 } else {
315 self.create_sql_ast_from_persisted()?
316 };
317
318 match try_purify_table_source_create_sql_ast(
319 base,
320 self.columns(),
321 self.row_id_index,
322 &self.pk_column_ids(),
323 ) {
324 Ok(stmt) => return Ok(stmt),
325 Err(e) => notice_to_user(format!(
326 "error occurred while purifying definition for table \"{}\", \
327 results may be inaccurate: {}",
328 self.name,
329 e.as_report()
330 )),
331 }
332 }
333
334 self.create_sql_ast_from_persisted()
335 }
336}
337
338impl TableCatalog {
339 pub fn id(&self) -> TableId {
341 self.id
342 }
343
344 pub fn with_id(mut self, id: TableId) -> Self {
345 self.id = id;
346 self
347 }
348
349 pub fn with_cleaned_by_watermark(mut self, cleaned_by_watermark: bool) -> Self {
350 self.cleaned_by_watermark = cleaned_by_watermark;
351 self
352 }
353
354 pub fn conflict_behavior(&self) -> ConflictBehavior {
355 self.conflict_behavior
356 }
357
358 pub fn table_type(&self) -> TableType {
359 self.table_type
360 }
361
362 pub fn engine(&self) -> Engine {
363 self.engine
364 }
365
366 pub fn iceberg_source_name(&self) -> Option<String> {
367 match self.engine {
368 Engine::Iceberg => Some(format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name)),
369 Engine::Hummock => None,
370 }
371 }
372
373 pub fn iceberg_sink_name(&self) -> Option<String> {
374 match self.engine {
375 Engine::Iceberg => Some(format!("{}{}", ICEBERG_SINK_PREFIX, self.name)),
376 Engine::Hummock => None,
377 }
378 }
379
380 pub fn is_user_table(&self) -> bool {
381 self.table_type == TableType::Table
382 }
383
384 pub fn is_internal_table(&self) -> bool {
385 self.table_type == TableType::Internal
386 }
387
388 pub fn is_mview(&self) -> bool {
389 self.table_type == TableType::MaterializedView
390 }
391
392 pub fn is_index(&self) -> bool {
393 self.table_type == TableType::Index
394 }
395
396 #[must_use]
398 pub fn bad_drop_error(&self) -> RwError {
399 let msg = match self.table_type {
400 TableType::MaterializedView => {
401 "Use `DROP MATERIALIZED VIEW` to drop a materialized view."
402 }
403 TableType::Index | TableType::VectorIndex => "Use `DROP INDEX` to drop an index.",
404 TableType::Table => "Use `DROP TABLE` to drop a table.",
405 TableType::Internal => "Internal tables cannot be dropped.",
406 };
407
408 ErrorCode::InvalidInputSyntax(msg.to_owned()).into()
409 }
410
411 #[must_use]
413 pub fn associated_source_id(&self) -> Option<TableId> {
414 self.associated_source_id
415 }
416
417 pub fn has_associated_source(&self) -> bool {
418 self.associated_source_id.is_some()
419 }
420
421 pub fn columns(&self) -> &[ColumnCatalog] {
423 &self.columns
424 }
425
426 pub fn columns_without_rw_timestamp(&self) -> Vec<ColumnCatalog> {
427 self.columns
428 .iter()
429 .filter(|c| !c.is_rw_timestamp_column())
430 .cloned()
431 .collect()
432 }
433
434 pub fn pk(&self) -> &[ColumnOrder] {
436 self.pk.as_ref()
437 }
438
439 pub fn pk_column_ids(&self) -> Vec<ColumnId> {
441 self.pk
442 .iter()
443 .map(|x| self.columns[x.column_index].column_id())
444 .collect()
445 }
446
447 pub fn pk_column_names(&self) -> Vec<&str> {
449 self.pk
450 .iter()
451 .map(|x| self.columns[x.column_index].name())
452 .collect()
453 }
454
455 pub fn table_desc(&self) -> TableDesc {
460 use risingwave_common::catalog::TableOption;
461
462 let table_options = TableOption::new(self.retention_seconds);
463
464 TableDesc {
465 table_id: self.id,
466 pk: self.pk.clone(),
467 stream_key: self.stream_key.clone(),
468 columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
469 distribution_key: self.distribution_key.clone(),
470 append_only: self.append_only,
471 retention_seconds: table_options.retention_seconds,
472 value_indices: self.value_indices.clone(),
473 read_prefix_len_hint: self.read_prefix_len_hint,
474 watermark_columns: self.watermark_columns.clone(),
475 versioned: self.version.is_some(),
476 vnode_col_index: self.vnode_col_index,
477 vnode_count: self.vnode_count(),
478 }
479 }
480
481 pub fn name(&self) -> &str {
483 self.name.as_ref()
484 }
485
486 pub fn distribution_key(&self) -> &[usize] {
487 self.distribution_key.as_ref()
488 }
489
490 pub fn to_internal_table_prost(&self) -> PbTable {
491 self.to_prost()
492 }
493
494 pub fn create_sql(&self) -> String {
498 self.create_sql_ast()
499 .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
500 .unwrap_or_else(|_| self.definition.clone())
501 }
502
503 pub fn create_sql_ast(&self) -> Result<ast::Statement> {
511 if let TableType::Table = self.table_type()
512 && self.definition.is_empty()
513 {
514 self.create_sql_ast_purified()
516 } else {
517 self.create_sql_ast_from_persisted()
519 }
520 }
521
522 fn create_sql_ast_from_persisted(&self) -> Result<ast::Statement> {
523 Ok(Parser::parse_exactly_one(&self.definition)?)
524 }
525
526 pub fn version(&self) -> Option<&TableVersion> {
528 self.version.as_ref()
529 }
530
531 pub fn version_id(&self) -> Option<TableVersionId> {
533 self.version().map(|v| v.version_id)
534 }
535
536 pub fn vnode_count(&self) -> usize {
540 self.vnode_count.value()
541 }
542
543 pub fn to_prost(&self) -> PbTable {
544 PbTable {
545 id: self.id.table_id,
546 schema_id: self.schema_id,
547 database_id: self.database_id,
548 name: self.name.clone(),
549 columns: self
551 .columns_without_rw_timestamp()
552 .iter()
553 .map(|c| c.to_protobuf())
554 .collect(),
555 pk: self.pk.iter().map(|o| o.to_protobuf()).collect(),
556 stream_key: self.stream_key.iter().map(|x| *x as _).collect(),
557 optional_associated_source_id: self
558 .associated_source_id
559 .map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())),
560 table_type: self.table_type.to_prost() as i32,
561 distribution_key: self
562 .distribution_key
563 .iter()
564 .map(|k| *k as i32)
565 .collect_vec(),
566 append_only: self.append_only,
567 owner: self.owner,
568 fragment_id: self.fragment_id,
569 dml_fragment_id: self.dml_fragment_id,
570 vnode_col_index: self.vnode_col_index.map(|i| i as _),
571 row_id_index: self.row_id_index.map(|i| i as _),
572 value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
573 definition: self.definition.clone(),
574 read_prefix_len_hint: self.read_prefix_len_hint as u32,
575 version: self.version.as_ref().map(TableVersion::to_prost),
576 watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
577 dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
578 handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
579 version_column_indices: self
580 .version_column_indices
581 .iter()
582 .map(|&idx| idx as u32)
583 .collect(),
584 cardinality: Some(self.cardinality.to_protobuf()),
585 initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
586 created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
587 cleaned_by_watermark: self.cleaned_by_watermark,
588 stream_job_status: self.stream_job_status.to_proto().into(),
589 create_type: self.create_type.to_proto().into(),
590 description: self.description.clone(),
591 #[expect(deprecated)]
592 incoming_sinks: vec![],
593 created_at_cluster_version: self.created_at_cluster_version.clone(),
594 initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
595 retention_seconds: self.retention_seconds,
596 cdc_table_id: self.cdc_table_id.clone(),
597 maybe_vnode_count: self.vnode_count.to_protobuf(),
598 webhook_info: self.webhook_info.clone(),
599 job_id: self.job_id.map(|id| id.table_id),
600 engine: Some(self.engine.to_protobuf().into()),
601 clean_watermark_index_in_pk: self.clean_watermark_index_in_pk.map(|x| x as i32),
602 refreshable: self.refreshable,
603 vector_index_info: self.vector_index_info,
604 cdc_table_type: self
605 .cdc_table_type
606 .clone()
607 .map(|t| PbCdcTableType::from(t) as i32),
608 refresh_state: Some(risingwave_pb::catalog::RefreshState::Idle as i32),
609 }
610 }
611
612 pub fn columns_to_insert(&self) -> impl Iterator<Item = &ColumnCatalog> {
614 self.columns
615 .iter()
616 .filter(|c| !c.is_hidden() && !c.is_generated())
617 }
618
619 pub fn generated_column_names(&self) -> impl Iterator<Item = &str> {
620 self.columns
621 .iter()
622 .filter(|c| c.is_generated())
623 .map(|c| c.name())
624 }
625
626 pub fn generated_col_idxes(&self) -> impl Iterator<Item = usize> + '_ {
627 self.columns
628 .iter()
629 .enumerate()
630 .filter(|(_, c)| c.is_generated())
631 .map(|(i, _)| i)
632 }
633
634 pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
635 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) = self
636 .columns[col_idx]
637 .column_desc
638 .generated_or_default_column
639 .as_ref()
640 {
641 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
642 .expect("expr in default columns corrupted")
643 } else {
644 ExprImpl::literal_null(self.columns[col_idx].data_type().clone())
645 }
646 }
647
648 pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
649 columns
650 .iter()
651 .map(|c| {
652 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
653 expr,
654 ..
655 })) = c.column_desc.generated_or_default_column.as_ref()
656 {
657 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
658 .expect("expr in default columns corrupted")
659 } else {
660 ExprImpl::literal_null(c.data_type().clone())
661 }
662 })
663 .collect()
664 }
665
666 pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
667 self.columns.iter().enumerate().filter_map(|(i, c)| {
668 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
669 expr, ..
670 })) = c.column_desc.generated_or_default_column.as_ref()
671 {
672 Some((
673 i,
674 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
675 .expect("expr in default columns corrupted"),
676 ))
677 } else {
678 None
679 }
680 })
681 }
682
683 pub fn has_generated_column(&self) -> bool {
684 self.columns.iter().any(|c| c.is_generated())
685 }
686
687 pub fn has_rw_timestamp_column(&self) -> bool {
688 self.columns.iter().any(|c| c.is_rw_timestamp_column())
689 }
690
691 pub fn column_schema(&self) -> Schema {
692 Schema::new(
693 self.columns
694 .iter()
695 .map(|c| Field::from(&c.column_desc))
696 .collect(),
697 )
698 }
699
700 pub fn is_created(&self) -> bool {
701 self.stream_job_status == StreamJobStatus::Created
702 }
703
704 pub fn is_iceberg_engine_table(&self) -> bool {
705 self.engine == Engine::Iceberg
706 }
707
708 pub fn order_column_indices(&self) -> impl Iterator<Item = usize> + '_ {
709 self.pk.iter().map(|col| col.column_index)
710 }
711
712 pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
713 ColumnDesc::get_id_to_op_idx_mapping(&self.columns, None)
714 }
715
716 pub fn order_column_ids(&self) -> Vec<ColumnId> {
717 self.pk
718 .iter()
719 .map(|col| self.columns[col.column_index].column_desc.column_id)
720 .collect()
721 }
722
723 pub fn arrange_key_orders_protobuf(&self) -> Vec<PbColumnOrder> {
724 self.pk.iter().map(|x| x.to_protobuf()).collect()
726 }
727}
728
729impl From<PbTable> for TableCatalog {
730 fn from(tb: PbTable) -> Self {
731 let id = tb.id;
732 let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
733 let tb_engine = tb
734 .get_engine()
735 .map(|engine| PbEngine::try_from(*engine).expect("Invalid engine"))
736 .unwrap_or(PbEngine::Hummock);
737 let table_type = tb.get_table_type().unwrap();
738 let stream_job_status = tb
739 .get_stream_job_status()
740 .unwrap_or(PbStreamJobStatus::Created);
741 let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground);
742 let associated_source_id = tb.optional_associated_source_id.map(|id| match id {
743 OptionalAssociatedSourceId::AssociatedSourceId(id) => id,
744 });
745 let name = tb.name.clone();
746
747 let vnode_count = tb.vnode_count_inner();
748 if let VnodeCount::Placeholder = vnode_count {
749 assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
752 }
753
754 let mut col_names = HashSet::new();
755 let mut col_index: HashMap<i32, usize> = HashMap::new();
756
757 let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
758 let version_column_indices: Vec<usize> = tb
759 .version_column_indices
760 .iter()
761 .map(|&idx| idx as usize)
762 .collect();
763 let mut columns: Vec<ColumnCatalog> =
764 tb.columns.into_iter().map(ColumnCatalog::from).collect();
765 if columns.iter().all(|c| !c.is_rw_timestamp_column()) {
766 columns.push(ColumnCatalog::rw_timestamp_column());
768 }
769 for (idx, catalog) in columns.clone().into_iter().enumerate() {
770 let col_name = catalog.name();
771 if !col_names.insert(col_name.to_owned()) {
772 panic!("duplicated column name {} in table {} ", col_name, tb.name)
773 }
774
775 let col_id = catalog.column_desc.column_id.get_id();
776 col_index.insert(col_id, idx);
777 }
778
779 let pk = tb.pk.iter().map(ColumnOrder::from_protobuf).collect();
780 let mut watermark_columns = FixedBitSet::with_capacity(columns.len());
781 for idx in &tb.watermark_indices {
782 watermark_columns.insert(*idx as _);
783 }
784 let engine = Engine::from_protobuf(&tb_engine);
785
786 Self {
787 id: id.into(),
788 schema_id: tb.schema_id,
789 database_id: tb.database_id,
790 associated_source_id: associated_source_id.map(Into::into),
791 name,
792 pk,
793 columns,
794 table_type: TableType::from_prost(table_type),
795 distribution_key: tb
796 .distribution_key
797 .iter()
798 .map(|k| *k as usize)
799 .collect_vec(),
800 stream_key: tb.stream_key.iter().map(|x| *x as _).collect(),
801 append_only: tb.append_only,
802 owner: tb.owner,
803 fragment_id: tb.fragment_id,
804 dml_fragment_id: tb.dml_fragment_id,
805 vnode_col_index: tb.vnode_col_index.map(|x| x as usize),
806 row_id_index: tb.row_id_index.map(|x| x as usize),
807 value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
808 definition: tb.definition,
809 conflict_behavior,
810 version_column_indices,
811 read_prefix_len_hint: tb.read_prefix_len_hint as usize,
812 version: tb.version.map(TableVersion::from_prost),
813 watermark_columns,
814 dist_key_in_pk: tb.dist_key_in_pk.iter().map(|x| *x as _).collect(),
815 cardinality: tb
816 .cardinality
817 .map(|c| Cardinality::from_protobuf(&c))
818 .unwrap_or_else(Cardinality::unknown),
819 created_at_epoch: tb.created_at_epoch.map(Epoch::from),
820 initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
821 cleaned_by_watermark: tb.cleaned_by_watermark,
822 create_type: CreateType::from_proto(create_type),
823 stream_job_status: StreamJobStatus::from_proto(stream_job_status),
824 description: tb.description,
825 created_at_cluster_version: tb.created_at_cluster_version.clone(),
826 initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
827 retention_seconds: tb.retention_seconds,
828 cdc_table_id: tb.cdc_table_id,
829 vnode_count,
830 webhook_info: tb.webhook_info,
831 job_id: tb.job_id.map(TableId::from),
832 engine,
833 clean_watermark_index_in_pk: tb.clean_watermark_index_in_pk.map(|x| x as usize),
834
835 refreshable: tb.refreshable,
836 vector_index_info: tb.vector_index_info,
837 cdc_table_type: tb
838 .cdc_table_type
839 .and_then(|t| PbCdcTableType::try_from(t).ok())
840 .map(ExternalCdcTableType::from),
841 }
842 }
843}
844
845impl From<&PbTable> for TableCatalog {
846 fn from(tb: &PbTable) -> Self {
847 tb.clone().into()
848 }
849}
850
851impl OwnedByUserCatalog for TableCatalog {
852 fn owner(&self) -> UserId {
853 self.owner
854 }
855}
856
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::test_prelude::*;
    use risingwave_common::types::*;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_pb::catalog::table::PbEngine;
    use risingwave_pb::plan_common::{
        AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
    };

    use super::*;

    /// Decodes a hand-built `PbTable`, compares the result field by field against the
    /// expected `TableCatalog`, then checks that encoding and decoding again yields an
    /// equal catalog.
    #[test]
    fn test_into_table_catalog() {
        // Protobuf input: a row-id column plus one struct-typed column named "country".
        let pb_table = PbTable {
            id: 0,
            schema_id: 0,
            database_id: 0,
            name: "test".to_owned(),
            table_type: PbTableType::Table as i32,
            columns: vec![
                ColumnCatalog::row_id_column().to_protobuf(),
                PbColumnCatalog {
                    column_desc: Some(PbColumnDesc::new(
                        DataType::from(StructType::new([
                            ("address", DataType::Varchar),
                            ("zipcode", DataType::Varchar),
                        ]))
                        .to_protobuf(),
                        "country",
                        1,
                    )),
                    is_hidden: false,
                },
            ],
            pk: vec![ColumnOrder::new(0, OrderType::ascending()).to_protobuf()],
            stream_key: vec![0],
            distribution_key: vec![0],
            optional_associated_source_id: OptionalAssociatedSourceId::AssociatedSourceId(233)
                .into(),
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0,
            dml_fragment_id: None,
            initialized_at_epoch: None,
            value_indices: vec![0],
            definition: "".into(),
            read_prefix_len_hint: 0,
            vnode_col_index: None,
            row_id_index: None,
            version: Some(PbTableVersion {
                version: 0,
                next_column_id: 2,
            }),
            watermark_indices: vec![],
            handle_pk_conflict_behavior: 3,
            dist_key_in_pk: vec![0],
            cardinality: None,
            created_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: Some("description".to_owned()),
            #[expect(deprecated)]
            incoming_sinks: vec![],
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_indices: Vec::new(),
            cdc_table_id: None,
            maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            clean_watermark_index_in_pk: None,

            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
            refresh_state: Some(risingwave_pb::catalog::RefreshState::Idle as i32),
        };

        let table = TableCatalog::from(pb_table);

        // Expected catalog: the same two columns plus the appended rw-timestamp column.
        let expected = TableCatalog {
            id: TableId::new(0),
            schema_id: 0,
            database_id: 0,
            associated_source_id: Some(TableId::new(233)),
            name: "test".to_owned(),
            table_type: TableType::Table,
            columns: vec![
                ColumnCatalog::row_id_column(),
                ColumnCatalog {
                    column_desc: ColumnDesc {
                        data_type: StructType::new(vec![
                            ("address", DataType::Varchar),
                            ("zipcode", DataType::Varchar),
                        ])
                        .into(),
                        column_id: ColumnId::new(1),
                        name: "country".to_owned(),
                        description: None,
                        generated_or_default_column: None,
                        additional_column: AdditionalColumn { column_type: None },
                        version: ColumnDescVersion::LATEST,
                        system_column: None,
                        nullable: true,
                    },
                    is_hidden: false,
                },
                ColumnCatalog::rw_timestamp_column(),
            ],
            stream_key: vec![0],
            pk: vec![ColumnOrder::new(0, OrderType::ascending())],
            distribution_key: vec![0],
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0,
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index: None,
            value_indices: vec![0],
            definition: "".into(),
            conflict_behavior: ConflictBehavior::NoCheck,
            read_prefix_len_hint: 0,
            version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))),
            watermark_columns: FixedBitSet::with_capacity(3),
            dist_key_in_pk: vec![0],
            cardinality: Cardinality::unknown(),
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: StreamJobStatus::Created,
            create_type: CreateType::Foreground,
            description: Some("description".to_owned()),
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_indices: Vec::new(),
            cdc_table_id: None,
            vnode_count: VnodeCount::set(233),
            webhook_info: None,
            job_id: None,
            engine: Engine::Hummock,
            clean_watermark_index_in_pk: None,

            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
        };

        assert_eq!(table, expected);

        // Encoding and decoding again must give back an equal catalog.
        let round_tripped = TableCatalog::from(table.to_prost());
        assert_eq!(table, round_tripped);
    }
}