1use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17
18use fixedbitset::FixedBitSet;
19use itertools::Itertools;
20use risingwave_common::catalog::{
21 ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, Schema,
22 StreamJobStatus, TableDesc, TableId, TableVersionId,
23};
24use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
25use risingwave_common::util::epoch::Epoch;
26use risingwave_common::util::sort_util::ColumnOrder;
27use risingwave_pb::catalog::table::{
28 OptionalAssociatedSourceId, PbEngine, PbTableType, PbTableVersion,
29};
30use risingwave_pb::catalog::{
31 PbCreateType, PbStreamJobStatus, PbTable, PbVectorIndexInfo, PbWebhookSourceInfo,
32};
33use risingwave_pb::common::PbColumnOrder;
34use risingwave_pb::plan_common::DefaultColumnDesc;
35use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
36use risingwave_sqlparser::ast;
37use risingwave_sqlparser::parser::Parser;
38use thiserror_ext::AsReport as _;
39
40use super::purify::try_purify_table_source_create_sql_ast;
41use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId, SinkId};
42use crate::error::{ErrorCode, Result, RwError};
43use crate::expr::ExprImpl;
44use crate::optimizer::property::Cardinality;
45use crate::session::current::notice_to_user;
46use crate::user::UserId;
47
48#[derive(Clone, Debug, PartialEq, Eq, Hash)]
80#[cfg_attr(test, derive(Default))]
81pub struct TableCatalog {
82 pub id: TableId,
83
84 pub schema_id: SchemaId,
85
86 pub database_id: DatabaseId,
87
88 pub associated_source_id: Option<TableId>, pub name: String,
91
92 pub columns: Vec<ColumnCatalog>,
94
95 pub pk: Vec<ColumnOrder>,
97
98 pub stream_key: Vec<usize>,
100
101 pub table_type: TableType,
104
105 pub distribution_key: Vec<usize>,
107
108 pub append_only: bool,
111
112 pub cardinality: Cardinality,
114
115 pub owner: UserId,
117
118 pub retention_seconds: Option<u32>,
120
121 pub fragment_id: FragmentId,
123
124 pub dml_fragment_id: Option<FragmentId>,
126
127 pub vnode_col_index: Option<usize>,
130
131 pub row_id_index: Option<usize>,
134
135 pub value_indices: Vec<usize>,
137
138 pub definition: String,
140
141 pub conflict_behavior: ConflictBehavior,
145
146 pub version_column_index: Option<usize>,
147
148 pub read_prefix_len_hint: usize,
149
150 pub version: Option<TableVersion>,
152
153 pub watermark_columns: FixedBitSet,
155
156 pub dist_key_in_pk: Vec<usize>,
159
160 pub created_at_epoch: Option<Epoch>,
161
162 pub initialized_at_epoch: Option<Epoch>,
163
164 pub cleaned_by_watermark: bool,
166
167 pub create_type: CreateType,
169
170 pub stream_job_status: StreamJobStatus,
173
174 pub description: Option<String>,
176
177 pub incoming_sinks: Vec<SinkId>,
179
180 pub created_at_cluster_version: Option<String>,
181
182 pub initialized_at_cluster_version: Option<String>,
183
184 pub cdc_table_id: Option<String>,
185
186 pub vnode_count: VnodeCount,
195
196 pub webhook_info: Option<PbWebhookSourceInfo>,
197
198 pub job_id: Option<TableId>,
199
200 pub engine: Engine,
201
202 pub clean_watermark_index_in_pk: Option<usize>,
203
204 pub refreshable: bool,
206
207 pub vector_index_info: Option<PbVectorIndexInfo>,
208}
209
210pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
211pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_";
212
213#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
214pub enum TableType {
215 Table,
217 MaterializedView,
219 Index,
222 Internal,
224}
225
226#[cfg(test)]
227impl Default for TableType {
228 fn default() -> Self {
229 Self::Table
230 }
231}
232
233impl TableType {
234 fn from_prost(prost: PbTableType) -> Self {
235 match prost {
236 PbTableType::Table => Self::Table,
237 PbTableType::MaterializedView => Self::MaterializedView,
238 PbTableType::Index => Self::Index,
239 PbTableType::Internal => Self::Internal,
240 PbTableType::Unspecified => unreachable!(),
241 }
242 }
243
244 pub(crate) fn to_prost(self) -> PbTableType {
245 match self {
246 Self::Table => PbTableType::Table,
247 Self::MaterializedView => PbTableType::MaterializedView,
248 Self::Index => PbTableType::Index,
249 Self::Internal => PbTableType::Internal,
250 }
251 }
252}
253
254#[derive(Clone, Debug, PartialEq, Eq, Hash)]
256pub struct TableVersion {
257 pub version_id: TableVersionId,
258 pub next_column_id: ColumnId,
259}
260
261impl TableVersion {
262 #[cfg(test)]
264 pub fn new_initial_for_test(max_column_id: ColumnId) -> Self {
265 use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;
266
267 Self {
268 version_id: INITIAL_TABLE_VERSION_ID,
269 next_column_id: max_column_id.next(),
270 }
271 }
272
273 pub fn from_prost(prost: PbTableVersion) -> Self {
274 Self {
275 version_id: prost.version,
276 next_column_id: ColumnId::from(prost.next_column_id),
277 }
278 }
279
280 pub fn to_prost(&self) -> PbTableVersion {
281 PbTableVersion {
282 version: self.version_id,
283 next_column_id: self.next_column_id.into(),
284 }
285 }
286}
287
288impl TableCatalog {
289 pub fn create_sql_purified(&self) -> String {
294 self.create_sql_ast_purified()
295 .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
296 .unwrap_or_else(|_| self.create_sql())
297 }
298
299 pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
304 if let TableType::Table = self.table_type() {
306 let base = if self.definition.is_empty() {
307 let name = ast::ObjectName(vec![self.name.as_str().into()]);
309 ast::Statement::default_create_table(name)
310 } else {
311 self.create_sql_ast_from_persisted()?
312 };
313
314 match try_purify_table_source_create_sql_ast(
315 base,
316 self.columns(),
317 self.row_id_index,
318 &self.pk_column_ids(),
319 ) {
320 Ok(stmt) => return Ok(stmt),
321 Err(e) => notice_to_user(format!(
322 "error occurred while purifying definition for table \"{}\", \
323 results may be inaccurate: {}",
324 self.name,
325 e.as_report()
326 )),
327 }
328 }
329
330 self.create_sql_ast_from_persisted()
331 }
332}
333
334impl TableCatalog {
335 pub fn id(&self) -> TableId {
337 self.id
338 }
339
340 pub fn with_id(mut self, id: TableId) -> Self {
341 self.id = id;
342 self
343 }
344
345 pub fn with_cleaned_by_watermark(mut self, cleaned_by_watermark: bool) -> Self {
346 self.cleaned_by_watermark = cleaned_by_watermark;
347 self
348 }
349
350 pub fn conflict_behavior(&self) -> ConflictBehavior {
351 self.conflict_behavior
352 }
353
354 pub fn table_type(&self) -> TableType {
355 self.table_type
356 }
357
358 pub fn engine(&self) -> Engine {
359 self.engine
360 }
361
362 pub fn iceberg_source_name(&self) -> Option<String> {
363 match self.engine {
364 Engine::Iceberg => Some(format!("{}{}", ICEBERG_SOURCE_PREFIX, self.name)),
365 Engine::Hummock => None,
366 }
367 }
368
369 pub fn iceberg_sink_name(&self) -> Option<String> {
370 match self.engine {
371 Engine::Iceberg => Some(format!("{}{}", ICEBERG_SINK_PREFIX, self.name)),
372 Engine::Hummock => None,
373 }
374 }
375
376 pub fn is_user_table(&self) -> bool {
377 self.table_type == TableType::Table
378 }
379
380 pub fn is_internal_table(&self) -> bool {
381 self.table_type == TableType::Internal
382 }
383
384 pub fn is_mview(&self) -> bool {
385 self.table_type == TableType::MaterializedView
386 }
387
388 pub fn is_index(&self) -> bool {
389 self.table_type == TableType::Index
390 }
391
392 #[must_use]
394 pub fn bad_drop_error(&self) -> RwError {
395 let msg = match self.table_type {
396 TableType::MaterializedView => {
397 "Use `DROP MATERIALIZED VIEW` to drop a materialized view."
398 }
399 TableType::Index => "Use `DROP INDEX` to drop an index.",
400 TableType::Table => "Use `DROP TABLE` to drop a table.",
401 TableType::Internal => "Internal tables cannot be dropped.",
402 };
403
404 ErrorCode::InvalidInputSyntax(msg.to_owned()).into()
405 }
406
407 #[must_use]
409 pub fn associated_source_id(&self) -> Option<TableId> {
410 self.associated_source_id
411 }
412
413 pub fn has_associated_source(&self) -> bool {
414 self.associated_source_id.is_some()
415 }
416
417 pub fn columns(&self) -> &[ColumnCatalog] {
419 &self.columns
420 }
421
422 pub fn columns_without_rw_timestamp(&self) -> Vec<ColumnCatalog> {
423 self.columns
424 .iter()
425 .filter(|c| !c.is_rw_timestamp_column())
426 .cloned()
427 .collect()
428 }
429
430 pub fn pk(&self) -> &[ColumnOrder] {
432 self.pk.as_ref()
433 }
434
435 pub fn pk_column_ids(&self) -> Vec<ColumnId> {
437 self.pk
438 .iter()
439 .map(|x| self.columns[x.column_index].column_id())
440 .collect()
441 }
442
443 pub fn pk_column_names(&self) -> Vec<&str> {
445 self.pk
446 .iter()
447 .map(|x| self.columns[x.column_index].name())
448 .collect()
449 }
450
451 pub fn table_desc(&self) -> TableDesc {
456 use risingwave_common::catalog::TableOption;
457
458 let table_options = TableOption::new(self.retention_seconds);
459
460 TableDesc {
461 table_id: self.id,
462 pk: self.pk.clone(),
463 stream_key: self.stream_key.clone(),
464 columns: self.columns.iter().map(|c| c.column_desc.clone()).collect(),
465 distribution_key: self.distribution_key.clone(),
466 append_only: self.append_only,
467 retention_seconds: table_options.retention_seconds,
468 value_indices: self.value_indices.clone(),
469 read_prefix_len_hint: self.read_prefix_len_hint,
470 watermark_columns: self.watermark_columns.clone(),
471 versioned: self.version.is_some(),
472 vnode_col_index: self.vnode_col_index,
473 vnode_count: self.vnode_count(),
474 }
475 }
476
477 pub fn name(&self) -> &str {
479 self.name.as_ref()
480 }
481
482 pub fn distribution_key(&self) -> &[usize] {
483 self.distribution_key.as_ref()
484 }
485
486 pub fn to_internal_table_prost(&self) -> PbTable {
487 self.to_prost()
488 }
489
490 pub fn create_sql(&self) -> String {
494 self.create_sql_ast()
495 .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
496 .unwrap_or_else(|_| self.definition.clone())
497 }
498
499 pub fn create_sql_ast(&self) -> Result<ast::Statement> {
507 if let TableType::Table = self.table_type()
508 && self.definition.is_empty()
509 {
510 self.create_sql_ast_purified()
512 } else {
513 self.create_sql_ast_from_persisted()
515 }
516 }
517
518 fn create_sql_ast_from_persisted(&self) -> Result<ast::Statement> {
519 Ok(Parser::parse_exactly_one(&self.definition)?)
520 }
521
522 pub fn version(&self) -> Option<&TableVersion> {
524 self.version.as_ref()
525 }
526
527 pub fn version_id(&self) -> Option<TableVersionId> {
529 self.version().map(|v| v.version_id)
530 }
531
532 pub fn vnode_count(&self) -> usize {
536 self.vnode_count.value()
537 }
538
539 pub fn to_prost(&self) -> PbTable {
540 PbTable {
541 id: self.id.table_id,
542 schema_id: self.schema_id,
543 database_id: self.database_id,
544 name: self.name.clone(),
545 columns: self
547 .columns_without_rw_timestamp()
548 .iter()
549 .map(|c| c.to_protobuf())
550 .collect(),
551 pk: self.pk.iter().map(|o| o.to_protobuf()).collect(),
552 stream_key: self.stream_key.iter().map(|x| *x as _).collect(),
553 optional_associated_source_id: self
554 .associated_source_id
555 .map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())),
556 table_type: self.table_type.to_prost() as i32,
557 distribution_key: self
558 .distribution_key
559 .iter()
560 .map(|k| *k as i32)
561 .collect_vec(),
562 append_only: self.append_only,
563 owner: self.owner,
564 fragment_id: self.fragment_id,
565 dml_fragment_id: self.dml_fragment_id,
566 vnode_col_index: self.vnode_col_index.map(|i| i as _),
567 row_id_index: self.row_id_index.map(|i| i as _),
568 value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
569 definition: self.definition.clone(),
570 read_prefix_len_hint: self.read_prefix_len_hint as u32,
571 version: self.version.as_ref().map(TableVersion::to_prost),
572 watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
573 dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
574 handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
575 version_column_index: self.version_column_index.map(|value| value as u32),
576 cardinality: Some(self.cardinality.to_protobuf()),
577 initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
578 created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
579 cleaned_by_watermark: self.cleaned_by_watermark,
580 stream_job_status: self.stream_job_status.to_proto().into(),
581 create_type: self.create_type.to_proto().into(),
582 description: self.description.clone(),
583 incoming_sinks: self.incoming_sinks.clone(),
584 created_at_cluster_version: self.created_at_cluster_version.clone(),
585 initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
586 retention_seconds: self.retention_seconds,
587 cdc_table_id: self.cdc_table_id.clone(),
588 maybe_vnode_count: self.vnode_count.to_protobuf(),
589 webhook_info: self.webhook_info.clone(),
590 job_id: self.job_id.map(|id| id.table_id),
591 engine: Some(self.engine.to_protobuf().into()),
592 clean_watermark_index_in_pk: self.clean_watermark_index_in_pk.map(|x| x as i32),
593 refreshable: self.refreshable,
594 vector_index_info: self.vector_index_info,
595 }
596 }
597
598 pub fn columns_to_insert(&self) -> impl Iterator<Item = &ColumnCatalog> {
600 self.columns
601 .iter()
602 .filter(|c| !c.is_hidden() && !c.is_generated())
603 }
604
605 pub fn generated_column_names(&self) -> impl Iterator<Item = &str> {
606 self.columns
607 .iter()
608 .filter(|c| c.is_generated())
609 .map(|c| c.name())
610 }
611
612 pub fn generated_col_idxes(&self) -> impl Iterator<Item = usize> + '_ {
613 self.columns
614 .iter()
615 .enumerate()
616 .filter(|(_, c)| c.is_generated())
617 .map(|(i, _)| i)
618 }
619
620 pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
621 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) = self
622 .columns[col_idx]
623 .column_desc
624 .generated_or_default_column
625 .as_ref()
626 {
627 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
628 .expect("expr in default columns corrupted")
629 } else {
630 ExprImpl::literal_null(self.columns[col_idx].data_type().clone())
631 }
632 }
633
634 pub fn default_column_exprs(columns: &[ColumnCatalog]) -> Vec<ExprImpl> {
635 columns
636 .iter()
637 .map(|c| {
638 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
639 expr,
640 ..
641 })) = c.column_desc.generated_or_default_column.as_ref()
642 {
643 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
644 .expect("expr in default columns corrupted")
645 } else {
646 ExprImpl::literal_null(c.data_type().clone())
647 }
648 })
649 .collect()
650 }
651
652 pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
653 self.columns.iter().enumerate().filter_map(|(i, c)| {
654 if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
655 expr, ..
656 })) = c.column_desc.generated_or_default_column.as_ref()
657 {
658 Some((
659 i,
660 ExprImpl::from_expr_proto(expr.as_ref().unwrap())
661 .expect("expr in default columns corrupted"),
662 ))
663 } else {
664 None
665 }
666 })
667 }
668
669 pub fn has_generated_column(&self) -> bool {
670 self.columns.iter().any(|c| c.is_generated())
671 }
672
673 pub fn has_rw_timestamp_column(&self) -> bool {
674 self.columns.iter().any(|c| c.is_rw_timestamp_column())
675 }
676
677 pub fn column_schema(&self) -> Schema {
678 Schema::new(
679 self.columns
680 .iter()
681 .map(|c| Field::from(&c.column_desc))
682 .collect(),
683 )
684 }
685
686 pub fn is_created(&self) -> bool {
687 self.stream_job_status == StreamJobStatus::Created
688 }
689
690 pub fn is_iceberg_engine_table(&self) -> bool {
691 self.engine == Engine::Iceberg
692 }
693
694 pub fn order_column_indices(&self) -> impl Iterator<Item = usize> + '_ {
695 self.pk.iter().map(|col| col.column_index)
696 }
697
698 pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
699 ColumnDesc::get_id_to_op_idx_mapping(&self.columns, None)
700 }
701
702 pub fn order_column_ids(&self) -> Vec<ColumnId> {
703 self.pk
704 .iter()
705 .map(|col| self.columns[col.column_index].column_desc.column_id)
706 .collect()
707 }
708
709 pub fn arrange_key_orders_protobuf(&self) -> Vec<PbColumnOrder> {
710 self.pk.iter().map(|x| x.to_protobuf()).collect()
712 }
713}
714
715impl From<PbTable> for TableCatalog {
716 fn from(tb: PbTable) -> Self {
717 let id = tb.id;
718 let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
719 let tb_engine = tb
720 .get_engine()
721 .map(|engine| PbEngine::try_from(*engine).expect("Invalid engine"))
722 .unwrap_or(PbEngine::Hummock);
723 let table_type = tb.get_table_type().unwrap();
724 let stream_job_status = tb
725 .get_stream_job_status()
726 .unwrap_or(PbStreamJobStatus::Created);
727 let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground);
728 let associated_source_id = tb.optional_associated_source_id.map(|id| match id {
729 OptionalAssociatedSourceId::AssociatedSourceId(id) => id,
730 });
731 let name = tb.name.clone();
732
733 let vnode_count = tb.vnode_count_inner();
734 if let VnodeCount::Placeholder = vnode_count {
735 assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
738 }
739
740 let mut col_names = HashSet::new();
741 let mut col_index: HashMap<i32, usize> = HashMap::new();
742
743 let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
744 let version_column_index = tb.version_column_index.map(|value| value as usize);
745 let mut columns: Vec<ColumnCatalog> =
746 tb.columns.into_iter().map(ColumnCatalog::from).collect();
747 if columns.iter().all(|c| !c.is_rw_timestamp_column()) {
748 columns.push(ColumnCatalog::rw_timestamp_column());
750 }
751 for (idx, catalog) in columns.clone().into_iter().enumerate() {
752 let col_name = catalog.name();
753 if !col_names.insert(col_name.to_owned()) {
754 panic!("duplicated column name {} in table {} ", col_name, tb.name)
755 }
756
757 let col_id = catalog.column_desc.column_id.get_id();
758 col_index.insert(col_id, idx);
759 }
760
761 let pk = tb.pk.iter().map(ColumnOrder::from_protobuf).collect();
762 let mut watermark_columns = FixedBitSet::with_capacity(columns.len());
763 for idx in &tb.watermark_indices {
764 watermark_columns.insert(*idx as _);
765 }
766 let engine = Engine::from_protobuf(&tb_engine);
767
768 Self {
769 id: id.into(),
770 schema_id: tb.schema_id,
771 database_id: tb.database_id,
772 associated_source_id: associated_source_id.map(Into::into),
773 name,
774 pk,
775 columns,
776 table_type: TableType::from_prost(table_type),
777 distribution_key: tb
778 .distribution_key
779 .iter()
780 .map(|k| *k as usize)
781 .collect_vec(),
782 stream_key: tb.stream_key.iter().map(|x| *x as _).collect(),
783 append_only: tb.append_only,
784 owner: tb.owner,
785 fragment_id: tb.fragment_id,
786 dml_fragment_id: tb.dml_fragment_id,
787 vnode_col_index: tb.vnode_col_index.map(|x| x as usize),
788 row_id_index: tb.row_id_index.map(|x| x as usize),
789 value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
790 definition: tb.definition,
791 conflict_behavior,
792 version_column_index,
793 read_prefix_len_hint: tb.read_prefix_len_hint as usize,
794 version: tb.version.map(TableVersion::from_prost),
795 watermark_columns,
796 dist_key_in_pk: tb.dist_key_in_pk.iter().map(|x| *x as _).collect(),
797 cardinality: tb
798 .cardinality
799 .map(|c| Cardinality::from_protobuf(&c))
800 .unwrap_or_else(Cardinality::unknown),
801 created_at_epoch: tb.created_at_epoch.map(Epoch::from),
802 initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
803 cleaned_by_watermark: tb.cleaned_by_watermark,
804 create_type: CreateType::from_proto(create_type),
805 stream_job_status: StreamJobStatus::from_proto(stream_job_status),
806 description: tb.description,
807 incoming_sinks: tb.incoming_sinks.clone(),
808 created_at_cluster_version: tb.created_at_cluster_version.clone(),
809 initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
810 retention_seconds: tb.retention_seconds,
811 cdc_table_id: tb.cdc_table_id,
812 vnode_count,
813 webhook_info: tb.webhook_info,
814 job_id: tb.job_id.map(TableId::from),
815 engine,
816 clean_watermark_index_in_pk: tb.clean_watermark_index_in_pk.map(|x| x as usize),
817 refreshable: tb.refreshable,
818 vector_index_info: tb.vector_index_info,
819 }
820 }
821}
822
823impl From<&PbTable> for TableCatalog {
824 fn from(tb: &PbTable) -> Self {
825 tb.clone().into()
826 }
827}
828
829impl OwnedByUserCatalog for TableCatalog {
830 fn owner(&self) -> UserId {
831 self.owner
832 }
833}
834
835#[cfg(test)]
836mod tests {
837 use risingwave_common::catalog::{ColumnDesc, ColumnId};
838 use risingwave_common::test_prelude::*;
839 use risingwave_common::types::*;
840 use risingwave_common::util::sort_util::OrderType;
841 use risingwave_pb::catalog::table::PbEngine;
842 use risingwave_pb::plan_common::{
843 AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
844 };
845
846 use super::*;
847
848 #[test]
849 fn test_into_table_catalog() {
850 let table: TableCatalog = PbTable {
851 id: 0,
852 schema_id: 0,
853 database_id: 0,
854 name: "test".to_owned(),
855 table_type: PbTableType::Table as i32,
856 columns: vec![
857 ColumnCatalog::row_id_column().to_protobuf(),
858 PbColumnCatalog {
859 column_desc: Some(PbColumnDesc::new(
860 DataType::from(StructType::new([
861 ("address", DataType::Varchar),
862 ("zipcode", DataType::Varchar),
863 ]))
864 .to_protobuf(),
865 "country",
866 1,
867 )),
868 is_hidden: false,
869 },
870 ],
871 pk: vec![ColumnOrder::new(0, OrderType::ascending()).to_protobuf()],
872 stream_key: vec![0],
873 distribution_key: vec![0],
874 optional_associated_source_id: OptionalAssociatedSourceId::AssociatedSourceId(233)
875 .into(),
876 append_only: false,
877 owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
878 retention_seconds: Some(300),
879 fragment_id: 0,
880 dml_fragment_id: None,
881 initialized_at_epoch: None,
882 value_indices: vec![0],
883 definition: "".into(),
884 read_prefix_len_hint: 0,
885 vnode_col_index: None,
886 row_id_index: None,
887 version: Some(PbTableVersion {
888 version: 0,
889 next_column_id: 2,
890 }),
891 watermark_indices: vec![],
892 handle_pk_conflict_behavior: 3,
893 dist_key_in_pk: vec![0],
894 cardinality: None,
895 created_at_epoch: None,
896 cleaned_by_watermark: false,
897 stream_job_status: PbStreamJobStatus::Created.into(),
898 create_type: PbCreateType::Foreground.into(),
899 description: Some("description".to_owned()),
900 incoming_sinks: vec![],
901 created_at_cluster_version: None,
902 initialized_at_cluster_version: None,
903 version_column_index: None,
904 cdc_table_id: None,
905 maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
906 webhook_info: None,
907 job_id: None,
908 engine: Some(PbEngine::Hummock as i32),
909 clean_watermark_index_in_pk: None,
910 refreshable: false,
911 vector_index_info: None,
912 }
913 .into();
914
915 assert_eq!(
916 table,
917 TableCatalog {
918 id: TableId::new(0),
919 schema_id: 0,
920 database_id: 0,
921 associated_source_id: Some(TableId::new(233)),
922 name: "test".to_owned(),
923 table_type: TableType::Table,
924 columns: vec![
925 ColumnCatalog::row_id_column(),
926 ColumnCatalog {
927 column_desc: ColumnDesc {
928 data_type: StructType::new(vec![
929 ("address", DataType::Varchar),
930 ("zipcode", DataType::Varchar)
931 ],)
932 .into(),
933 column_id: ColumnId::new(1),
934 name: "country".to_owned(),
935 description: None,
936 generated_or_default_column: None,
937 additional_column: AdditionalColumn { column_type: None },
938 version: ColumnDescVersion::LATEST,
939 system_column: None,
940 nullable: true,
941 },
942 is_hidden: false
943 },
944 ColumnCatalog::rw_timestamp_column(),
945 ],
946 stream_key: vec![0],
947 pk: vec![ColumnOrder::new(0, OrderType::ascending())],
948 distribution_key: vec![0],
949 append_only: false,
950 owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
951 retention_seconds: Some(300),
952 fragment_id: 0,
953 dml_fragment_id: None,
954 vnode_col_index: None,
955 row_id_index: None,
956 value_indices: vec![0],
957 definition: "".into(),
958 conflict_behavior: ConflictBehavior::NoCheck,
959 read_prefix_len_hint: 0,
960 version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))),
961 watermark_columns: FixedBitSet::with_capacity(3),
962 dist_key_in_pk: vec![0],
963 cardinality: Cardinality::unknown(),
964 created_at_epoch: None,
965 initialized_at_epoch: None,
966 cleaned_by_watermark: false,
967 stream_job_status: StreamJobStatus::Created,
968 create_type: CreateType::Foreground,
969 description: Some("description".to_owned()),
970 incoming_sinks: vec![],
971 created_at_cluster_version: None,
972 initialized_at_cluster_version: None,
973 version_column_index: None,
974 cdc_table_id: None,
975 vnode_count: VnodeCount::set(233),
976 webhook_info: None,
977 job_id: None,
978 engine: Engine::Hummock,
979 clean_watermark_index_in_pk: None,
980 refreshable: false,
981 vector_index_info: None,
982 }
983 );
984 assert_eq!(table, TableCatalog::from(table.to_prost()));
985 }
986}