1use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17
18use anyhow::Context as _;
19use fixedbitset::FixedBitSet;
20use itertools::Itertools;
21use risingwave_common::catalog::{
22 ColumnCatalog, ConflictBehavior, CreateType, Engine, Field, Schema, StreamJobStatus, TableDesc,
23 TableId, TableVersionId,
24};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::sort_util::ColumnOrder;
28use risingwave_pb::catalog::table::{
29 OptionalAssociatedSourceId, PbEngine, PbTableType, PbTableVersion,
30};
31use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable, PbWebhookSourceInfo};
32use risingwave_pb::plan_common::DefaultColumnDesc;
33use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
34use risingwave_sqlparser::ast;
35use risingwave_sqlparser::parser::Parser;
36use thiserror_ext::AsReport as _;
37
38use super::purify::try_purify_table_source_create_sql_ast;
39use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId, SinkId};
40use crate::error::{ErrorCode, Result, RwError};
41use crate::expr::ExprImpl;
42use crate::optimizer::property::Cardinality;
43use crate::session::current::notice_to_user;
44use crate::user::UserId;
45
/// Catalog entry for a table-like relation (user table, materialized view,
/// index, or internal state table), convertible to and from [`PbTable`].
///
/// NOTE(review): field semantics below are inferred from how the fields are
/// mapped in `to_prost` / `From<PbTable>` in this file; confirm against the
/// proto definitions where unsure.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Default))]
pub struct TableCatalog {
    /// Unique id of the table.
    pub id: TableId,

    /// Id of the schema the table belongs to.
    pub schema_id: SchemaId,

    /// Id of the database the table belongs to.
    pub database_id: DatabaseId,

    /// Id of the source this table is associated with, if any.
    pub associated_source_id: Option<TableId>,

    /// Name of the table.
    pub name: String,

    /// Ids of the relations this table depends on.
    pub dependent_relations: Vec<TableId>,

    /// All columns of the table, including hidden ones
    /// (e.g. the `_rw_timestamp` column, see `has_rw_timestamp_column`).
    pub columns: Vec<ColumnCatalog>,

    /// Primary key, as (column index, order type) pairs.
    pub pk: Vec<ColumnOrder>,

    /// Indices of the stream key columns.
    pub stream_key: Vec<usize>,

    /// Kind of the relation: table, materialized view, index, or internal.
    pub table_type: TableType,

    /// Column indices that determine the distribution of a row.
    pub distribution_key: Vec<usize>,

    /// Whether the table is append-only.
    pub append_only: bool,

    /// Optimizer cardinality estimate for this table.
    pub cardinality: Cardinality,

    /// Id of the user that owns the table.
    pub owner: UserId,

    /// Data retention duration in seconds, if configured.
    pub retention_seconds: Option<u32>,

    /// Id of the fragment that materializes this table.
    pub fragment_id: FragmentId,

    /// Id of the DML fragment, if the table accepts DML.
    pub dml_fragment_id: Option<FragmentId>,

    /// Index of the hidden vnode column, if present.
    pub vnode_col_index: Option<usize>,

    /// Index of the hidden row-id column, if the table has no user-defined pk.
    pub row_id_index: Option<usize>,

    /// Indices of the columns stored as the value part in the state store.
    pub value_indices: Vec<usize>,

    /// The persisted `CREATE` SQL definition. May be empty, in which case the
    /// statement is reconstructed by purification (see `create_sql_ast`).
    pub definition: String,

    /// How primary-key conflicts are handled on write.
    pub conflict_behavior: ConflictBehavior,

    /// Index of the version column used for conflict resolution, if any.
    pub version_column_index: Option<usize>,

    /// Hint for how many leading pk columns form the read prefix.
    pub read_prefix_len_hint: usize,

    /// Schema version of the table; `None` when the table is not versioned.
    pub version: Option<TableVersion>,

    /// Bitset marking which columns carry watermarks.
    pub watermark_columns: FixedBitSet,

    /// Positions of the distribution key columns within the pk.
    pub dist_key_in_pk: Vec<usize>,

    /// Epoch at which the table was created, if known.
    pub created_at_epoch: Option<Epoch>,

    /// Epoch at which the table was initialized, if known.
    pub initialized_at_epoch: Option<Epoch>,

    /// Whether state is cleaned by watermark.
    pub cleaned_by_watermark: bool,

    /// Whether the creating job runs in the foreground or background.
    pub create_type: CreateType,

    /// Status of the streaming job that creates this table.
    pub stream_job_status: StreamJobStatus,

    /// Optional `COMMENT` / description text.
    pub description: Option<String>,

    /// Ids of sinks whose output flows into this table.
    pub incoming_sinks: Vec<SinkId>,

    /// Cluster version at creation time, if recorded.
    pub created_at_cluster_version: Option<String>,

    /// Cluster version at initialization time, if recorded.
    pub initialized_at_cluster_version: Option<String>,

    /// Id of the upstream CDC table, if any.
    pub cdc_table_id: Option<String>,

    /// Vnode count of the table; may be a placeholder while the creating job
    /// is still in progress (see the assertion in `From<PbTable>`).
    pub vnode_count: VnodeCount,

    /// Webhook source info, for tables backed by a webhook source.
    pub webhook_info: Option<PbWebhookSourceInfo>,

    /// Id of the streaming job owning this table, if any.
    pub job_id: Option<TableId>,

    /// Storage engine backing the table (Hummock or Iceberg).
    pub engine: Engine,

    /// Index within the pk of the watermark column used for state cleaning.
    pub clean_watermark_index_in_pk: Option<usize>,
}
204
/// Name prefix of the hidden source paired with an Iceberg-engine table.
pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
/// Name prefix of the hidden sink paired with an Iceberg-engine table.
pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_";
207
/// The kind of a table-like relation in the catalog.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum TableType {
    /// A user-created table.
    Table,
    /// A materialized view.
    MaterializedView,
    /// An index on another relation.
    Index,
    /// An internal state table of a streaming job.
    Internal,
}

// `Default` is only needed so `TableCatalog` can derive `Default` in tests
// (see `#[cfg_attr(test, derive(Default))]` above).
#[cfg(test)]
impl Default for TableType {
    fn default() -> Self {
        Self::Table
    }
}
227
228impl TableType {
229 fn from_prost(prost: PbTableType) -> Self {
230 match prost {
231 PbTableType::Table => Self::Table,
232 PbTableType::MaterializedView => Self::MaterializedView,
233 PbTableType::Index => Self::Index,
234 PbTableType::Internal => Self::Internal,
235 PbTableType::Unspecified => unreachable!(),
236 }
237 }
238
239 pub(crate) fn to_prost(self) -> PbTableType {
240 match self {
241 Self::Table => PbTableType::Table,
242 Self::MaterializedView => PbTableType::MaterializedView,
243 Self::Index => PbTableType::Index,
244 Self::Internal => PbTableType::Internal,
245 }
246 }
247}
248
/// Schema version information of a table.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TableVersion {
    /// The current version id.
    pub version_id: TableVersionId,
    /// The column id to assign to the next newly-added column.
    pub next_column_id: ColumnId,
}
255
256impl TableVersion {
257 #[cfg(test)]
259 pub fn new_initial_for_test(max_column_id: ColumnId) -> Self {
260 use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;
261
262 Self {
263 version_id: INITIAL_TABLE_VERSION_ID,
264 next_column_id: max_column_id.next(),
265 }
266 }
267
268 pub fn from_prost(prost: PbTableVersion) -> Self {
269 Self {
270 version_id: prost.version,
271 next_column_id: ColumnId::from(prost.next_column_id),
272 }
273 }
274
275 pub fn to_prost(&self) -> PbTableVersion {
276 PbTableVersion {
277 version: self.version_id,
278 next_column_id: self.next_column_id.into(),
279 }
280 }
281}
282
impl TableCatalog {
    /// Returns the purified `CREATE TABLE` definition as a string, falling
    /// back to [`Self::create_sql`] if purification fails.
    pub fn create_sql_purified(&self) -> String {
        self.create_sql_ast_purified()
            .map(|stmt| stmt.to_string())
            .unwrap_or_else(|_| self.create_sql())
    }

    /// Returns the purified `CREATE TABLE` definition as an AST: the
    /// persisted statement (or a default `CREATE TABLE` if the definition is
    /// empty) with details backfilled from the catalog by
    /// `try_purify_table_source_create_sql_ast`.
    ///
    /// Only user tables are purified; other table types fall through to the
    /// persisted definition. On purification failure a notice is raised to
    /// the user and the persisted definition is returned instead.
    pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
        if let TableType::Table = self.table_type() {
            let base = if self.definition.is_empty() {
                // No definition persisted; start from an empty statement
                // carrying only the table name.
                let name = ast::ObjectName(vec![self.name.as_str().into()]);
                ast::Statement::default_create_table(name)
            } else {
                self.create_sql_ast_from_persisted()?
            };

            match try_purify_table_source_create_sql_ast(
                base,
                self.columns(),
                self.row_id_index,
                &self.pk_column_ids(),
            ) {
                Ok(stmt) => return Ok(stmt),
                // Purification is best-effort: warn and fall through to the
                // persisted definition instead of failing the whole call.
                Err(e) => notice_to_user(format!(
                    "error occurred while purifying definition for table \"{}\", \
                     results may be inaccurate: {}",
                    self.name,
                    e.as_report()
                )),
            }
        }

        self.create_sql_ast_from_persisted()
    }
}
328
impl TableCatalog {
    /// Returns the id of the table.
    pub fn id(&self) -> TableId {
        self.id
    }

    /// Builder-style setter for the table id.
    pub fn with_id(mut self, id: TableId) -> Self {
        self.id = id;
        self
    }

    /// Builder-style setter for the `cleaned_by_watermark` flag.
    pub fn with_cleaned_by_watermark(mut self, cleaned_by_watermark: bool) -> Self {
        self.cleaned_by_watermark = cleaned_by_watermark;
        self
    }

    /// Returns how primary-key conflicts are handled on write.
    pub fn conflict_behavior(&self) -> ConflictBehavior {
        self.conflict_behavior
    }

    /// Returns the kind of the table.
    pub fn table_type(&self) -> TableType {
        self.table_type
    }

    /// Returns the storage engine backing the table.
    pub fn engine(&self) -> Engine {
        self.engine
    }
356
357 pub fn iceberg_source_name(&self) -> Option<String> {
358 match self.engine {
359 Engine::Iceberg => Some(format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name)),
360 Engine::Hummock => None,
361 }
362 }
363
364 pub fn iceberg_sink_name(&self) -> Option<String> {
365 match self.engine {
366 Engine::Iceberg => Some(format!("{}{}", ICEBERG_SINK_PREFIX, self.name)),
367 Engine::Hummock => None,
368 }
369 }
370
371 pub fn is_user_table(&self) -> bool {
372 self.table_type == TableType::Table
373 }
374
375 pub fn is_internal_table(&self) -> bool {
376 self.table_type == TableType::Internal
377 }
378
379 pub fn is_mview(&self) -> bool {
380 self.table_type == TableType::MaterializedView
381 }
382
383 pub fn is_index(&self) -> bool {
384 self.table_type == TableType::Index
385 }
386
387 #[must_use]
389 pub fn bad_drop_error(&self) -> RwError {
390 let msg = match self.table_type {
391 TableType::MaterializedView => {
392 "Use `DROP MATERIALIZED VIEW` to drop a materialized view."
393 }
394 TableType::Index => "Use `DROP INDEX` to drop an index.",
395 TableType::Table => "Use `DROP TABLE` to drop a table.",
396 TableType::Internal => "Internal tables cannot be dropped.",
397 };
398
399 ErrorCode::InvalidInputSyntax(msg.to_owned()).into()
400 }
401
402 #[must_use]
404 pub fn associated_source_id(&self) -> Option<TableId> {
405 self.associated_source_id
406 }
407
408 pub fn has_associated_source(&self) -> bool {
409 self.associated_source_id.is_some()
410 }
411
412 pub fn columns(&self) -> &[ColumnCatalog] {
414 &self.columns
415 }
416
417 pub fn columns_without_rw_timestamp(&self) -> Vec<ColumnCatalog> {
418 self.columns
419 .iter()
420 .filter(|c| !c.is_rw_timestamp_column())
421 .cloned()
422 .collect()
423 }
424
425 pub fn pk(&self) -> &[ColumnOrder] {
427 self.pk.as_ref()
428 }
429
430 pub fn pk_column_ids(&self) -> Vec<ColumnId> {
432 self.pk
433 .iter()
434 .map(|x| self.columns[x.column_index].column_id())
435 .collect()
436 }
437
438 pub fn pk_column_names(&self) -> Vec<&str> {
440 self.pk
441 .iter()
442 .map(|x| self.columns[x.column_index].name())
443 .collect()
444 }
445
    /// Builds the [`TableDesc`] used to access this table at query time.
    pub fn table_desc(&self) -> TableDesc {
        use risingwave_common::catalog::TableOption;

        // Normalize retention through `TableOption` instead of using the raw
        // field directly.
        let table_options = TableOption::new(self.retention_seconds);

        TableDesc {
            table_id: self.id,
            pk: self.pk.clone(),
            stream_key: self.stream_key.clone(),
            columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
            distribution_key: self.distribution_key.clone(),
            append_only: self.append_only,
            retention_seconds: table_options.retention_seconds,
            value_indices: self.value_indices.clone(),
            read_prefix_len_hint: self.read_prefix_len_hint,
            watermark_columns: self.watermark_columns.clone(),
            versioned: self.version.is_some(),
            vnode_col_index: self.vnode_col_index,
            vnode_count: self.vnode_count(),
        }
    }
471
    /// Returns the name of the table.
    pub fn name(&self) -> &str {
        self.name.as_ref()
    }

    /// Returns the distribution key column indices.
    pub fn distribution_key(&self) -> &[usize] {
        self.distribution_key.as_ref()
    }

    /// Converts this catalog into its protobuf form, for internal tables.
    pub fn to_internal_table_prost(&self) -> PbTable {
        self.to_prost()
    }

    /// Returns the `CREATE` SQL definition as a string, falling back to the
    /// raw persisted definition if it cannot be parsed.
    pub fn create_sql(&self) -> String {
        self.create_sql_ast()
            .map(|stmt| stmt.to_string())
            .unwrap_or_else(|_| self.definition.clone())
    }

    /// Returns the `CREATE` SQL definition as an AST.
    ///
    /// User tables with an empty persisted definition are reconstructed by
    /// purification; everything else parses the persisted definition as-is.
    pub fn create_sql_ast(&self) -> Result<ast::Statement> {
        if let TableType::Table = self.table_type()
            && self.definition.is_empty()
        {
            self.create_sql_ast_purified()
        } else {
            self.create_sql_ast_from_persisted()
        }
    }

    /// Parses the persisted definition into exactly one SQL statement.
    fn create_sql_ast_from_persisted(&self) -> Result<ast::Statement> {
        Ok(Parser::parse_sql(&self.definition)
            .context("unable to parse definition sql")?
            .into_iter()
            .exactly_one()
            .context("expecting exactly one statement in definition")?)
    }

    /// Returns the schema version of the table, if it is versioned.
    pub fn version(&self) -> Option<&TableVersion> {
        self.version.as_ref()
    }

    /// Returns the schema version id of the table, if it is versioned.
    pub fn version_id(&self) -> Option<TableVersionId> {
        self.version().map(|v| v.version_id)
    }

    /// Returns the vnode count of the table.
    // NOTE(review): behavior when `vnode_count` is still a placeholder is
    // determined by `VnodeCount::value()`, which is not visible here —
    // confirm in `risingwave_common::hash`.
    pub fn vnode_count(&self) -> usize {
        self.vnode_count.value()
    }
537
    /// Converts this catalog into its protobuf representation.
    pub fn to_prost(&self) -> PbTable {
        PbTable {
            id: self.id.table_id,
            schema_id: self.schema_id,
            database_id: self.database_id,
            name: self.name.clone(),
            // The hidden `_rw_timestamp` column is not persisted (it is
            // re-appended on decode, see `From<PbTable>`).
            columns: self
                .columns_without_rw_timestamp()
                .iter()
                .map(|c| c.to_protobuf())
                .collect(),
            pk: self.pk.iter().map(|o| o.to_protobuf()).collect(),
            stream_key: self.stream_key.iter().map(|x| *x as _).collect(),
            // NOTE(review): always written empty here even though
            // `From<PbTable>` reads this field — confirm this asymmetry is
            // intended (dependencies tracked elsewhere?).
            dependent_relations: vec![],
            optional_associated_source_id: self
                .associated_source_id
                .map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())),
            table_type: self.table_type.to_prost() as i32,
            distribution_key: self
                .distribution_key
                .iter()
                .map(|k| *k as i32)
                .collect_vec(),
            append_only: self.append_only,
            owner: self.owner,
            fragment_id: self.fragment_id,
            dml_fragment_id: self.dml_fragment_id,
            vnode_col_index: self.vnode_col_index.map(|i| i as _),
            row_id_index: self.row_id_index.map(|i| i as _),
            value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
            definition: self.definition.clone(),
            read_prefix_len_hint: self.read_prefix_len_hint as u32,
            version: self.version.as_ref().map(TableVersion::to_prost),
            // The bitset is serialized as a list of set indices.
            watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
            dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
            handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
            version_column_index: self.version_column_index.map(|value| value as u32),
            cardinality: Some(self.cardinality.to_protobuf()),
            initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
            created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
            cleaned_by_watermark: self.cleaned_by_watermark,
            stream_job_status: self.stream_job_status.to_proto().into(),
            create_type: self.create_type.to_proto().into(),
            description: self.description.clone(),
            incoming_sinks: self.incoming_sinks.clone(),
            created_at_cluster_version: self.created_at_cluster_version.clone(),
            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
            retention_seconds: self.retention_seconds,
            cdc_table_id: self.cdc_table_id.clone(),
            maybe_vnode_count: self.vnode_count.to_protobuf(),
            webhook_info: self.webhook_info.clone(),
            job_id: self.job_id.map(|id| id.table_id),
            engine: Some(self.engine.to_protobuf().into()),
            clean_watermark_index_in_pk: self.clean_watermark_index_in_pk.map(|x| x as i32),
        }
    }
595
596 pub fn columns_to_insert(&self) -> impl Iterator<Item = &ColumnCatalog> {
598 self.columns
599 .iter()
600 .filter(|c| !c.is_hidden() && !c.is_generated())
601 }
602
603 pub fn generated_column_names(&self) -> impl Iterator<Item = &str> {
604 self.columns
605 .iter()
606 .filter(|c| c.is_generated())
607 .map(|c| c.name())
608 }
609
610 pub fn generated_col_idxes(&self) -> impl Iterator<Item = usize> + '_ {
611 self.columns
612 .iter()
613 .enumerate()
614 .filter(|(_, c)| c.is_generated())
615 .map(|(i, _)| i)
616 }
617
618 pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
619 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) = self
620 .columns[col_idx]
621 .column_desc
622 .generated_or_default_column
623 .as_ref()
624 {
625 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
626 .expect("expr in default columns corrupted")
627 } else {
628 ExprImpl::literal_null(self.columns[col_idx].data_type().clone())
629 }
630 }
631
632 pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
633 columns
634 .iter()
635 .map(|c| {
636 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
637 expr,
638 ..
639 })) = c.column_desc.generated_or_default_column.as_ref()
640 {
641 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
642 .expect("expr in default columns corrupted")
643 } else {
644 ExprImpl::literal_null(c.data_type().clone())
645 }
646 })
647 .collect()
648 }
649
650 pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
651 self.columns.iter().enumerate().filter_map(|(i, c)| {
652 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
653 expr, ..
654 })) = c.column_desc.generated_or_default_column.as_ref()
655 {
656 Some((
657 i,
658 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
659 .expect("expr in default columns corrupted"),
660 ))
661 } else {
662 None
663 }
664 })
665 }
666
667 pub fn has_generated_column(&self) -> bool {
668 self.columns.iter().any(|c| c.is_generated())
669 }
670
671 pub fn has_rw_timestamp_column(&self) -> bool {
672 self.columns.iter().any(|c| c.is_rw_timestamp_column())
673 }
674
675 pub fn column_schema(&self) -> Schema {
676 Schema::new(
677 self.columns
678 .iter()
679 .map(|c| Field::from(&c.column_desc))
680 .collect(),
681 )
682 }
683
684 pub fn is_created(&self) -> bool {
685 self.stream_job_status == StreamJobStatus::Created
686 }
687
688 pub fn is_iceberg_engine_table(&self) -> bool {
689 self.engine == Engine::Iceberg
690 }
691}
692
693impl From<PbTable> for TableCatalog {
694 fn from(tb: PbTable) -> Self {
695 let id = tb.id;
696 let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
697 let tb_engine = tb
698 .get_engine()
699 .map(|engine| PbEngine::try_from(*engine).expect("Invalid engine"))
700 .unwrap_or(PbEngine::Hummock);
701 let table_type = tb.get_table_type().unwrap();
702 let stream_job_status = tb
703 .get_stream_job_status()
704 .unwrap_or(PbStreamJobStatus::Created);
705 let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground);
706 let associated_source_id = tb.optional_associated_source_id.map(|id| match id {
707 OptionalAssociatedSourceId::AssociatedSourceId(id) => id,
708 });
709 let name = tb.name.clone();
710
711 let vnode_count = tb.vnode_count_inner();
712 if let VnodeCount::Placeholder = vnode_count {
713 assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
716 }
717
718 let mut col_names = HashSet::new();
719 let mut col_index: HashMap<i32, usize> = HashMap::new();
720
721 let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
722 let version_column_index = tb.version_column_index.map(|value| value as usize);
723 let mut columns: Vec<ColumnCatalog> =
724 tb.columns.into_iter().map(ColumnCatalog::from).collect();
725 if columns.iter().all(|c| !c.is_rw_timestamp_column()) {
726 columns.push(ColumnCatalog::rw_timestamp_column());
728 }
729 for (idx, catalog) in columns.clone().into_iter().enumerate() {
730 let col_name = catalog.name();
731 if !col_names.insert(col_name.to_owned()) {
732 panic!("duplicated column name {} in table {} ", col_name, tb.name)
733 }
734
735 let col_id = catalog.column_desc.column_id.get_id();
736 col_index.insert(col_id, idx);
737 }
738
739 let pk = tb.pk.iter().map(ColumnOrder::from_protobuf).collect();
740 let mut watermark_columns = FixedBitSet::with_capacity(columns.len());
741 for idx in &tb.watermark_indices {
742 watermark_columns.insert(*idx as _);
743 }
744 let engine = Engine::from_protobuf(&tb_engine);
745
746 Self {
747 id: id.into(),
748 schema_id: tb.schema_id,
749 database_id: tb.database_id,
750 associated_source_id: associated_source_id.map(Into::into),
751 name,
752 pk,
753 columns,
754 table_type: TableType::from_prost(table_type),
755 distribution_key: tb
756 .distribution_key
757 .iter()
758 .map(|k| *k as usize)
759 .collect_vec(),
760 stream_key: tb.stream_key.iter().map(|x| *x as _).collect(),
761 append_only: tb.append_only,
762 owner: tb.owner,
763 fragment_id: tb.fragment_id,
764 dml_fragment_id: tb.dml_fragment_id,
765 vnode_col_index: tb.vnode_col_index.map(|x| x as usize),
766 row_id_index: tb.row_id_index.map(|x| x as usize),
767 value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
768 definition: tb.definition,
769 conflict_behavior,
770 version_column_index,
771 read_prefix_len_hint: tb.read_prefix_len_hint as usize,
772 version: tb.version.map(TableVersion::from_prost),
773 watermark_columns,
774 dist_key_in_pk: tb.dist_key_in_pk.iter().map(|x| *x as _).collect(),
775 cardinality: tb
776 .cardinality
777 .map(|c| Cardinality::from_protobuf(&c))
778 .unwrap_or_else(Cardinality::unknown),
779 created_at_epoch: tb.created_at_epoch.map(Epoch::from),
780 initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
781 cleaned_by_watermark: tb.cleaned_by_watermark,
782 create_type: CreateType::from_proto(create_type),
783 stream_job_status: StreamJobStatus::from_proto(stream_job_status),
784 description: tb.description,
785 incoming_sinks: tb.incoming_sinks.clone(),
786 created_at_cluster_version: tb.created_at_cluster_version.clone(),
787 initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
788 retention_seconds: tb.retention_seconds,
789 dependent_relations: tb
790 .dependent_relations
791 .into_iter()
792 .map(TableId::from)
793 .collect_vec(),
794 cdc_table_id: tb.cdc_table_id,
795 vnode_count,
796 webhook_info: tb.webhook_info,
797 job_id: tb.job_id.map(TableId::from),
798 engine,
799 clean_watermark_index_in_pk: tb.clean_watermark_index_in_pk.map(|x| x as usize),
800 }
801 }
802}
803
804impl From<&PbTable> for TableCatalog {
805 fn from(tb: &PbTable) -> Self {
806 tb.clone().into()
807 }
808}
809
impl OwnedByUserCatalog for TableCatalog {
    /// Returns the id of the user that owns the table.
    fn owner(&self) -> UserId {
        self.owner
    }
}
815
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{ColumnDesc, ColumnId};
    use risingwave_common::test_prelude::*;
    use risingwave_common::types::*;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_pb::catalog::table::PbEngine;
    use risingwave_pb::plan_common::{
        AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
    };

    use super::*;

    /// Round-trip test: `PbTable` -> `TableCatalog` -> `PbTable`.
    /// Checks the decoded catalog field-by-field (including the re-appended
    /// hidden `_rw_timestamp` column) and that re-encoding is lossless.
    #[test]
    fn test_into_table_catalog() {
        let table: TableCatalog = PbTable {
            id: 0,
            schema_id: 0,
            database_id: 0,
            name: "test".to_owned(),
            table_type: PbTableType::Table as i32,
            columns: vec![
                ColumnCatalog::row_id_column().to_protobuf(),
                PbColumnCatalog {
                    column_desc: Some(PbColumnDesc::new(
                        DataType::from(StructType::new([
                            ("address", DataType::Varchar),
                            ("zipcode", DataType::Varchar),
                        ]))
                        .to_protobuf(),
                        "country",
                        1,
                    )),
                    is_hidden: false,
                },
            ],
            pk: vec![ColumnOrder::new(0, OrderType::ascending()).to_protobuf()],
            stream_key: vec![0],
            dependent_relations: vec![],
            distribution_key: vec![0],
            optional_associated_source_id: OptionalAssociatedSourceId::AssociatedSourceId(233)
                .into(),
            append_only: false,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            retention_seconds: Some(300),
            fragment_id: 0,
            dml_fragment_id: None,
            initialized_at_epoch: None,
            value_indices: vec![0],
            definition: "".into(),
            read_prefix_len_hint: 0,
            vnode_col_index: None,
            row_id_index: None,
            version: Some(PbTableVersion {
                version: 0,
                next_column_id: 2,
            }),
            watermark_indices: vec![],
            // 3 maps to `ConflictBehavior::NoCheck` below.
            handle_pk_conflict_behavior: 3,
            dist_key_in_pk: vec![0],
            cardinality: None,
            created_at_epoch: None,
            cleaned_by_watermark: false,
            stream_job_status: PbStreamJobStatus::Created.into(),
            create_type: PbCreateType::Foreground.into(),
            description: Some("description".to_owned()),
            incoming_sinks: vec![],
            created_at_cluster_version: None,
            initialized_at_cluster_version: None,
            version_column_index: None,
            cdc_table_id: None,
            maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
            webhook_info: None,
            job_id: None,
            engine: Some(PbEngine::Hummock as i32),
            clean_watermark_index_in_pk: None,
        }
        .into();

        assert_eq!(
            table,
            TableCatalog {
                id: TableId::new(0),
                schema_id: 0,
                database_id: 0,
                associated_source_id: Some(TableId::new(233)),
                name: "test".to_owned(),
                table_type: TableType::Table,
                columns: vec![
                    ColumnCatalog::row_id_column(),
                    ColumnCatalog {
                        column_desc: ColumnDesc {
                            data_type: StructType::new(vec![
                                ("address", DataType::Varchar),
                                ("zipcode", DataType::Varchar)
                            ],)
                            .into(),
                            column_id: ColumnId::new(1),
                            name: "country".to_owned(),
                            description: None,
                            generated_or_default_column: None,
                            additional_column: AdditionalColumn { column_type: None },
                            version: ColumnDescVersion::LATEST,
                            system_column: None,
                            nullable: true,
                        },
                        is_hidden: false
                    },
                    // Appended by `From<PbTable>` since it was not persisted.
                    ColumnCatalog::rw_timestamp_column(),
                ],
                stream_key: vec![0],
                pk: vec![ColumnOrder::new(0, OrderType::ascending())],
                distribution_key: vec![0],
                append_only: false,
                owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
                retention_seconds: Some(300),
                fragment_id: 0,
                dml_fragment_id: None,
                vnode_col_index: None,
                row_id_index: None,
                value_indices: vec![0],
                definition: "".into(),
                conflict_behavior: ConflictBehavior::NoCheck,
                read_prefix_len_hint: 0,
                version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))),
                watermark_columns: FixedBitSet::with_capacity(3),
                dist_key_in_pk: vec![0],
                cardinality: Cardinality::unknown(),
                created_at_epoch: None,
                initialized_at_epoch: None,
                cleaned_by_watermark: false,
                stream_job_status: StreamJobStatus::Created,
                create_type: CreateType::Foreground,
                description: Some("description".to_owned()),
                incoming_sinks: vec![],
                created_at_cluster_version: None,
                initialized_at_cluster_version: None,
                dependent_relations: vec![],
                version_column_index: None,
                cdc_table_id: None,
                vnode_count: VnodeCount::set(233),
                webhook_info: None,
                job_id: None,
                engine: Engine::Hummock,
                clean_watermark_index_in_pk: None,
            }
        );
        // Re-encoding the decoded catalog must be lossless.
        assert_eq!(table, TableCatalog::from(table.to_prost()));
    }
}