// risingwave_common/catalog/physical_table.rs
use std::collections::HashMap;
use fixedbitset::FixedBitSet;
use risingwave_pb::catalog::Table;
use risingwave_pb::common::PbColumnOrder;
use risingwave_pb::plan_common::StorageTableDesc;
use super::{ColumnDesc, ColumnId, TableId};
use crate::catalog::get_dist_key_in_pk_indices;
use crate::hash::{VnodeCount, VnodeCountCompat};
use crate::util::sort_util::ColumnOrder;
/// Description of a table: its id, columns, key layout and a few storage-related options.
///
/// Can be built from a protobuf `Table` with `from_pb_table` and converted into a protobuf
/// `StorageTableDesc` with `try_to_protobuf`.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct TableDesc {
/// Identifier of the table.
pub table_id: TableId,
/// Primary key as a list of column orders. Each entry's `column_index` is used as an index
/// into `columns` (see `order_column_ids`).
pub pk: Vec<ColumnOrder>,
/// Descriptors of the table's columns.
pub columns: Vec<ColumnDesc>,
/// Indices of the distribution-key columns. `try_to_protobuf` matches them against the pk
/// column indices, so they presumably share the same index space as `pk` — verify.
pub distribution_key: Vec<usize>,
/// Stream key column indices, copied as-is from/to the protobuf representation.
/// NOTE(review): exact semantics are defined outside this file.
pub stream_key: Vec<usize>,
/// Optional index of the vnode column. `try_to_protobuf` searches for it among the pk column
/// indices to derive `vnode_col_idx_in_pk`.
pub vnode_col_index: Option<usize>,
/// Append-only flag, copied from the protobuf `Table`. It is not written by `try_to_protobuf`.
pub append_only: bool,
/// Optional retention period in seconds, copied as-is from/to the protobuf representation.
/// NOTE(review): the TTL semantics are not visible in this file.
pub retention_seconds: Option<u32>,
/// Value column indices, copied as-is from/to the protobuf representation.
/// Presumably these select the columns stored in the value part of a row — TODO confirm.
pub value_indices: Vec<usize>,
/// Hint for the read prefix length, copied as-is from/to the protobuf representation.
/// NOTE(review): how the hint is consumed is not visible in this file.
pub read_prefix_len_hint: usize,
/// Bit set collected from the protobuf table's `watermark_indices`.
/// It is not written by `try_to_protobuf`.
pub watermark_columns: FixedBitSet,
/// Vnode count of the table: read via `Table::vnode_count()` and written back as
/// `maybe_vnode_count`.
pub vnode_count: usize,
/// `true` when the protobuf `Table` this was built from carries a `version`.
pub versioned: bool,
}
impl TableDesc {
pub fn arrange_key_orders_protobuf(&self) -> Vec<PbColumnOrder> {
self.pk.iter().map(|x| x.to_protobuf()).collect()
}
pub fn order_column_indices(&self) -> Vec<usize> {
self.pk.iter().map(|col| (col.column_index)).collect()
}
pub fn order_column_ids(&self) -> Vec<ColumnId> {
self.pk
.iter()
.map(|col| self.columns[col.column_index].column_id)
.collect()
}
pub fn try_to_protobuf(&self) -> anyhow::Result<StorageTableDesc> {
let dist_key_indices: Vec<u32> = self.distribution_key.iter().map(|&k| k as u32).collect();
let pk_indices: Vec<u32> = self
.pk
.iter()
.map(|v| v.to_protobuf().column_index)
.collect();
let vnode_col_idx_in_pk = self
.vnode_col_index
.and_then(|vnode_col_index| {
pk_indices
.iter()
.position(|&pk_index| pk_index == vnode_col_index as u32)
})
.map(|i| i as u32);
let dist_key_in_pk_indices = if vnode_col_idx_in_pk.is_none() {
get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices)?
} else {
Vec::new()
};
Ok(StorageTableDesc {
table_id: self.table_id.into(),
columns: self.columns.iter().map(Into::into).collect(),
pk: self.pk.iter().map(|v| v.to_protobuf()).collect(),
dist_key_in_pk_indices,
retention_seconds: self.retention_seconds,
value_indices: self.value_indices.iter().map(|&v| v as u32).collect(),
read_prefix_len_hint: self.read_prefix_len_hint as u32,
versioned: self.versioned,
stream_key: self.stream_key.iter().map(|&x| x as u32).collect(),
vnode_col_idx_in_pk,
maybe_vnode_count: VnodeCount::set(self.vnode_count).to_protobuf(),
})
}
pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
let mut id_to_idx = HashMap::new();
self.columns.iter().enumerate().for_each(|(idx, c)| {
id_to_idx.insert(c.column_id, idx);
});
id_to_idx
}
pub fn from_pb_table(table: &Table) -> Self {
let vnode_count = table.vnode_count();
Self {
table_id: TableId::new(table.id),
pk: table.pk.iter().map(ColumnOrder::from_protobuf).collect(),
columns: table
.columns
.iter()
.map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()))
.collect(),
distribution_key: table.distribution_key.iter().map(|i| *i as _).collect(),
stream_key: table.stream_key.iter().map(|i| *i as _).collect(),
vnode_col_index: table.vnode_col_index.map(|i| i as _),
append_only: table.append_only,
retention_seconds: table.retention_seconds,
value_indices: table.value_indices.iter().map(|i| *i as _).collect(),
read_prefix_len_hint: table.read_prefix_len_hint as _,
watermark_columns: table.watermark_indices.iter().map(|i| *i as _).collect(),
versioned: table.version.is_some(),
vnode_count,
}
}
}