risingwave_common/catalog/
physical_table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use fixedbitset::FixedBitSet;
16use risingwave_pb::catalog::Table;
17use risingwave_pb::plan_common::StorageTableDesc;
18
19use super::{ColumnDesc, TableId};
20use crate::catalog::get_dist_key_in_pk_indices;
21use crate::hash::{VnodeCount, VnodeCountCompat};
22use crate::util::sort_util::ColumnOrder;
23
/// Includes necessary information for compute node to access data of the table.
///
/// It's a subset of `TableCatalog` in frontend. Refer to `TableCatalog` for more details.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct TableDesc {
    /// Id of the table, used to find it in storage.
    pub table_id: TableId,
    /// The key used to sort in storage.
    pub pk: Vec<ColumnOrder>,
    /// All columns in the table. Note that the vec is NOT sorted by column id.
    pub columns: Vec<ColumnDesc>,
    /// Distribution key of this table, given as indices into `columns`.
    /// E.g., if `distribution_key = [1, 2]`, then `columns[1]` and `columns[2]` are used
    /// as distribution key.
    pub distribution_key: Vec<usize>,
    /// Column indices for primary keys.
    pub stream_key: Vec<usize>,

    /// Optional index of the vnode column.
    ///
    /// When it is set and the column is part of `pk`, `try_to_protobuf` emits its position
    /// within the pk and leaves the distribution-key positions empty.
    pub vnode_col_index: Option<usize>,

    /// Whether the table source is append-only.
    pub append_only: bool,

    /// TTL of the record in the table. To ensure consistency with other tables in the
    /// streaming plan, it only applies to append-only tables.
    pub retention_seconds: Option<u32>,

    /// Column indices forwarded as-is to the storage table descriptor.
    ///
    /// NOTE(review): presumably the columns stored in the value part of a row; confirm
    /// against `TableCatalog`.
    pub value_indices: Vec<usize>,

    /// The prefix length of the pk, used in bloom filter.
    pub read_prefix_len_hint: usize,

    /// The column indices which could receive watermarks.
    pub watermark_columns: FixedBitSet,

    /// Total vnode count of the table.
    pub vnode_count: usize,

    /// Whether the table is versioned. If `true`, column-aware row encoding will be used
    /// to be compatible with schema changes.
    ///
    /// See `version` field in `TableCatalog` for more details.
    pub versioned: bool,
}
67
68impl TableDesc {
69    pub fn try_to_protobuf(&self) -> anyhow::Result<StorageTableDesc> {
70        let dist_key_indices: Vec<u32> = self.distribution_key.iter().map(|&k| k as u32).collect();
71        let pk_indices: Vec<u32> = self
72            .pk
73            .iter()
74            .map(|v| v.to_protobuf().column_index)
75            .collect();
76        let vnode_col_idx_in_pk = self
77            .vnode_col_index
78            .and_then(|vnode_col_index| {
79                pk_indices
80                    .iter()
81                    .position(|&pk_index| pk_index == vnode_col_index as u32)
82            })
83            .map(|i| i as u32);
84
85        let dist_key_in_pk_indices = if vnode_col_idx_in_pk.is_none() {
86            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices)?
87        } else {
88            Vec::new()
89        };
90        Ok(StorageTableDesc {
91            table_id: self.table_id.into(),
92            columns: self.columns.iter().map(Into::into).collect(),
93            pk: self.pk.iter().map(|v| v.to_protobuf()).collect(),
94            dist_key_in_pk_indices,
95            retention_seconds: self.retention_seconds,
96            value_indices: self.value_indices.iter().map(|&v| v as u32).collect(),
97            read_prefix_len_hint: self.read_prefix_len_hint as u32,
98            versioned: self.versioned,
99            stream_key: self.stream_key.iter().map(|&x| x as u32).collect(),
100            vnode_col_idx_in_pk,
101            maybe_vnode_count: VnodeCount::set(self.vnode_count).to_protobuf(),
102        })
103    }
104
105    pub fn from_pb_table(table: &Table) -> Self {
106        let vnode_count = table.vnode_count();
107
108        Self {
109            table_id: TableId::new(table.id),
110            pk: table.pk.iter().map(ColumnOrder::from_protobuf).collect(),
111            columns: table
112                .columns
113                .iter()
114                .map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()))
115                .collect(),
116            distribution_key: table.distribution_key.iter().map(|i| *i as _).collect(),
117            stream_key: table.stream_key.iter().map(|i| *i as _).collect(),
118            vnode_col_index: table.vnode_col_index.map(|i| i as _),
119            append_only: table.append_only,
120            retention_seconds: table.retention_seconds,
121            value_indices: table.value_indices.iter().map(|i| *i as _).collect(),
122            read_prefix_len_hint: table.read_prefix_len_hint as _,
123            watermark_columns: table.watermark_indices.iter().map(|i| *i as _).collect(),
124            versioned: table.version.is_some(),
125            vnode_count,
126        }
127    }
128}