risingwave_common/hash/table_distribution.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem::replace;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_pb::plan_common::StorageTableDesc;
20
21use crate::array::{Array, DataChunk, PrimitiveArray};
22use crate::bitmap::Bitmap;
23use crate::hash::{VirtualNode, VnodeCountCompat};
24use crate::row::Row;
25use crate::util::iter_util::ZipEqFast;
26
27/// For tables without distribution (singleton), the `SINGLETON_VNODE` is encoded.
28pub const SINGLETON_VNODE: VirtualNode = VirtualNode::ZERO;
29
30use super::VnodeBitmapExt;
31
32#[derive(Debug, Clone)]
33enum ComputeVnode {
34    Singleton,
35    DistKeyIndices {
36        /// Virtual nodes that the table is partitioned into.
37        vnodes: Arc<Bitmap>,
38        /// Indices of distribution key for computing vnode, based on the pk columns of the table.
39        dist_key_in_pk_indices: Vec<usize>,
40    },
41    VnodeColumnIndex {
42        /// Virtual nodes that the table is partitioned into.
43        vnodes: Arc<Bitmap>,
44        /// Index of vnode column.
45        vnode_col_idx_in_pk: usize,
46    },
47}
48
49#[derive(Debug, Clone)]
50/// Represents the distribution for a specific table instance.
51pub struct TableDistribution {
52    /// The way to compute vnode provided primary key
53    compute_vnode: ComputeVnode,
54}
55
56impl TableDistribution {
57    pub fn new_from_storage_table_desc(
58        vnodes: Option<Arc<Bitmap>>,
59        table_desc: &StorageTableDesc,
60    ) -> Self {
61        let dist_key_in_pk_indices = table_desc
62            .dist_key_in_pk_indices
63            .iter()
64            .map(|&k| k as usize)
65            .collect_vec();
66        let vnode_col_idx_in_pk = table_desc.vnode_col_idx_in_pk.map(|k| k as usize);
67
68        let this = Self::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
69        assert_eq!(
70            this.vnode_count(),
71            table_desc.vnode_count(),
72            "vnode count mismatch, scanning table {} under wrong distribution?",
73            table_desc.table_id
74        );
75        this
76    }
77
78    pub fn new(
79        vnodes: Option<Arc<Bitmap>>,
80        dist_key_in_pk_indices: Vec<usize>,
81        vnode_col_idx_in_pk: Option<usize>,
82    ) -> Self {
83        let compute_vnode = if let Some(vnode_col_idx_in_pk) = vnode_col_idx_in_pk {
84            ComputeVnode::VnodeColumnIndex {
85                vnodes: vnodes.unwrap_or_else(|| Bitmap::singleton_arc().clone()),
86                vnode_col_idx_in_pk,
87            }
88        } else if !dist_key_in_pk_indices.is_empty() {
89            ComputeVnode::DistKeyIndices {
90                vnodes: vnodes.expect("vnodes must be `Some` as dist key indices are set"),
91                dist_key_in_pk_indices,
92            }
93        } else {
94            ComputeVnode::Singleton
95        };
96
97        Self { compute_vnode }
98    }
99
100    pub fn is_singleton(&self) -> bool {
101        matches!(&self.compute_vnode, ComputeVnode::Singleton)
102    }
103
104    /// Distribution that accesses all vnodes, mainly used for tests.
105    pub fn all(dist_key_in_pk_indices: Vec<usize>, vnode_count: usize) -> Self {
106        Self {
107            compute_vnode: ComputeVnode::DistKeyIndices {
108                vnodes: Bitmap::ones(vnode_count).into(),
109                dist_key_in_pk_indices,
110            },
111        }
112    }
113
114    /// Fallback distribution for singleton or tests.
115    pub fn singleton() -> Self {
116        Self {
117            compute_vnode: ComputeVnode::Singleton,
118        }
119    }
120
121    pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
122        match &mut self.compute_vnode {
123            ComputeVnode::Singleton => {
124                if !new_vnodes.is_singleton() {
125                    panic!(
126                        "update vnode bitmap on singleton distribution to non-singleton: {:?}",
127                        new_vnodes
128                    );
129                }
130                self.vnodes().clone() // not updated
131            }
132
133            ComputeVnode::DistKeyIndices { vnodes, .. }
134            | ComputeVnode::VnodeColumnIndex { vnodes, .. } => {
135                assert_eq!(vnodes.len(), new_vnodes.len());
136                replace(vnodes, new_vnodes)
137            }
138        }
139    }
140
141    /// Get vnode bitmap if distributed, or a dummy [`Bitmap::singleton()`] if singleton.
142    pub fn vnodes(&self) -> &Arc<Bitmap> {
143        match &self.compute_vnode {
144            ComputeVnode::DistKeyIndices { vnodes, .. } => vnodes,
145            ComputeVnode::VnodeColumnIndex { vnodes, .. } => vnodes,
146            ComputeVnode::Singleton => Bitmap::singleton_arc(),
147        }
148    }
149
150    /// Get vnode count (1 if singleton). Equivalent to `self.vnodes().len()`.
151    pub fn vnode_count(&self) -> usize {
152        self.vnodes().len()
153    }
154
155    /// Get vnode value with given primary key.
156    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
157        match &self.compute_vnode {
158            ComputeVnode::Singleton => SINGLETON_VNODE,
159            ComputeVnode::DistKeyIndices {
160                vnodes,
161                dist_key_in_pk_indices,
162            } => compute_vnode(pk, dist_key_in_pk_indices, vnodes),
163            ComputeVnode::VnodeColumnIndex {
164                vnodes,
165                vnode_col_idx_in_pk,
166            } => get_vnode_from_row(pk, *vnode_col_idx_in_pk, vnodes),
167        }
168    }
169
170    pub fn try_compute_vnode_by_pk_prefix(&self, pk_prefix: impl Row) -> Option<VirtualNode> {
171        match &self.compute_vnode {
172            ComputeVnode::Singleton => Some(SINGLETON_VNODE),
173            ComputeVnode::DistKeyIndices {
174                vnodes,
175                dist_key_in_pk_indices,
176            } => dist_key_in_pk_indices
177                .iter()
178                .all(|&d| d < pk_prefix.len())
179                .then(|| compute_vnode(pk_prefix, dist_key_in_pk_indices, vnodes)),
180            ComputeVnode::VnodeColumnIndex {
181                vnodes,
182                vnode_col_idx_in_pk,
183            } => {
184                if *vnode_col_idx_in_pk >= pk_prefix.len() {
185                    None
186                } else {
187                    Some(get_vnode_from_row(pk_prefix, *vnode_col_idx_in_pk, vnodes))
188                }
189            }
190        }
191    }
192}
193
194/// Get vnode value with `indices` on the given `row`.
195pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> VirtualNode {
196    assert!(!indices.is_empty());
197    let vnode = VirtualNode::compute_row(&row, indices, vnodes.len());
198    check_vnode_is_set(vnode, vnodes);
199
200    tracing::debug!(target: "events::storage::storage_table", "compute vnode: {:?} key {:?} => {}", row, indices, vnode);
201
202    vnode
203}
204
205pub fn get_vnode_from_row(row: impl Row, index: usize, vnodes: &Bitmap) -> VirtualNode {
206    let vnode = VirtualNode::from_datum(row.datum_at(index));
207    check_vnode_is_set(vnode, vnodes);
208
209    tracing::debug!(target: "events::storage::storage_table", "get vnode from row: {:?} vnode column index {:?} => {}", row, index, vnode);
210
211    vnode
212}
213
214impl TableDistribution {
215    /// Get vnode values with `indices` on the given `chunk`.
216    ///
217    /// Vnode of invisible rows will be included. Only the vnode of visible row check if it's accessible
218    pub fn compute_chunk_vnode(&self, chunk: &DataChunk, pk_indices: &[usize]) -> Vec<VirtualNode> {
219        match &self.compute_vnode {
220            ComputeVnode::Singleton => {
221                vec![SINGLETON_VNODE; chunk.capacity()]
222            }
223            ComputeVnode::DistKeyIndices {
224                vnodes,
225                dist_key_in_pk_indices,
226            } => {
227                let dist_key_indices = dist_key_in_pk_indices
228                    .iter()
229                    .map(|idx| pk_indices[*idx])
230                    .collect_vec();
231
232                VirtualNode::compute_chunk(chunk, &dist_key_indices, vnodes.len())
233                    .into_iter()
234                    .zip_eq_fast(chunk.visibility().iter())
235                    .map(|(vnode, vis)| {
236                        // Ignore the invisible rows.
237                        if vis {
238                            check_vnode_is_set(vnode, vnodes);
239                        }
240                        vnode
241                    })
242                    .collect()
243            }
244            ComputeVnode::VnodeColumnIndex {
245                vnodes,
246                vnode_col_idx_in_pk,
247            } => {
248                let array: &PrimitiveArray<i16> =
249                    chunk.columns()[pk_indices[*vnode_col_idx_in_pk]].as_int16();
250                array
251                    .raw_iter()
252                    .zip_eq_fast(array.null_bitmap().iter())
253                    .zip_eq_fast(chunk.visibility().iter())
254                    .map(|((vnode, exist), vis)| {
255                        let vnode = VirtualNode::from_scalar(vnode);
256                        if vis {
257                            assert!(exist);
258                            check_vnode_is_set(vnode, vnodes);
259                        }
260                        vnode
261                    })
262                    .collect_vec()
263            }
264        }
265    }
266}
267
268/// Check whether the given `vnode` is set in the `vnodes` of this table.
269fn check_vnode_is_set(vnode: VirtualNode, vnodes: &Bitmap) {
270    let is_set = vnodes.is_set(vnode.to_index());
271
272    if !is_set {
273        let high_ranges = vnodes.high_ranges().map(|r| format!("{r:?}")).join(", ");
274        panic!(
275            "vnode {} should not be accessed by this table\nvnode count: {}\nallowed vnodes: {}",
276            vnode,
277            vnodes.len(),
278            high_ranges
279        );
280    }
281}