risingwave_common/catalog/
physical_table.rs1use std::collections::HashMap;
16
17use fixedbitset::FixedBitSet;
18use risingwave_pb::catalog::Table;
19use risingwave_pb::common::PbColumnOrder;
20use risingwave_pb::plan_common::StorageTableDesc;
21
22use super::{ColumnDesc, ColumnId, TableId};
23use crate::catalog::get_dist_key_in_pk_indices;
24use crate::hash::{VnodeCount, VnodeCountCompat};
25use crate::util::sort_util::ColumnOrder;
26
27#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
31pub struct TableDesc {
32 pub table_id: TableId,
34 pub pk: Vec<ColumnOrder>,
36 pub columns: Vec<ColumnDesc>,
38 pub distribution_key: Vec<usize>,
42 pub stream_key: Vec<usize>,
44
45 pub vnode_col_index: Option<usize>,
46
47 pub append_only: bool,
49
50 pub retention_seconds: Option<u32>,
52
53 pub value_indices: Vec<usize>,
54
55 pub read_prefix_len_hint: usize,
57
58 pub watermark_columns: FixedBitSet,
60
61 pub vnode_count: usize,
63
64 pub versioned: bool,
69}
70
71impl TableDesc {
72 pub fn arrange_key_orders_protobuf(&self) -> Vec<PbColumnOrder> {
73 self.pk.iter().map(|x| x.to_protobuf()).collect()
75 }
76
77 pub fn order_column_indices(&self) -> Vec<usize> {
78 self.pk.iter().map(|col| (col.column_index)).collect()
79 }
80
81 pub fn order_column_ids(&self) -> Vec<ColumnId> {
82 self.pk
83 .iter()
84 .map(|col| self.columns[col.column_index].column_id)
85 .collect()
86 }
87
88 pub fn try_to_protobuf(&self) -> anyhow::Result<StorageTableDesc> {
89 let dist_key_indices: Vec<u32> = self.distribution_key.iter().map(|&k| k as u32).collect();
90 let pk_indices: Vec<u32> = self
91 .pk
92 .iter()
93 .map(|v| v.to_protobuf().column_index)
94 .collect();
95 let vnode_col_idx_in_pk = self
96 .vnode_col_index
97 .and_then(|vnode_col_index| {
98 pk_indices
99 .iter()
100 .position(|&pk_index| pk_index == vnode_col_index as u32)
101 })
102 .map(|i| i as u32);
103
104 let dist_key_in_pk_indices = if vnode_col_idx_in_pk.is_none() {
105 get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices)?
106 } else {
107 Vec::new()
108 };
109 Ok(StorageTableDesc {
110 table_id: self.table_id.into(),
111 columns: self.columns.iter().map(Into::into).collect(),
112 pk: self.pk.iter().map(|v| v.to_protobuf()).collect(),
113 dist_key_in_pk_indices,
114 retention_seconds: self.retention_seconds,
115 value_indices: self.value_indices.iter().map(|&v| v as u32).collect(),
116 read_prefix_len_hint: self.read_prefix_len_hint as u32,
117 versioned: self.versioned,
118 stream_key: self.stream_key.iter().map(|&x| x as u32).collect(),
119 vnode_col_idx_in_pk,
120 maybe_vnode_count: VnodeCount::set(self.vnode_count).to_protobuf(),
121 })
122 }
123
124 pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
126 let mut id_to_idx = HashMap::new();
127 self.columns.iter().enumerate().for_each(|(idx, c)| {
128 id_to_idx.insert(c.column_id, idx);
129 });
130 id_to_idx
131 }
132
133 pub fn from_pb_table(table: &Table) -> Self {
134 let vnode_count = table.vnode_count();
135
136 Self {
137 table_id: TableId::new(table.id),
138 pk: table.pk.iter().map(ColumnOrder::from_protobuf).collect(),
139 columns: table
140 .columns
141 .iter()
142 .map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()))
143 .collect(),
144 distribution_key: table.distribution_key.iter().map(|i| *i as _).collect(),
145 stream_key: table.stream_key.iter().map(|i| *i as _).collect(),
146 vnode_col_index: table.vnode_col_index.map(|i| i as _),
147 append_only: table.append_only,
148 retention_seconds: table.retention_seconds,
149 value_indices: table.value_indices.iter().map(|i| *i as _).collect(),
150 read_prefix_len_hint: table.read_prefix_len_hint as _,
151 watermark_columns: table.watermark_indices.iter().map(|i| *i as _).collect(),
152 versioned: table.version.is_some(),
153 vnode_count,
154 }
155 }
156}