1use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17
18use fixedbitset::FixedBitSet;
19use itertools::Itertools;
20use risingwave_common::catalog::{
21 ColumnCatalog, ConflictBehavior, CreateType, Engine, Field, Schema, StreamJobStatus, TableDesc,
22 TableId, TableVersionId,
23};
24use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
25use risingwave_common::util::epoch::Epoch;
26use risingwave_common::util::sort_util::ColumnOrder;
27use risingwave_pb::catalog::table::{
28 OptionalAssociatedSourceId, PbEngine, PbTableType, PbTableVersion,
29};
30use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable, PbWebhookSourceInfo};
31use risingwave_pb::plan_common::DefaultColumnDesc;
32use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
33use risingwave_sqlparser::ast;
34use risingwave_sqlparser::parser::Parser;
35use thiserror_ext::AsReport as _;
36
37use super::purify::try_purify_table_source_create_sql_ast;
38use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId, SinkId};
39use crate::error::{ErrorCode, Result, RwError};
40use crate::expr::ExprImpl;
41use crate::optimizer::property::Cardinality;
42use crate::session::current::notice_to_user;
43use crate::user::UserId;
44
45#[derive(Clone, Debug, PartialEq, Eq, Hash)]
77#[cfg_attr(test, derive(Default))]
78pub struct TableCatalog {
79 pub id: TableId,
80
81 pub schema_id: SchemaId,
82
83 pub database_id: DatabaseId,
84
85 pub associated_source_id: Option<TableId>, pub name: String,
88
89 pub dependent_relations: Vec<TableId>,
90
91 pub columns: Vec<ColumnCatalog>,
93
94 pub pk: Vec<ColumnOrder>,
96
97 pub stream_key: Vec<usize>,
99
100 pub table_type: TableType,
103
104 pub distribution_key: Vec<usize>,
106
107 pub append_only: bool,
110
111 pub cardinality: Cardinality,
113
114 pub owner: UserId,
116
117 pub retention_seconds: Option<u32>,
119
120 pub fragment_id: FragmentId,
122
123 pub dml_fragment_id: Option<FragmentId>,
125
126 pub vnode_col_index: Option<usize>,
129
130 pub row_id_index: Option<usize>,
133
134 pub value_indices: Vec<usize>,
136
137 pub definition: String,
139
140 pub conflict_behavior: ConflictBehavior,
144
145 pub version_column_index: Option<usize>,
146
147 pub read_prefix_len_hint: usize,
148
149 pub version: Option<TableVersion>,
151
152 pub watermark_columns: FixedBitSet,
154
155 pub dist_key_in_pk: Vec<usize>,
158
159 pub created_at_epoch: Option<Epoch>,
160
161 pub initialized_at_epoch: Option<Epoch>,
162
163 pub cleaned_by_watermark: bool,
165
166 pub create_type: CreateType,
168
169 pub stream_job_status: StreamJobStatus,
172
173 pub description: Option<String>,
175
176 pub incoming_sinks: Vec<SinkId>,
178
179 pub created_at_cluster_version: Option<String>,
180
181 pub initialized_at_cluster_version: Option<String>,
182
183 pub cdc_table_id: Option<String>,
184
185 pub vnode_count: VnodeCount,
194
195 pub webhook_info: Option<PbWebhookSourceInfo>,
196
197 pub job_id: Option<TableId>,
198
199 pub engine: Engine,
200
201 pub clean_watermark_index_in_pk: Option<usize>,
202}
203
/// Prefix prepended to a table's name by `TableCatalog::iceberg_source_name`.
pub const ICEBERG_SOURCE_PREFIX: &'static str = "__iceberg_source_";
/// Prefix prepended to a table's name by `TableCatalog::iceberg_sink_name`.
pub const ICEBERG_SINK_PREFIX: &'static str = "__iceberg_sink_";
206
/// Kind of relation a `TableCatalog` describes.
///
/// In test builds the type also implements `Default`, yielding [`TableType::Table`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub enum TableType {
    /// A table; dropped with `DROP TABLE`.
    #[cfg_attr(test, default)]
    Table,
    /// A materialized view; dropped with `DROP MATERIALIZED VIEW`.
    MaterializedView,
    /// An index; dropped with `DROP INDEX`.
    Index,
    /// An internal table; cannot be dropped directly.
    Internal,
}
226
227impl TableType {
228 fn from_prost(prost: PbTableType) -> Self {
229 match prost {
230 PbTableType::Table => Self::Table,
231 PbTableType::MaterializedView => Self::MaterializedView,
232 PbTableType::Index => Self::Index,
233 PbTableType::Internal => Self::Internal,
234 PbTableType::Unspecified => unreachable!(),
235 }
236 }
237
238 pub(crate) fn to_prost(self) -> PbTableType {
239 match self {
240 Self::Table => PbTableType::Table,
241 Self::MaterializedView => PbTableType::MaterializedView,
242 Self::Index => PbTableType::Index,
243 Self::Internal => PbTableType::Internal,
244 }
245 }
246}
247
248#[derive(Clone, Debug, PartialEq, Eq, Hash)]
250pub struct TableVersion {
251 pub version_id: TableVersionId,
252 pub next_column_id: ColumnId,
253}
254
255impl TableVersion {
256 #[cfg(test)]
258 pub fn new_initial_for_test(max_column_id: ColumnId) -> Self {
259 use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;
260
261 Self {
262 version_id: INITIAL_TABLE_VERSION_ID,
263 next_column_id: max_column_id.next(),
264 }
265 }
266
267 pub fn from_prost(prost: PbTableVersion) -> Self {
268 Self {
269 version_id: prost.version,
270 next_column_id: ColumnId::from(prost.next_column_id),
271 }
272 }
273
274 pub fn to_prost(&self) -> PbTableVersion {
275 PbTableVersion {
276 version: self.version_id,
277 next_column_id: self.next_column_id.into(),
278 }
279 }
280}
281
282impl TableCatalog {
283 pub fn create_sql_purified(&self) -> String {
288 self.create_sql_ast_purified()
289 .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
290 .unwrap_or_else(|_| self.create_sql())
291 }
292
293 pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
298 if let TableType::Table = self.table_type() {
300 let base = if self.definition.is_empty() {
301 let name = ast::ObjectName(vec![self.name.as_str().into()]);
303 ast::Statement::default_create_table(name)
304 } else {
305 self.create_sql_ast_from_persisted()?
306 };
307
308 match try_purify_table_source_create_sql_ast(
309 base,
310 self.columns(),
311 self.row_id_index,
312 &self.pk_column_ids(),
313 ) {
314 Ok(stmt) => return Ok(stmt),
315 Err(e) => notice_to_user(format!(
316 "error occurred while purifying definition for table \"{}\", \
317 results may be inaccurate: {}",
318 self.name,
319 e.as_report()
320 )),
321 }
322 }
323
324 self.create_sql_ast_from_persisted()
325 }
326}
327
328impl TableCatalog {
329 pub fn id(&self) -> TableId {
331 self.id
332 }
333
334 pub fn with_id(mut self, id: TableId) -> Self {
335 self.id = id;
336 self
337 }
338
339 pub fn with_cleaned_by_watermark(mut self, cleaned_by_watermark: bool) -> Self {
340 self.cleaned_by_watermark = cleaned_by_watermark;
341 self
342 }
343
344 pub fn conflict_behavior(&self) -> ConflictBehavior {
345 self.conflict_behavior
346 }
347
348 pub fn table_type(&self) -> TableType {
349 self.table_type
350 }
351
352 pub fn engine(&self) -> Engine {
353 self.engine
354 }
355
356 pub fn iceberg_source_name(&self) -> Option<String> {
357 match self.engine {
358 Engine::Iceberg => Some(format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name)),
359 Engine::Hummock => None,
360 }
361 }
362
363 pub fn iceberg_sink_name(&self) -> Option<String> {
364 match self.engine {
365 Engine::Iceberg => Some(format!("{}{}", ICEBERG_SINK_PREFIX, self.name)),
366 Engine::Hummock => None,
367 }
368 }
369
370 pub fn is_user_table(&self) -> bool {
371 self.table_type == TableType::Table
372 }
373
374 pub fn is_internal_table(&self) -> bool {
375 self.table_type == TableType::Internal
376 }
377
378 pub fn is_mview(&self) -> bool {
379 self.table_type == TableType::MaterializedView
380 }
381
382 pub fn is_index(&self) -> bool {
383 self.table_type == TableType::Index
384 }
385
386 #[must_use]
388 pub fn bad_drop_error(&self) -> RwError {
389 let msg = match self.table_type {
390 TableType::MaterializedView => {
391 "Use `DROP MATERIALIZED VIEW` to drop a materialized view."
392 }
393 TableType::Index => "Use `DROP INDEX` to drop an index.",
394 TableType::Table => "Use `DROP TABLE` to drop a table.",
395 TableType::Internal => "Internal tables cannot be dropped.",
396 };
397
398 ErrorCode::InvalidInputSyntax(msg.to_owned()).into()
399 }
400
401 #[must_use]
403 pub fn associated_source_id(&self) -> Option<TableId> {
404 self.associated_source_id
405 }
406
407 pub fn has_associated_source(&self) -> bool {
408 self.associated_source_id.is_some()
409 }
410
411 pub fn columns(&self) -> &[ColumnCatalog] {
413 &self.columns
414 }
415
416 pub fn columns_without_rw_timestamp(&self) -> Vec<ColumnCatalog> {
417 self.columns
418 .iter()
419 .filter(|c| !c.is_rw_timestamp_column())
420 .cloned()
421 .collect()
422 }
423
424 pub fn pk(&self) -> &[ColumnOrder] {
426 self.pk.as_ref()
427 }
428
429 pub fn pk_column_ids(&self) -> Vec<ColumnId> {
431 self.pk
432 .iter()
433 .map(|x| self.columns[x.column_index].column_id())
434 .collect()
435 }
436
437 pub fn pk_column_names(&self) -> Vec<&str> {
439 self.pk
440 .iter()
441 .map(|x| self.columns[x.column_index].name())
442 .collect()
443 }
444
445 pub fn table_desc(&self) -> TableDesc {
450 use risingwave_common::catalog::TableOption;
451
452 let table_options = TableOption::new(self.retention_seconds);
453
454 TableDesc {
455 table_id: self.id,
456 pk: self.pk.clone(),
457 stream_key: self.stream_key.clone(),
458 columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
459 distribution_key: self.distribution_key.clone(),
460 append_only: self.append_only,
461 retention_seconds: table_options.retention_seconds,
462 value_indices: self.value_indices.clone(),
463 read_prefix_len_hint: self.read_prefix_len_hint,
464 watermark_columns: self.watermark_columns.clone(),
465 versioned: self.version.is_some(),
466 vnode_col_index: self.vnode_col_index,
467 vnode_count: self.vnode_count(),
468 }
469 }
470
471 pub fn name(&self) -> &str {
473 self.name.as_ref()
474 }
475
476 pub fn distribution_key(&self) -> &[usize] {
477 self.distribution_key.as_ref()
478 }
479
480 pub fn to_internal_table_prost(&self) -> PbTable {
481 self.to_prost()
482 }
483
484 pub fn create_sql(&self) -> String {
488 self.create_sql_ast()
489 .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
490 .unwrap_or_else(|_| self.definition.clone())
491 }
492
493 pub fn create_sql_ast(&self) -> Result<ast::Statement> {
501 if let TableType::Table = self.table_type()
502 && self.definition.is_empty()
503 {
504 self.create_sql_ast_purified()
506 } else {
507 self.create_sql_ast_from_persisted()
509 }
510 }
511
512 fn create_sql_ast_from_persisted(&self) -> Result<ast::Statement> {
513 Ok(Parser::parse_exactly_one(&self.definition)?)
514 }
515
516 pub fn version(&self) -> Option<&TableVersion> {
518 self.version.as_ref()
519 }
520
521 pub fn version_id(&self) -> Option<TableVersionId> {
523 self.version().map(|v| v.version_id)
524 }
525
526 pub fn vnode_count(&self) -> usize {
530 self.vnode_count.value()
531 }
532
533 pub fn to_prost(&self) -> PbTable {
534 PbTable {
535 id: self.id.table_id,
536 schema_id: self.schema_id,
537 database_id: self.database_id,
538 name: self.name.clone(),
539 columns: self
541 .columns_without_rw_timestamp()
542 .iter()
543 .map(|c| c.to_protobuf())
544 .collect(),
545 pk: self.pk.iter().map(|o| o.to_protobuf()).collect(),
546 stream_key: self.stream_key.iter().map(|x| *x as _).collect(),
547 dependent_relations: (self.dependent_relations.iter())
548 .map(|t| t.table_id)
549 .collect(),
550 optional_associated_source_id: self
551 .associated_source_id
552 .map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())),
553 table_type: self.table_type.to_prost() as i32,
554 distribution_key: self
555 .distribution_key
556 .iter()
557 .map(|k| *k as i32)
558 .collect_vec(),
559 append_only: self.append_only,
560 owner: self.owner,
561 fragment_id: self.fragment_id,
562 dml_fragment_id: self.dml_fragment_id,
563 vnode_col_index: self.vnode_col_index.map(|i| i as _),
564 row_id_index: self.row_id_index.map(|i| i as _),
565 value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
566 definition: self.definition.clone(),
567 read_prefix_len_hint: self.read_prefix_len_hint as u32,
568 version: self.version.as_ref().map(TableVersion::to_prost),
569 watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
570 dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
571 handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
572 version_column_index: self.version_column_index.map(|value| value as u32),
573 cardinality: Some(self.cardinality.to_protobuf()),
574 initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
575 created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
576 cleaned_by_watermark: self.cleaned_by_watermark,
577 stream_job_status: self.stream_job_status.to_proto().into(),
578 create_type: self.create_type.to_proto().into(),
579 description: self.description.clone(),
580 incoming_sinks: self.incoming_sinks.clone(),
581 created_at_cluster_version: self.created_at_cluster_version.clone(),
582 initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
583 retention_seconds: self.retention_seconds,
584 cdc_table_id: self.cdc_table_id.clone(),
585 maybe_vnode_count: self.vnode_count.to_protobuf(),
586 webhook_info: self.webhook_info.clone(),
587 job_id: self.job_id.map(|id| id.table_id),
588 engine: Some(self.engine.to_protobuf().into()),
589 clean_watermark_index_in_pk: self.clean_watermark_index_in_pk.map(|x| x as i32),
590 }
591 }
592
593 pub fn columns_to_insert(&self) -> impl Iterator<Item = &ColumnCatalog> {
595 self.columns
596 .iter()
597 .filter(|c| !c.is_hidden() && !c.is_generated())
598 }
599
600 pub fn generated_column_names(&self) -> impl Iterator<Item = &str> {
601 self.columns
602 .iter()
603 .filter(|c| c.is_generated())
604 .map(|c| c.name())
605 }
606
607 pub fn generated_col_idxes(&self) -> impl Iterator<Item = usize> + '_ {
608 self.columns
609 .iter()
610 .enumerate()
611 .filter(|(_, c)| c.is_generated())
612 .map(|(i, _)| i)
613 }
614
615 pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
616 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) = self
617 .columns[col_idx]
618 .column_desc
619 .generated_or_default_column
620 .as_ref()
621 {
622 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
623 .expect("expr in default columns corrupted")
624 } else {
625 ExprImpl::literal_null(self.columns[col_idx].data_type().clone())
626 }
627 }
628
629 pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
630 columns
631 .iter()
632 .map(|c| {
633 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
634 expr,
635 ..
636 })) = c.column_desc.generated_or_default_column.as_ref()
637 {
638 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
639 .expect("expr in default columns corrupted")
640 } else {
641 ExprImpl::literal_null(c.data_type().clone())
642 }
643 })
644 .collect()
645 }
646
647 pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
648 self.columns.iter().enumerate().filter_map(|(i, c)| {
649 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
650 expr, ..
651 })) = c.column_desc.generated_or_default_column.as_ref()
652 {
653 Some((
654 i,
655 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
656 .expect("expr in default columns corrupted"),
657 ))
658 } else {
659 None
660 }
661 })
662 }
663
664 pub fn has_generated_column(&self) -> bool {
665 self.columns.iter().any(|c| c.is_generated())
666 }
667
668 pub fn has_rw_timestamp_column(&self) -> bool {
669 self.columns.iter().any(|c| c.is_rw_timestamp_column())
670 }
671
672 pub fn column_schema(&self) -> Schema {
673 Schema::new(
674 self.columns
675 .iter()
676 .map(|c| Field::from(&c.column_desc))
677 .collect(),
678 )
679 }
680
681 pub fn is_created(&self) -> bool {
682 self.stream_job_status == StreamJobStatus::Created
683 }
684
685 pub fn is_iceberg_engine_table(&self) -> bool {
686 self.engine == Engine::Iceberg
687 }
688}
689
690impl From<PbTable> for TableCatalog {
691 fn from(tb: PbTable) -> Self {
692 let id = tb.id;
693 let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
694 let tb_engine = tb
695 .get_engine()
696 .map(|engine| PbEngine::try_from(*engine).expect("Invalid engine"))
697 .unwrap_or(PbEngine::Hummock);
698 let table_type = tb.get_table_type().unwrap();
699 let stream_job_status = tb
700 .get_stream_job_status()
701 .unwrap_or(PbStreamJobStatus::Created);
702 let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground);
703 let associated_source_id = tb.optional_associated_source_id.map(|id| match id {
704 OptionalAssociatedSourceId::AssociatedSourceId(id) => id,
705 });
706 let name = tb.name.clone();
707
708 let vnode_count = tb.vnode_count_inner();
709 if let VnodeCount::Placeholder = vnode_count {
710 assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
713 }
714
715 let mut col_names = HashSet::new();
716 let mut col_index: HashMap<i32, usize> = HashMap::new();
717
718 let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
719 let version_column_index = tb.version_column_index.map(|value| value as usize);
720 let mut columns: Vec<ColumnCatalog> =
721 tb.columns.into_iter().map(ColumnCatalog::from).collect();
722 if columns.iter().all(|c| !c.is_rw_timestamp_column()) {
723 columns.push(ColumnCatalog::rw_timestamp_column());
725 }
726 for (idx, catalog) in columns.clone().into_iter().enumerate() {
727 let col_name = catalog.name();
728 if !col_names.insert(col_name.to_owned()) {
729 panic!("duplicated column name {} in table {} ", col_name, tb.name)
730 }
731
732 let col_id = catalog.column_desc.column_id.get_id();
733 col_index.insert(col_id, idx);
734 }
735
736 let pk = tb.pk.iter().map(ColumnOrder::from_protobuf).collect();
737 let mut watermark_columns = FixedBitSet::with_capacity(columns.len());
738 for idx in &tb.watermark_indices {
739 watermark_columns.insert(*idx as _);
740 }
741 let engine = Engine::from_protobuf(&tb_engine);
742
743 Self {
744 id: id.into(),
745 schema_id: tb.schema_id,
746 database_id: tb.database_id,
747 associated_source_id: associated_source_id.map(Into::into),
748 name,
749 pk,
750 columns,
751 table_type: TableType::from_prost(table_type),
752 distribution_key: tb
753 .distribution_key
754 .iter()
755 .map(|k| *k as usize)
756 .collect_vec(),
757 stream_key: tb.stream_key.iter().map(|x| *x as _).collect(),
758 append_only: tb.append_only,
759 owner: tb.owner,
760 fragment_id: tb.fragment_id,
761 dml_fragment_id: tb.dml_fragment_id,
762 vnode_col_index: tb.vnode_col_index.map(|x| x as usize),
763 row_id_index: tb.row_id_index.map(|x| x as usize),
764 value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
765 definition: tb.definition,
766 conflict_behavior,
767 version_column_index,
768 read_prefix_len_hint: tb.read_prefix_len_hint as usize,
769 version: tb.version.map(TableVersion::from_prost),
770 watermark_columns,
771 dist_key_in_pk: tb.dist_key_in_pk.iter().map(|x| *x as _).collect(),
772 cardinality: tb
773 .cardinality
774 .map(|c| Cardinality::from_protobuf(&c))
775 .unwrap_or_else(Cardinality::unknown),
776 created_at_epoch: tb.created_at_epoch.map(Epoch::from),
777 initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
778 cleaned_by_watermark: tb.cleaned_by_watermark,
779 create_type: CreateType::from_proto(create_type),
780 stream_job_status: StreamJobStatus::from_proto(stream_job_status),
781 description: tb.description,
782 incoming_sinks: tb.incoming_sinks.clone(),
783 created_at_cluster_version: tb.created_at_cluster_version.clone(),
784 initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
785 retention_seconds: tb.retention_seconds,
786 dependent_relations: tb
787 .dependent_relations
788 .into_iter()
789 .map(TableId::from)
790 .collect_vec(),
791 cdc_table_id: tb.cdc_table_id,
792 vnode_count,
793 webhook_info: tb.webhook_info,
794 job_id: tb.job_id.map(TableId::from),
795 engine,
796 clean_watermark_index_in_pk: tb.clean_watermark_index_in_pk.map(|x| x as usize),
797 }
798 }
799}
800
801impl From<&PbTable> for TableCatalog {
802 fn from(tb: &PbTable) -> Self {
803 tb.clone().into()
804 }
805}
806
807impl OwnedByUserCatalog for TableCatalog {
808 fn owner(&self) -> UserId {
809 self.owner
810 }
811}
812
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::test_prelude::*;
    use risingwave_common::types::*;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_pb::catalog::table::PbEngine;
    use risingwave_pb::plan_common::{
        AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
    };

    use super::*;

    /// Converts a hand-built `PbTable` into a `TableCatalog`, compares it field by field with
    /// the expected catalog, and checks that `to_prost` followed by `from` round-trips.
    #[test]
    fn test_into_table_catalog() {
        // Protobuf input: a row-id column plus one struct-typed column named "country".
        let pb_table = PbTable {
            id: 0,
            schema_id: 0,
            database_id: 0,
            name: "test".to_owned(),
            table_type: PbTableType::Table as i32,
            columns: vec![
                ColumnCatalog::row_id_column().to_protobuf(),
                PbColumnCatalog {
                    column_desc: Some(PbColumnDesc::new(
                        DataType::from(StructType::new([
                            ("address", DataType::Varchar),
                            ("zipcode", DataType::Varchar),
                        ]))
                        .to_protobuf(),
                        "country",
                        1,
                    )),
                    is_hidden: false,
                },
            ],
            pk: vec![ColumnOrder::new(0, OrderType::ascending()).to_protobuf()],
            stream_key: vec![0],
            dependent_relations: vec![114514],
            distribution_key: vec![0],
            optional_associated_source_id: OptionalAssociatedSourceId::AssociatedSourceId(233)
                .into(),
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0,
            dml_fragment_id: None,
            initialized_at_epoch: None,
            value_indices: vec![0],
            definition: "".into(),
            read_prefix_len_hint: 0,
            vnode_col_index: None,
            row_id_index: None,
            version: Some(PbTableVersion {
                version: 0,
                next_column_id: 2,
            }),
            watermark_indices: vec![],
            // Expected to decode to `ConflictBehavior::NoCheck` (asserted below).
            handle_pk_conflict_behavior: 3,
            dist_key_in_pk: vec![0],
            cardinality: None,
            created_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: Some("description".to_owned()),
            incoming_sinks: vec![],
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_index: None,
            cdc_table_id: None,
            maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            clean_watermark_index_in_pk: None,
        };

        let table: TableCatalog = pb_table.into();

        // Expected catalog: same data, plus the rw-timestamp column appended by the conversion.
        let expected = TableCatalog {
            id: TableId::new(0),
            schema_id: 0,
            database_id: 0,
            associated_source_id: Some(TableId::new(233)),
            name: "test".to_owned(),
            table_type: TableType::Table,
            columns: vec![
                ColumnCatalog::row_id_column(),
                ColumnCatalog {
                    column_desc: ColumnDesc {
                        data_type: StructType::new(vec![
                            ("address", DataType::Varchar),
                            ("zipcode", DataType::Varchar),
                        ])
                        .into(),
                        column_id: ColumnId::new(1),
                        name: "country".to_owned(),
                        description: None,
                        generated_or_default_column: None,
                        additional_column: AdditionalColumn { column_type: None },
                        version: ColumnDescVersion::LATEST,
                        system_column: None,
                        nullable: true,
                    },
                    is_hidden: false,
                },
                ColumnCatalog::rw_timestamp_column(),
            ],
            stream_key: vec![0],
            pk: vec![ColumnOrder::new(0, OrderType::ascending())],
            distribution_key: vec![0],
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0,
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index: None,
            value_indices: vec![0],
            definition: "".into(),
            conflict_behavior: ConflictBehavior::NoCheck,
            read_prefix_len_hint: 0,
            version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))),
            // Three columns after the conversion, no watermark bit set.
            watermark_columns: FixedBitSet::with_capacity(3),
            dist_key_in_pk: vec![0],
            cardinality: Cardinality::unknown(),
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: StreamJobStatus::Created,
            create_type: CreateType::Foreground,
            description: Some("description".to_owned()),
            incoming_sinks: vec![],
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            dependent_relations: vec![114514.into()],
            version_column_index: None,
            cdc_table_id: None,
            vnode_count: VnodeCount::set(233),
            webhook_info: None,
            job_id: None,
            engine: Engine::Hummock,
            clean_watermark_index_in_pk: None,
        };

        assert_eq!(table, expected);

        // Converting to protobuf and back must yield an equal catalog.
        let round_tripped = TableCatalog::from(table.to_prost());
        assert_eq!(table, round_tripped);
    }
}
962}