1use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17
18use fixedbitset::FixedBitSet;
19use itertools::Itertools;
20use risingwave_common::catalog::{
21 ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, Schema,
22 StreamJobStatus, TableDesc, TableId, TableVersionId,
23};
24use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
25use risingwave_common::util::epoch::Epoch;
26use risingwave_common::util::sort_util::ColumnOrder;
27use risingwave_connector::source::cdc::external::ExternalCdcTableType;
28use risingwave_pb::catalog::table::{
29 CdcTableType as PbCdcTableType, OptionalAssociatedSourceId, PbEngine, PbTableType,
30 PbTableVersion,
31};
32use risingwave_pb::catalog::{
33 PbCreateType, PbStreamJobStatus, PbTable, PbVectorIndexInfo, PbWebhookSourceInfo,
34};
35use risingwave_pb::common::PbColumnOrder;
36use risingwave_pb::plan_common::DefaultColumnDesc;
37use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
38use risingwave_sqlparser::ast;
39use risingwave_sqlparser::parser::Parser;
40use thiserror_ext::AsReport as _;
41
42use super::purify::try_purify_table_source_create_sql_ast;
43use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId};
44use crate::error::{ErrorCode, Result, RwError};
45use crate::expr::ExprImpl;
46use crate::optimizer::property::Cardinality;
47use crate::session::current::notice_to_user;
48use crate::user::UserId;
49
/// Frontend catalog entry for a table-like streaming job: user tables,
/// materialized views, indexes, vector indexes and internal state tables
/// (distinguished by [`TableType`]).
///
/// Mirrors [`PbTable`]; see [`TableCatalog::to_prost`] and `From<PbTable>`
/// for the conversions.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub struct TableCatalog {
    pub id: TableId,

    pub schema_id: SchemaId,

    pub database_id: DatabaseId,

    /// Set iff the table has an associated connector source
    /// (see `has_associated_source`).
    pub associated_source_id: Option<TableId>, pub name: String,

    /// All columns, possibly including hidden/system ones such as the
    /// `_rw_timestamp` column (re-appended when decoding from protobuf).
    pub columns: Vec<ColumnCatalog>,

    /// Primary key, as orders over indices into `columns`.
    pub pk: Vec<ColumnOrder>,

    /// Stream key: column indices identifying a row of the job's output.
    pub stream_key: Vec<usize>,

    /// Whether this is a table, materialized view, index, etc.
    pub table_type: TableType,

    /// Column indices the distribution hashes on.
    pub distribution_key: Vec<usize>,

    pub append_only: bool,

    /// Optimizer cardinality estimate for this table.
    pub cardinality: Cardinality,

    pub owner: UserId,

    /// Data retention (TTL) in seconds, if configured.
    pub retention_seconds: Option<u32>,

    pub fragment_id: FragmentId,

    /// Fragment receiving DML, if any.
    pub dml_fragment_id: Option<FragmentId>,

    /// Index of the vnode column in `columns`, if materialized.
    pub vnode_col_index: Option<usize>,

    /// Index of the hidden row-id column, if present.
    pub row_id_index: Option<usize>,

    /// Indices of columns stored in the value part of state-store rows.
    pub value_indices: Vec<usize>,

    /// The persisted `CREATE` SQL text; may be empty (see `create_sql_ast`).
    pub definition: String,

    /// How primary-key conflicts are handled on write.
    pub conflict_behavior: ConflictBehavior,

    pub version_column_indices: Vec<usize>,

    pub read_prefix_len_hint: usize,

    /// Schema version; `Some` iff the table is versioned (supports schema change).
    pub version: Option<TableVersion>,

    /// Bitset over `columns` marking watermark columns.
    pub watermark_columns: FixedBitSet,

    pub dist_key_in_pk: Vec<usize>,

    pub created_at_epoch: Option<Epoch>,

    pub initialized_at_epoch: Option<Epoch>,

    /// Whether state may be cleaned by watermark.
    pub cleaned_by_watermark: bool,

    pub create_type: CreateType,

    /// Creating vs. created; see `is_created`.
    pub stream_job_status: StreamJobStatus,

    /// User-provided comment, if any.
    pub description: Option<String>,

    pub created_at_cluster_version: Option<String>,

    pub initialized_at_cluster_version: Option<String>,

    /// Id of the upstream CDC table, if any.
    pub cdc_table_id: Option<String>,

    /// Vnode count; may still be a placeholder while the job is being created
    /// (asserted in `From<PbTable>`).
    pub vnode_count: VnodeCount,

    pub webhook_info: Option<PbWebhookSourceInfo>,

    /// Owning streaming job id, if any.
    pub job_id: Option<TableId>,

    /// Storage engine: Hummock or Iceberg.
    pub engine: Engine,

    pub clean_watermark_index_in_pk: Option<usize>,

    pub refreshable: bool,

    pub vector_index_info: Option<PbVectorIndexInfo>,

    pub cdc_table_type: Option<ExternalCdcTableType>,
}
210
/// Name prefix of the hidden source paired with an Iceberg-engine table
/// (see [`TableCatalog::iceberg_source_name`]).
pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
/// Name prefix of the hidden sink paired with an Iceberg-engine table
/// (see [`TableCatalog::iceberg_sink_name`]).
pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_";
213
/// Kind of a [`TableCatalog`] entry. Mirrors `PbTableType`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum TableType {
    /// A user-created table.
    Table,
    /// A materialized view.
    MaterializedView,
    /// An index.
    Index,
    /// A vector index; dropped via `DROP INDEX` like a regular index.
    VectorIndex,
    /// An internal state table of a streaming job; cannot be dropped directly.
    Internal,
}
227
#[cfg(test)]
impl Default for TableType {
    /// Test fixtures default to a plain user table.
    fn default() -> Self {
        TableType::Table
    }
}
234
235impl TableType {
236 fn from_prost(prost: PbTableType) -> Self {
237 match prost {
238 PbTableType::Table => Self::Table,
239 PbTableType::MaterializedView => Self::MaterializedView,
240 PbTableType::Index => Self::Index,
241 PbTableType::Internal => Self::Internal,
242 PbTableType::VectorIndex => Self::VectorIndex,
243 PbTableType::Unspecified => unreachable!(),
244 }
245 }
246
247 pub(crate) fn to_prost(self) -> PbTableType {
248 match self {
249 Self::Table => PbTableType::Table,
250 Self::MaterializedView => PbTableType::MaterializedView,
251 Self::Index => PbTableType::Index,
252 Self::VectorIndex => PbTableType::VectorIndex,
253 Self::Internal => PbTableType::Internal,
254 }
255 }
256}
257
/// Schema version of a [`TableCatalog`], tracked for versioned tables.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TableVersion {
    /// Current version id of the table schema.
    pub version_id: TableVersionId,
    /// Column id to assign to the next column added to the table.
    pub next_column_id: ColumnId,
}
264
265impl TableVersion {
266 #[cfg(test)]
268 pub fn new_initial_for_test(max_column_id: ColumnId) -> Self {
269 use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;
270
271 Self {
272 version_id: INITIAL_TABLE_VERSION_ID,
273 next_column_id: max_column_id.next(),
274 }
275 }
276
277 pub fn from_prost(prost: PbTableVersion) -> Self {
278 Self {
279 version_id: prost.version,
280 next_column_id: ColumnId::from(prost.next_column_id),
281 }
282 }
283
284 pub fn to_prost(&self) -> PbTableVersion {
285 PbTableVersion {
286 version: self.version_id,
287 next_column_id: self.next_column_id.into(),
288 }
289 }
290}
291
292impl TableCatalog {
293 pub fn create_sql_purified(&self) -> String {
298 self.create_sql_ast_purified()
299 .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
300 .unwrap_or_else(|_| self.create_sql())
301 }
302
303 pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
308 if let TableType::Table = self.table_type() {
310 let base = if self.definition.is_empty() {
311 let name = ast::ObjectName(vec![self.name.as_str().into()]);
313 ast::Statement::default_create_table(name)
314 } else {
315 self.create_sql_ast_from_persisted()?
316 };
317
318 match try_purify_table_source_create_sql_ast(
319 base,
320 self.columns(),
321 self.row_id_index,
322 &self.pk_column_ids(),
323 ) {
324 Ok(stmt) => return Ok(stmt),
325 Err(e) => notice_to_user(format!(
326 "error occurred while purifying definition for table \"{}\", \
327 results may be inaccurate: {}",
328 self.name,
329 e.as_report()
330 )),
331 }
332 }
333
334 self.create_sql_ast_from_persisted()
335 }
336}
337
impl TableCatalog {
    /// Returns the table id.
    pub fn id(&self) -> TableId {
        self.id
    }

    /// Returns the catalog with its id replaced.
    pub fn with_id(self, id: TableId) -> Self {
        Self { id, ..self }
    }

    /// Returns the catalog with `cleaned_by_watermark` replaced.
    pub fn with_cleaned_by_watermark(self, cleaned_by_watermark: bool) -> Self {
        Self {
            cleaned_by_watermark,
            ..self
        }
    }

    /// The behavior on primary-key conflicts.
    pub fn conflict_behavior(&self) -> ConflictBehavior {
        self.conflict_behavior
    }

    /// The kind of this catalog entry.
    pub fn table_type(&self) -> TableType {
        self.table_type
    }

    /// The storage engine backing this table.
    pub fn engine(&self) -> Engine {
        self.engine
    }

    /// Name of the hidden source paired with an Iceberg-engine table, or
    /// `None` for Hummock tables.
    pub fn iceberg_source_name(&self) -> Option<String> {
        matches!(self.engine, Engine::Iceberg)
            .then(|| format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name))
    }

    /// Name of the hidden sink paired with an Iceberg-engine table, or
    /// `None` for Hummock tables.
    pub fn iceberg_sink_name(&self) -> Option<String> {
        matches!(self.engine, Engine::Iceberg)
            .then(|| format!("{}{}", ICEBERG_SINK_PREFIX, self.name))
    }

    /// Whether this is a user-created table.
    pub fn is_user_table(&self) -> bool {
        matches!(self.table_type, TableType::Table)
    }

    /// Whether this is an internal state table.
    pub fn is_internal_table(&self) -> bool {
        matches!(self.table_type, TableType::Internal)
    }

    /// Whether this is a materialized view.
    pub fn is_mview(&self) -> bool {
        matches!(self.table_type, TableType::MaterializedView)
    }

    /// Whether this is an index.
    pub fn is_index(&self) -> bool {
        matches!(self.table_type, TableType::Index)
    }

    /// The error to raise when a `DROP` statement of the wrong kind targets
    /// this catalog entry.
    #[must_use]
    pub fn bad_drop_error(&self) -> RwError {
        let msg = match self.table_type {
            TableType::Table => "Use `DROP TABLE` to drop a table.",
            TableType::MaterializedView => {
                "Use `DROP MATERIALIZED VIEW` to drop a materialized view."
            }
            TableType::Index | TableType::VectorIndex => "Use `DROP INDEX` to drop an index.",
            TableType::Internal => "Internal tables cannot be dropped.",
        };

        RwError::from(ErrorCode::InvalidInputSyntax(msg.to_owned()))
    }

    /// The id of the associated source, if any.
    #[must_use]
    pub fn associated_source_id(&self) -> Option<TableId> {
        self.associated_source_id
    }

    /// Whether the table has an associated connector source.
    pub fn has_associated_source(&self) -> bool {
        self.associated_source_id.is_some()
    }
421 pub fn columns(&self) -> &[ColumnCatalog] {
423 &self.columns
424 }
425
426 pub fn columns_without_rw_timestamp(&self) -> Vec<ColumnCatalog> {
427 self.columns
428 .iter()
429 .filter(|c| !c.is_rw_timestamp_column())
430 .cloned()
431 .collect()
432 }
433
434 pub fn pk(&self) -> &[ColumnOrder] {
436 self.pk.as_ref()
437 }
438
439 pub fn pk_column_ids(&self) -> Vec<ColumnId> {
441 self.pk
442 .iter()
443 .map(|x| self.columns[x.column_index].column_id())
444 .collect()
445 }
446
447 pub fn pk_column_names(&self) -> Vec<&str> {
449 self.pk
450 .iter()
451 .map(|x| self.columns[x.column_index].name())
452 .collect()
453 }
454
    /// Builds the [`TableDesc`] used by plan nodes to access this table.
    pub fn table_desc(&self) -> TableDesc {
        use risingwave_common::catalog::TableOption;

        // Normalize retention into a `TableOption` first.
        let table_options = TableOption::new(self.retention_seconds);

        TableDesc {
            table_id: self.id,
            pk: self.pk.clone(),
            stream_key: self.stream_key.clone(),
            columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
            distribution_key: self.distribution_key.clone(),
            append_only: self.append_only,
            retention_seconds: table_options.retention_seconds,
            value_indices: self.value_indices.clone(),
            read_prefix_len_hint: self.read_prefix_len_hint,
            watermark_columns: self.watermark_columns.clone(),
            // A table is "versioned" iff it tracks a schema version.
            versioned: self.version.is_some(),
            vnode_col_index: self.vnode_col_index,
            vnode_count: self.vnode_count(),
        }
    }

    /// Returns the table name.
    pub fn name(&self) -> &str {
        self.name.as_ref()
    }

    /// Returns the distribution key column indices.
    pub fn distribution_key(&self) -> &[usize] {
        self.distribution_key.as_ref()
    }

    /// Protobuf form for internal tables; currently identical to [`Self::to_prost`].
    pub fn to_internal_table_prost(&self) -> PbTable {
        self.to_prost()
    }
493
494 pub fn create_sql(&self) -> String {
498 self.create_sql_ast()
499 .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
500 .unwrap_or_else(|_| self.definition.clone())
501 }
502
503 pub fn create_sql_ast(&self) -> Result<ast::Statement> {
511 if let TableType::Table = self.table_type()
512 && self.definition.is_empty()
513 {
514 self.create_sql_ast_purified()
516 } else {
517 self.create_sql_ast_from_persisted()
519 }
520 }
521
522 fn create_sql_ast_from_persisted(&self) -> Result<ast::Statement> {
523 Ok(Parser::parse_exactly_one(&self.definition)?)
524 }
525
526 pub fn version(&self) -> Option<&TableVersion> {
528 self.version.as_ref()
529 }
530
531 pub fn version_id(&self) -> Option<TableVersionId> {
533 self.version().map(|v| v.version_id)
534 }
535
536 pub fn vnode_count(&self) -> usize {
540 self.vnode_count.value()
541 }
542
    /// Encodes the catalog into its protobuf representation ([`PbTable`]).
    ///
    /// The hidden `_rw_timestamp` column is stripped before encoding;
    /// `From<PbTable>` re-appends it on decode.
    pub fn to_prost(&self) -> PbTable {
        PbTable {
            id: self.id.table_id,
            schema_id: self.schema_id,
            database_id: self.database_id,
            name: self.name.clone(),
            columns: self
                .columns_without_rw_timestamp()
                .iter()
                .map(|c| c.to_protobuf())
                .collect(),
            pk: self.pk.iter().map(|o| o.to_protobuf()).collect(),
            stream_key: self.stream_key.iter().map(|x| *x as _).collect(),
            optional_associated_source_id: self
                .associated_source_id
                .map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())),
            table_type: self.table_type.to_prost() as i32,
            distribution_key: self
                .distribution_key
                .iter()
                .map(|k| *k as i32)
                .collect_vec(),
            append_only: self.append_only,
            owner: self.owner,
            fragment_id: self.fragment_id,
            dml_fragment_id: self.dml_fragment_id,
            vnode_col_index: self.vnode_col_index.map(|i| i as _),
            row_id_index: self.row_id_index.map(|i| i as _),
            value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
            definition: self.definition.clone(),
            read_prefix_len_hint: self.read_prefix_len_hint as u32,
            version: self.version.as_ref().map(TableVersion::to_prost),
            // The bitset is encoded as the list of set indices.
            watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
            dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
            handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
            version_column_indices: self
                .version_column_indices
                .iter()
                .map(|&idx| idx as u32)
                .collect(),
            cardinality: Some(self.cardinality.to_protobuf()),
            initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
            created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
            cleaned_by_watermark: self.cleaned_by_watermark,
            stream_job_status: self.stream_job_status.to_proto().into(),
            create_type: self.create_type.to_proto().into(),
            description: self.description.clone(),
            // Deprecated field; always encoded as empty.
            #[expect(deprecated)]
            incoming_sinks: vec![],
            created_at_cluster_version: self.created_at_cluster_version.clone(),
            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
            retention_seconds: self.retention_seconds,
            cdc_table_id: self.cdc_table_id.clone(),
            maybe_vnode_count: self.vnode_count.to_protobuf(),
            webhook_info: self.webhook_info.clone(),
            job_id: self.job_id.map(|id| id.table_id),
            engine: Some(self.engine.to_protobuf().into()),
            clean_watermark_index_in_pk: self.clean_watermark_index_in_pk.map(|x| x as i32),
            refreshable: self.refreshable,
            vector_index_info: self.vector_index_info,
            cdc_table_type: self
                .cdc_table_type
                .clone()
                .map(|t| PbCdcTableType::from(t) as i32),
        }
    }
610
611 pub fn columns_to_insert(&self) -> impl Iterator<Item = &ColumnCatalog> {
613 self.columns
614 .iter()
615 .filter(|c| !c.is_hidden() && !c.is_generated())
616 }
617
618 pub fn generated_column_names(&self) -> impl Iterator<Item = &str> {
619 self.columns
620 .iter()
621 .filter(|c| c.is_generated())
622 .map(|c| c.name())
623 }
624
625 pub fn generated_col_idxes(&self) -> impl Iterator<Item = usize> + '_ {
626 self.columns
627 .iter()
628 .enumerate()
629 .filter(|(_, c)| c.is_generated())
630 .map(|(i, _)| i)
631 }
632
633 pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
634 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) = self
635 .columns[col_idx]
636 .column_desc
637 .generated_or_default_column
638 .as_ref()
639 {
640 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
641 .expect("expr in default columns corrupted")
642 } else {
643 ExprImpl::literal_null(self.columns[col_idx].data_type().clone())
644 }
645 }
646
647 pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
648 columns
649 .iter()
650 .map(|c| {
651 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
652 expr,
653 ..
654 })) = c.column_desc.generated_or_default_column.as_ref()
655 {
656 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
657 .expect("expr in default columns corrupted")
658 } else {
659 ExprImpl::literal_null(c.data_type().clone())
660 }
661 })
662 .collect()
663 }
664
665 pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
666 self.columns.iter().enumerate().filter_map(|(i, c)| {
667 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
668 expr, ..
669 })) = c.column_desc.generated_or_default_column.as_ref()
670 {
671 Some((
672 i,
673 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
674 .expect("expr in default columns corrupted"),
675 ))
676 } else {
677 None
678 }
679 })
680 }
681
682 pub fn has_generated_column(&self) -> bool {
683 self.columns.iter().any(|c| c.is_generated())
684 }
685
686 pub fn has_rw_timestamp_column(&self) -> bool {
687 self.columns.iter().any(|c| c.is_rw_timestamp_column())
688 }
689
690 pub fn column_schema(&self) -> Schema {
691 Schema::new(
692 self.columns
693 .iter()
694 .map(|c| Field::from(&c.column_desc))
695 .collect(),
696 )
697 }
698
699 pub fn is_created(&self) -> bool {
700 self.stream_job_status == StreamJobStatus::Created
701 }
702
703 pub fn is_iceberg_engine_table(&self) -> bool {
704 self.engine == Engine::Iceberg
705 }
706
707 pub fn order_column_indices(&self) -> impl Iterator<Item = usize> + '_ {
708 self.pk.iter().map(|col| col.column_index)
709 }
710
711 pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
712 ColumnDesc::get_id_to_op_idx_mapping(&self.columns, None)
713 }
714
715 pub fn order_column_ids(&self) -> Vec<ColumnId> {
716 self.pk
717 .iter()
718 .map(|col| self.columns[col.column_index].column_desc.column_id)
719 .collect()
720 }
721
722 pub fn arrange_key_orders_protobuf(&self) -> Vec<PbColumnOrder> {
723 self.pk.iter().map(|x| x.to_protobuf()).collect()
725 }
726}
727
728impl From<PbTable> for TableCatalog {
729 fn from(tb: PbTable) -> Self {
730 let id = tb.id;
731 let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
732 let tb_engine = tb
733 .get_engine()
734 .map(|engine| PbEngine::try_from(*engine).expect("Invalid engine"))
735 .unwrap_or(PbEngine::Hummock);
736 let table_type = tb.get_table_type().unwrap();
737 let stream_job_status = tb
738 .get_stream_job_status()
739 .unwrap_or(PbStreamJobStatus::Created);
740 let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground);
741 let associated_source_id = tb.optional_associated_source_id.map(|id| match id {
742 OptionalAssociatedSourceId::AssociatedSourceId(id) => id,
743 });
744 let name = tb.name.clone();
745
746 let vnode_count = tb.vnode_count_inner();
747 if let VnodeCount::Placeholder = vnode_count {
748 assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
751 }
752
753 let mut col_names = HashSet::new();
754 let mut col_index: HashMap<i32, usize> = HashMap::new();
755
756 let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
757 let version_column_indices: Vec<usize> = tb
758 .version_column_indices
759 .iter()
760 .map(|&idx| idx as usize)
761 .collect();
762 let mut columns: Vec<ColumnCatalog> =
763 tb.columns.into_iter().map(ColumnCatalog::from).collect();
764 if columns.iter().all(|c| !c.is_rw_timestamp_column()) {
765 columns.push(ColumnCatalog::rw_timestamp_column());
767 }
768 for (idx, catalog) in columns.clone().into_iter().enumerate() {
769 let col_name = catalog.name();
770 if !col_names.insert(col_name.to_owned()) {
771 panic!("duplicated column name {} in table {} ", col_name, tb.name)
772 }
773
774 let col_id = catalog.column_desc.column_id.get_id();
775 col_index.insert(col_id, idx);
776 }
777
778 let pk = tb.pk.iter().map(ColumnOrder::from_protobuf).collect();
779 let mut watermark_columns = FixedBitSet::with_capacity(columns.len());
780 for idx in &tb.watermark_indices {
781 watermark_columns.insert(*idx as _);
782 }
783 let engine = Engine::from_protobuf(&tb_engine);
784
785 Self {
786 id: id.into(),
787 schema_id: tb.schema_id,
788 database_id: tb.database_id,
789 associated_source_id: associated_source_id.map(Into::into),
790 name,
791 pk,
792 columns,
793 table_type: TableType::from_prost(table_type),
794 distribution_key: tb
795 .distribution_key
796 .iter()
797 .map(|k| *k as usize)
798 .collect_vec(),
799 stream_key: tb.stream_key.iter().map(|x| *x as _).collect(),
800 append_only: tb.append_only,
801 owner: tb.owner,
802 fragment_id: tb.fragment_id,
803 dml_fragment_id: tb.dml_fragment_id,
804 vnode_col_index: tb.vnode_col_index.map(|x| x as usize),
805 row_id_index: tb.row_id_index.map(|x| x as usize),
806 value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
807 definition: tb.definition,
808 conflict_behavior,
809 version_column_indices,
810 read_prefix_len_hint: tb.read_prefix_len_hint as usize,
811 version: tb.version.map(TableVersion::from_prost),
812 watermark_columns,
813 dist_key_in_pk: tb.dist_key_in_pk.iter().map(|x| *x as _).collect(),
814 cardinality: tb
815 .cardinality
816 .map(|c| Cardinality::from_protobuf(&c))
817 .unwrap_or_else(Cardinality::unknown),
818 created_at_epoch: tb.created_at_epoch.map(Epoch::from),
819 initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
820 cleaned_by_watermark: tb.cleaned_by_watermark,
821 create_type: CreateType::from_proto(create_type),
822 stream_job_status: StreamJobStatus::from_proto(stream_job_status),
823 description: tb.description,
824 created_at_cluster_version: tb.created_at_cluster_version.clone(),
825 initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
826 retention_seconds: tb.retention_seconds,
827 cdc_table_id: tb.cdc_table_id,
828 vnode_count,
829 webhook_info: tb.webhook_info,
830 job_id: tb.job_id.map(TableId::from),
831 engine,
832 clean_watermark_index_in_pk: tb.clean_watermark_index_in_pk.map(|x| x as usize),
833
834 refreshable: tb.refreshable,
835 vector_index_info: tb.vector_index_info,
836 cdc_table_type: tb
837 .cdc_table_type
838 .and_then(|t| PbCdcTableType::try_from(t).ok())
839 .map(ExternalCdcTableType::from),
840 }
841 }
842}
843
844impl From<&PbTable> for TableCatalog {
845 fn from(tb: &PbTable) -> Self {
846 tb.clone().into()
847 }
848}
849
impl OwnedByUserCatalog for TableCatalog {
    /// Returns the id of the user owning this table.
    fn owner(&self) -> UserId {
        self.owner
    }
}
855
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::test_prelude::*;
    use risingwave_common::types::*;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_pb::catalog::table::PbEngine;
    use risingwave_pb::plan_common::{
        AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
    };

    use super::*;

    /// Decodes a hand-built [`PbTable`] and checks every resulting field,
    /// including the auto-appended hidden `_rw_timestamp` column; finally
    /// checks that encoding and re-decoding round-trips.
    #[test]
    fn test_into_table_catalog() {
        let table: TableCatalog = PbTable {
            id: 0,
            schema_id: 0,
            database_id: 0,
            name: "test".to_owned(),
            table_type: PbTableType::Table as i32,
            columns: vec![
                ColumnCatalog::row_id_column().to_protobuf(),
                PbColumnCatalog {
                    column_desc: Some(PbColumnDesc::new(
                        DataType::from(StructType::new([
                            ("address", DataType::Varchar),
                            ("zipcode", DataType::Varchar),
                        ]))
                        .to_protobuf(),
                        "country",
                        1,
                    )),
                    is_hidden: false,
                },
            ],
            pk: vec![ColumnOrder::new(0, OrderType::ascending()).to_protobuf()],
            stream_key: vec![0],
            distribution_key: vec![0],
            optional_associated_source_id: OptionalAssociatedSourceId::AssociatedSourceId(233)
                .into(),
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0,
            dml_fragment_id: None,
            initialized_at_epoch: None,
            value_indices: vec![0],
            definition: "".into(),
            read_prefix_len_hint: 0,
            vnode_col_index: None,
            row_id_index: None,
            version: Some(PbTableVersion {
                version: 0,
                next_column_id: 2,
            }),
            watermark_indices: vec![],
            handle_pk_conflict_behavior: 3,
            dist_key_in_pk: vec![0],
            cardinality: None,
            created_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: Some("description".to_owned()),
            #[expect(deprecated)]
            incoming_sinks: vec![],
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_indices: Vec::new(),
            cdc_table_id: None,
            maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            clean_watermark_index_in_pk: None,
            refreshable: false,
            vector_index_info: None,
            cdc_table_type: None,
        }
        .into();

        assert_eq!(
            table,
            TableCatalog {
                id: TableId::new(0),
                schema_id: 0,
                database_id: 0,
                associated_source_id: Some(TableId::new(233)),
                name: "test".to_owned(),
                table_type: TableType::Table,
                columns: vec![
                    ColumnCatalog::row_id_column(),
                    ColumnCatalog {
                        column_desc: ColumnDesc {
                            data_type: StructType::new(vec![
                                ("address", DataType::Varchar),
                                ("zipcode", DataType::Varchar)
                            ],)
                            .into(),
                            column_id: ColumnId::new(1),
                            name: "country".to_owned(),
                            description: None,
                            generated_or_default_column: None,
                            additional_column: AdditionalColumn { column_type: None },
                            version: ColumnDescVersion::LATEST,
                            system_column: None,
                            nullable: true,
                        },
                        is_hidden: false
                    },
                    // Appended automatically by `From<PbTable>`.
                    ColumnCatalog::rw_timestamp_column(),
                ],
                stream_key: vec![0],
                pk: vec![ColumnOrder::new(0, OrderType::ascending())],
                distribution_key: vec![0],
                append_only: false,
                owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
                retention_seconds: Some(300),
                fragment_id: 0,
                dml_fragment_id: None,
                vnode_col_index: None,
                row_id_index: None,
                value_indices: vec![0],
                definition: "".into(),
                conflict_behavior: ConflictBehavior::NoCheck,
                read_prefix_len_hint: 0,
                version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))),
                watermark_columns: FixedBitSet::with_capacity(3),
                dist_key_in_pk: vec![0],
                cardinality: Cardinality::unknown(),
                created_at_epoch: None,
                initialized_at_epoch: None,
                cleaned_by_watermark: false,
                stream_job_status: StreamJobStatus::Created,
                create_type: CreateType::Foreground,
                description: Some("description".to_owned()),
                created_at_cluster_version: None,
                initialized_at_cluster_version: None,
                version_column_indices: Vec::new(),
                cdc_table_id: None,
                vnode_count: VnodeCount::set(233),
                webhook_info: None,
                job_id: None,
                engine: Engine::Hummock,
                clean_watermark_index_in_pk: None,
                refreshable: false,
                vector_index_info: None,
                cdc_table_type: None,
            }
        );
        // Round-trip: decoding the re-encoded catalog must yield the same value.
        assert_eq!(table, TableCatalog::from(table.to_prost()));
    }
}