risingwave_common/catalog/
physical_table.rs1use fixedbitset::FixedBitSet;
16use risingwave_pb::catalog::Table;
17use risingwave_pb::plan_common::StorageTableDesc;
18
19use super::{ColumnDesc, TableId};
20use crate::catalog::get_dist_key_in_pk_indices;
21use crate::hash::{VnodeCount, VnodeCountCompat};
22use crate::util::sort_util::ColumnOrder;
23
24#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
28pub struct TableDesc {
29 pub table_id: TableId,
31 pub pk: Vec<ColumnOrder>,
33 pub columns: Vec<ColumnDesc>,
35 pub distribution_key: Vec<usize>,
39 pub stream_key: Vec<usize>,
41
42 pub vnode_col_index: Option<usize>,
43
44 pub append_only: bool,
46
47 pub retention_seconds: Option<u32>,
49
50 pub value_indices: Vec<usize>,
51
52 pub read_prefix_len_hint: usize,
54
55 pub watermark_columns: FixedBitSet,
57
58 pub vnode_count: usize,
60
61 pub versioned: bool,
66}
67
68impl TableDesc {
69 pub fn try_to_protobuf(&self) -> anyhow::Result<StorageTableDesc> {
70 let dist_key_indices: Vec<u32> = self.distribution_key.iter().map(|&k| k as u32).collect();
71 let pk_indices: Vec<u32> = self
72 .pk
73 .iter()
74 .map(|v| v.to_protobuf().column_index)
75 .collect();
76 let vnode_col_idx_in_pk = self
77 .vnode_col_index
78 .and_then(|vnode_col_index| {
79 pk_indices
80 .iter()
81 .position(|&pk_index| pk_index == vnode_col_index as u32)
82 })
83 .map(|i| i as u32);
84
85 let dist_key_in_pk_indices = if vnode_col_idx_in_pk.is_none() {
86 get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices)?
87 } else {
88 Vec::new()
89 };
90 Ok(StorageTableDesc {
91 table_id: self.table_id.into(),
92 columns: self.columns.iter().map(Into::into).collect(),
93 pk: self.pk.iter().map(|v| v.to_protobuf()).collect(),
94 dist_key_in_pk_indices,
95 retention_seconds: self.retention_seconds,
96 value_indices: self.value_indices.iter().map(|&v| v as u32).collect(),
97 read_prefix_len_hint: self.read_prefix_len_hint as u32,
98 versioned: self.versioned,
99 stream_key: self.stream_key.iter().map(|&x| x as u32).collect(),
100 vnode_col_idx_in_pk,
101 maybe_vnode_count: VnodeCount::set(self.vnode_count).to_protobuf(),
102 })
103 }
104
105 pub fn from_pb_table(table: &Table) -> Self {
106 let vnode_count = table.vnode_count();
107
108 Self {
109 table_id: TableId::new(table.id),
110 pk: table.pk.iter().map(ColumnOrder::from_protobuf).collect(),
111 columns: table
112 .columns
113 .iter()
114 .map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()))
115 .collect(),
116 distribution_key: table.distribution_key.iter().map(|i| *i as _).collect(),
117 stream_key: table.stream_key.iter().map(|i| *i as _).collect(),
118 vnode_col_index: table.vnode_col_index.map(|i| i as _),
119 append_only: table.append_only,
120 retention_seconds: table.retention_seconds,
121 value_indices: table.value_indices.iter().map(|i| *i as _).collect(),
122 read_prefix_len_hint: table.read_prefix_len_hint as _,
123 watermark_columns: table.watermark_indices.iter().map(|i| *i as _).collect(),
124 versioned: table.version.is_some(),
125 vnode_count,
126 }
127 }
128}