risingwave_common/catalog/
physical_table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use fixedbitset::FixedBitSet;
18use risingwave_pb::catalog::Table;
19use risingwave_pb::common::PbColumnOrder;
20use risingwave_pb::plan_common::StorageTableDesc;
21
22use super::{ColumnDesc, ColumnId, TableId};
23use crate::catalog::get_dist_key_in_pk_indices;
24use crate::hash::{VnodeCount, VnodeCountCompat};
25use crate::util::sort_util::ColumnOrder;
26
/// Includes necessary information for compute node to access data of the table.
///
/// It's a subset of `TableCatalog` in frontend. Refer to `TableCatalog` for more details.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct TableDesc {
    /// Id of the table, to find in storage.
    pub table_id: TableId,
    /// The key used to sort in storage.
    pub pk: Vec<ColumnOrder>,
    /// All columns in the table. Note that this vec is NOT sorted by column id.
    pub columns: Vec<ColumnDesc>,
    /// Distribution keys of this table, given as indices into `columns`.
    /// e.g., if `distribution_key = [1, 2]`, then `columns[1]` and `columns[2]` are used
    /// as distribution key.
    pub distribution_key: Vec<usize>,
    /// Column indices for primary keys.
    pub stream_key: Vec<usize>,

    /// Index of the vnode column, if the table has one. `try_to_protobuf` searches for this
    /// index among the column indices of `pk`.
    ///
    /// NOTE(review): presumably an index into `columns`, like the other index fields — confirm.
    pub vnode_col_index: Option<usize>,

    /// Whether the table source is append-only
    pub append_only: bool,

    /// TTL of the record in the table. To ensure consistency with other tables in the
    /// streaming plan, it only applies to append-only tables.
    pub retention_seconds: Option<u32>,

    /// NOTE(review): presumably the indices of the columns stored in the value part of a
    /// row — confirm against `TableCatalog`.
    pub value_indices: Vec<usize>,

    /// The prefix len of pk, used in bloom filter.
    pub read_prefix_len_hint: usize,

    /// The column indices which could receive watermarks.
    pub watermark_columns: FixedBitSet,

    /// Total vnode count of the table.
    pub vnode_count: usize,

    /// Whether the table is versioned. If `true`, column-aware row encoding will be used
    /// to be compatible with schema changes.
    ///
    /// See `version` field in `TableCatalog` for more details.
    pub versioned: bool,
}
70
71impl TableDesc {
72    pub fn arrange_key_orders_protobuf(&self) -> Vec<PbColumnOrder> {
73        // Set materialize key as arrange key + pk
74        self.pk.iter().map(|x| x.to_protobuf()).collect()
75    }
76
77    pub fn order_column_indices(&self) -> Vec<usize> {
78        self.pk.iter().map(|col| (col.column_index)).collect()
79    }
80
81    pub fn order_column_ids(&self) -> Vec<ColumnId> {
82        self.pk
83            .iter()
84            .map(|col| self.columns[col.column_index].column_id)
85            .collect()
86    }
87
88    pub fn try_to_protobuf(&self) -> anyhow::Result<StorageTableDesc> {
89        let dist_key_indices: Vec<u32> = self.distribution_key.iter().map(|&k| k as u32).collect();
90        let pk_indices: Vec<u32> = self
91            .pk
92            .iter()
93            .map(|v| v.to_protobuf().column_index)
94            .collect();
95        let vnode_col_idx_in_pk = self
96            .vnode_col_index
97            .and_then(|vnode_col_index| {
98                pk_indices
99                    .iter()
100                    .position(|&pk_index| pk_index == vnode_col_index as u32)
101            })
102            .map(|i| i as u32);
103
104        let dist_key_in_pk_indices = if vnode_col_idx_in_pk.is_none() {
105            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices)?
106        } else {
107            Vec::new()
108        };
109        Ok(StorageTableDesc {
110            table_id: self.table_id.into(),
111            columns: self.columns.iter().map(Into::into).collect(),
112            pk: self.pk.iter().map(|v| v.to_protobuf()).collect(),
113            dist_key_in_pk_indices,
114            retention_seconds: self.retention_seconds,
115            value_indices: self.value_indices.iter().map(|&v| v as u32).collect(),
116            read_prefix_len_hint: self.read_prefix_len_hint as u32,
117            versioned: self.versioned,
118            stream_key: self.stream_key.iter().map(|&x| x as u32).collect(),
119            vnode_col_idx_in_pk,
120            maybe_vnode_count: VnodeCount::set(self.vnode_count).to_protobuf(),
121        })
122    }
123
124    /// Helper function to create a mapping from `column id` to `column index`
125    pub fn get_id_to_op_idx_mapping(&self) -> HashMap<ColumnId, usize> {
126        let mut id_to_idx = HashMap::new();
127        self.columns.iter().enumerate().for_each(|(idx, c)| {
128            id_to_idx.insert(c.column_id, idx);
129        });
130        id_to_idx
131    }
132
133    pub fn from_pb_table(table: &Table) -> Self {
134        let vnode_count = table.vnode_count();
135
136        Self {
137            table_id: TableId::new(table.id),
138            pk: table.pk.iter().map(ColumnOrder::from_protobuf).collect(),
139            columns: table
140                .columns
141                .iter()
142                .map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()))
143                .collect(),
144            distribution_key: table.distribution_key.iter().map(|i| *i as _).collect(),
145            stream_key: table.stream_key.iter().map(|i| *i as _).collect(),
146            vnode_col_index: table.vnode_col_index.map(|i| i as _),
147            append_only: table.append_only,
148            retention_seconds: table.retention_seconds,
149            value_indices: table.value_indices.iter().map(|i| *i as _).collect(),
150            read_prefix_len_hint: table.read_prefix_len_hint as _,
151            watermark_columns: table.watermark_indices.iter().map(|i| *i as _).collect(),
152            versioned: table.version.is_some(),
153            vnode_count,
154        }
155    }
156}