1use std::mem::replace;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_pb::plan_common::StorageTableDesc;
20
21use crate::array::{Array, DataChunk, PrimitiveArray};
22use crate::bitmap::Bitmap;
23use crate::hash::{VirtualNode, VnodeCountCompat};
24use crate::row::Row;
25use crate::util::iter_util::ZipEqFast;
26
27pub const SINGLETON_VNODE: VirtualNode = VirtualNode::ZERO;
29
30use super::VnodeBitmapExt;
31
32#[derive(Debug, Clone)]
33enum ComputeVnode {
34 Singleton,
35 DistKeyIndices {
36 vnodes: Arc<Bitmap>,
38 dist_key_in_pk_indices: Vec<usize>,
40 },
41 VnodeColumnIndex {
42 vnodes: Arc<Bitmap>,
44 vnode_col_idx_in_pk: usize,
46 },
47}
48
49#[derive(Debug, Clone)]
50pub struct TableDistribution {
52 compute_vnode: ComputeVnode,
54}
55
56impl TableDistribution {
57 pub fn new_from_storage_table_desc(
58 vnodes: Option<Arc<Bitmap>>,
59 table_desc: &StorageTableDesc,
60 ) -> Self {
61 let dist_key_in_pk_indices = table_desc
62 .dist_key_in_pk_indices
63 .iter()
64 .map(|&k| k as usize)
65 .collect_vec();
66 let vnode_col_idx_in_pk = table_desc.vnode_col_idx_in_pk.map(|k| k as usize);
67
68 let this = Self::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
69 assert_eq!(
70 this.vnode_count(),
71 table_desc.vnode_count(),
72 "vnode count mismatch, scanning table {} under wrong distribution?",
73 table_desc.table_id
74 );
75 this
76 }
77
78 pub fn new(
79 vnodes: Option<Arc<Bitmap>>,
80 dist_key_in_pk_indices: Vec<usize>,
81 vnode_col_idx_in_pk: Option<usize>,
82 ) -> Self {
83 let compute_vnode = if let Some(vnode_col_idx_in_pk) = vnode_col_idx_in_pk {
84 ComputeVnode::VnodeColumnIndex {
85 vnodes: vnodes.unwrap_or_else(|| Bitmap::singleton_arc().clone()),
86 vnode_col_idx_in_pk,
87 }
88 } else if !dist_key_in_pk_indices.is_empty() {
89 ComputeVnode::DistKeyIndices {
90 vnodes: vnodes.expect("vnodes must be `Some` as dist key indices are set"),
91 dist_key_in_pk_indices,
92 }
93 } else {
94 ComputeVnode::Singleton
95 };
96
97 Self { compute_vnode }
98 }
99
100 pub fn is_singleton(&self) -> bool {
101 matches!(&self.compute_vnode, ComputeVnode::Singleton)
102 }
103
104 pub fn all(dist_key_in_pk_indices: Vec<usize>, vnode_count: usize) -> Self {
106 Self {
107 compute_vnode: ComputeVnode::DistKeyIndices {
108 vnodes: Bitmap::ones(vnode_count).into(),
109 dist_key_in_pk_indices,
110 },
111 }
112 }
113
114 pub fn singleton() -> Self {
116 Self {
117 compute_vnode: ComputeVnode::Singleton,
118 }
119 }
120
121 pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
122 match &mut self.compute_vnode {
123 ComputeVnode::Singleton => {
124 if !new_vnodes.is_singleton() {
125 panic!(
126 "update vnode bitmap on singleton distribution to non-singleton: {:?}",
127 new_vnodes
128 );
129 }
130 self.vnodes().clone() }
132
133 ComputeVnode::DistKeyIndices { vnodes, .. }
134 | ComputeVnode::VnodeColumnIndex { vnodes, .. } => {
135 assert_eq!(vnodes.len(), new_vnodes.len());
136 replace(vnodes, new_vnodes)
137 }
138 }
139 }
140
141 pub fn vnodes(&self) -> &Arc<Bitmap> {
143 match &self.compute_vnode {
144 ComputeVnode::DistKeyIndices { vnodes, .. } => vnodes,
145 ComputeVnode::VnodeColumnIndex { vnodes, .. } => vnodes,
146 ComputeVnode::Singleton => Bitmap::singleton_arc(),
147 }
148 }
149
150 pub fn vnode_count(&self) -> usize {
152 self.vnodes().len()
153 }
154
155 pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
157 match &self.compute_vnode {
158 ComputeVnode::Singleton => SINGLETON_VNODE,
159 ComputeVnode::DistKeyIndices {
160 vnodes,
161 dist_key_in_pk_indices,
162 } => compute_vnode(pk, dist_key_in_pk_indices, vnodes),
163 ComputeVnode::VnodeColumnIndex {
164 vnodes,
165 vnode_col_idx_in_pk,
166 } => get_vnode_from_row(pk, *vnode_col_idx_in_pk, vnodes),
167 }
168 }
169
170 pub fn try_compute_vnode_by_pk_prefix(&self, pk_prefix: impl Row) -> Option<VirtualNode> {
171 match &self.compute_vnode {
172 ComputeVnode::Singleton => Some(SINGLETON_VNODE),
173 ComputeVnode::DistKeyIndices {
174 vnodes,
175 dist_key_in_pk_indices,
176 } => dist_key_in_pk_indices
177 .iter()
178 .all(|&d| d < pk_prefix.len())
179 .then(|| compute_vnode(pk_prefix, dist_key_in_pk_indices, vnodes)),
180 ComputeVnode::VnodeColumnIndex {
181 vnodes,
182 vnode_col_idx_in_pk,
183 } => {
184 if *vnode_col_idx_in_pk >= pk_prefix.len() {
185 None
186 } else {
187 Some(get_vnode_from_row(pk_prefix, *vnode_col_idx_in_pk, vnodes))
188 }
189 }
190 }
191 }
192}
193
194pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> VirtualNode {
196 assert!(!indices.is_empty());
197 let vnode = VirtualNode::compute_row(&row, indices, vnodes.len());
198 check_vnode_is_set(vnode, vnodes);
199
200 tracing::debug!(target: "events::storage::storage_table", "compute vnode: {:?} key {:?} => {}", row, indices, vnode);
201
202 vnode
203}
204
205pub fn get_vnode_from_row(row: impl Row, index: usize, vnodes: &Bitmap) -> VirtualNode {
206 let vnode = VirtualNode::from_datum(row.datum_at(index));
207 check_vnode_is_set(vnode, vnodes);
208
209 tracing::debug!(target: "events::storage::storage_table", "get vnode from row: {:?} vnode column index {:?} => {}", row, index, vnode);
210
211 vnode
212}
213
214impl TableDistribution {
215 pub fn compute_chunk_vnode(&self, chunk: &DataChunk, pk_indices: &[usize]) -> Vec<VirtualNode> {
219 match &self.compute_vnode {
220 ComputeVnode::Singleton => {
221 vec![SINGLETON_VNODE; chunk.capacity()]
222 }
223 ComputeVnode::DistKeyIndices {
224 vnodes,
225 dist_key_in_pk_indices,
226 } => {
227 let dist_key_indices = dist_key_in_pk_indices
228 .iter()
229 .map(|idx| pk_indices[*idx])
230 .collect_vec();
231
232 VirtualNode::compute_chunk(chunk, &dist_key_indices, vnodes.len())
233 .into_iter()
234 .zip_eq_fast(chunk.visibility().iter())
235 .map(|(vnode, vis)| {
236 if vis {
238 check_vnode_is_set(vnode, vnodes);
239 }
240 vnode
241 })
242 .collect()
243 }
244 ComputeVnode::VnodeColumnIndex {
245 vnodes,
246 vnode_col_idx_in_pk,
247 } => {
248 let array: &PrimitiveArray<i16> =
249 chunk.columns()[pk_indices[*vnode_col_idx_in_pk]].as_int16();
250 array
251 .raw_iter()
252 .zip_eq_fast(array.null_bitmap().iter())
253 .zip_eq_fast(chunk.visibility().iter())
254 .map(|((vnode, exist), vis)| {
255 let vnode = VirtualNode::from_scalar(vnode);
256 if vis {
257 assert!(exist);
258 check_vnode_is_set(vnode, vnodes);
259 }
260 vnode
261 })
262 .collect_vec()
263 }
264 }
265 }
266}
267
268fn check_vnode_is_set(vnode: VirtualNode, vnodes: &Bitmap) {
270 let is_set = vnodes.is_set(vnode.to_index());
271
272 if !is_set {
273 let high_ranges = vnodes.high_ranges().map(|r| format!("{r:?}")).join(", ");
274 panic!(
275 "vnode {} should not be accessed by this table\nvnode count: {}\nallowed vnodes: {}",
276 vnode,
277 vnodes.len(),
278 high_ranges
279 );
280 }
281}