risingwave_common/util/
column_index_mapping.rs1use std::cmp::max;
16use std::fmt::Debug;
17use std::vec;
18
19use itertools::Itertools;
20use risingwave_pb::catalog::PbColIndexMapping;
21
22#[derive(Clone, PartialEq, Eq, Hash)]
26pub struct ColIndexMapping {
27 target_size: usize,
29 map: Vec<Option<usize>>,
31}
32
33impl ColIndexMapping {
34 pub fn new(map: Vec<Option<usize>>, target_size: usize) -> Self {
37 if let Some(target_max) = map.iter().filter_map(|x| *x).max_by_key(|x| *x) {
38 assert!(
39 target_max < target_size,
40 "target_max: {}, target_size: {}",
41 target_max,
42 target_size
43 );
44 };
45 Self { target_size, map }
46 }
47
48 pub fn into_parts(self) -> (Vec<Option<usize>>, usize) {
49 (self.map, self.target_size)
50 }
51
52 pub fn to_parts(&self) -> (&[Option<usize>], usize) {
53 (&self.map, self.target_size)
54 }
55
56 pub fn put(&mut self, src: usize, tar: Option<usize>) {
57 assert!(src < self.source_size());
58 if let Some(tar) = tar {
59 assert!(tar < self.target_size());
60 }
61 self.map[src] = tar;
62 }
63
64 pub fn identity(size: usize) -> Self {
65 let map = (0..size).map(Some).collect();
66 Self::new(map, size)
67 }
68
69 pub fn is_identity(&self) -> bool {
70 if self.map.len() != self.target_size {
71 return false;
72 }
73 for (src, tar) in self.map.iter().enumerate() {
74 if let Some(tar_value) = tar
75 && src == *tar_value
76 {
77 continue;
78 } else {
79 return false;
80 }
81 }
82 true
83 }
84
85 pub fn identity_or_none(source_size: usize, target_size: usize) -> Self {
86 let map = (0..source_size)
87 .map(|i| if i < target_size { Some(i) } else { None })
88 .collect();
89 Self::new(map, target_size)
90 }
91
92 pub fn empty(source_size: usize, target_size: usize) -> Self {
93 let map = vec![None; source_size];
94 Self::new(map, target_size)
95 }
96
97 pub fn with_shift_offset(source_num: usize, offset: isize) -> Self {
126 let map = (0..source_num)
127 .map(|source| {
128 let target = source as isize + offset;
129 usize::try_from(target).ok()
130 })
131 .collect_vec();
132 let target_size = usize::try_from(source_num as isize + offset).unwrap();
133 Self::new(map, target_size)
134 }
135
136 pub fn with_remaining_columns(cols: &[usize], src_size: usize) -> Self {
154 let mut map = vec![None; src_size];
155 for (tar, &src) in cols.iter().enumerate() {
156 map[src] = Some(tar);
157 }
158 Self::new(map, cols.len())
159 }
160
161 pub fn with_included_columns(cols: &[usize], src_size: usize) -> Self {
163 let mut map = vec![None; src_size];
164 for (tar, &src) in cols.iter().enumerate() {
165 if map[src].is_none() {
166 map[src] = Some(tar);
167 }
168 }
169 Self::new(map, cols.len())
170 }
171
172 pub fn with_removed_columns(cols: &[usize], src_size: usize) -> Self {
189 let cols = (0..src_size).filter(|x| !cols.contains(x)).collect_vec();
190 Self::with_remaining_columns(&cols, src_size)
191 }
192
193 #[must_use]
194 pub fn composite(&self, following: &Self) -> Self {
199 let mut map = self.map.clone();
201 for target in &mut map {
202 *target = target.and_then(|index| following.try_map(index));
203 }
204 Self::new(map, following.target_size())
205 }
206
207 pub fn clone_with_offset(&self, offset: usize) -> Self {
208 let mut map = self.map.clone();
209 for target in &mut map {
210 *target = target.and_then(|index| index.checked_add(offset));
211 }
212 Self::new(map, self.target_size() + offset)
213 }
214
215 #[must_use]
222 pub fn union(&self, other: &Self) -> Self {
223 let target_size = max(self.target_size(), other.target_size());
225 let source_size = max(self.source_size(), other.source_size());
226 let mut map = vec![None; source_size];
227 for (src, dst) in self.mapping_pairs() {
228 assert_eq!(map[src], None);
229 map[src] = Some(dst);
230 }
231 for (src, dst) in other.mapping_pairs() {
232 assert_eq!(map[src], None);
233 map[src] = Some(dst);
234 }
235 Self::new(map, target_size)
236 }
237
238 #[must_use]
240 pub fn inverse(&self) -> Option<Self> {
241 let mut map = vec![None; self.target_size()];
242 for (src, dst) in self.mapping_pairs() {
243 if map[dst].is_some() {
244 return None;
245 }
246 map[dst] = Some(src);
247 }
248 Some(Self::new(map, self.source_size()))
249 }
250
251 pub fn mapping_pairs(&self) -> impl Iterator<Item = (usize, usize)> + '_ {
253 self.map
254 .iter()
255 .cloned()
256 .enumerate()
257 .filter_map(|(src, tar)| tar.map(|tar| (src, tar)))
258 }
259
260 pub fn try_map(&self, index: usize) -> Option<usize> {
262 *self.map.get(index)?
263 }
264
265 pub fn try_map_all(&self, indices: impl IntoIterator<Item = usize>) -> Option<Vec<usize>> {
268 indices.into_iter().map(|i| self.try_map(i)).collect()
269 }
270
271 pub fn map(&self, index: usize) -> usize {
275 self.try_map(index).unwrap()
276 }
277
278 pub fn target_size(&self) -> usize {
280 self.target_size
281 }
282
283 pub fn source_size(&self) -> usize {
285 self.map.len()
286 }
287
288 pub fn is_empty(&self) -> bool {
289 self.target_size() == 0
290 }
291
292 pub fn is_injective(&self) -> bool {
293 let mut tar_exists = vec![false; self.target_size()];
294 for i in self.map.iter().flatten() {
295 if tar_exists[*i] {
296 return false;
297 }
298 tar_exists[*i] = true;
299 }
300 true
301 }
302}
303
304impl ColIndexMapping {
305 pub fn to_protobuf(&self) -> PbColIndexMapping {
306 PbColIndexMapping {
307 target_size: self.target_size as u64,
308 map: self
309 .map
310 .iter()
311 .map(|x| x.map_or(-1, |x| x as i64))
312 .collect(),
313 }
314 }
315
316 pub fn from_protobuf(prost: &PbColIndexMapping) -> ColIndexMapping {
317 ColIndexMapping {
318 target_size: prost.target_size as usize,
319 map: prost.map.iter().map(|&x| x.try_into().ok()).collect(),
320 }
321 }
322}
323
324impl Debug for ColIndexMapping {
325 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
326 write!(
327 f,
328 "ColIndexMapping(source_size:{}, target_size:{}, mapping:{})",
329 self.source_size(),
330 self.target_size(),
331 self.mapping_pairs()
332 .map(|(src, dst)| format!("{}->{}", src, dst))
333 .join(",")
334 )
335 }
336}
337
338#[cfg(test)]
339mod tests {
340 use super::*;
341
342 #[test]
343 fn test_shift_0() {
344 let mapping = ColIndexMapping::with_shift_offset(3, 0);
345 assert_eq!(mapping.map(0), 0);
346 assert_eq!(mapping.map(1), 1);
347 assert_eq!(mapping.map(2), 2);
348 assert_eq!(mapping.try_map(3), None);
349 assert_eq!(mapping.try_map(4), None);
350 }
351
352 #[test]
353 fn test_shift_0_source() {
354 let mapping = ColIndexMapping::with_shift_offset(0, 3);
355 assert_eq!(mapping.target_size(), 3);
356 }
357
358 #[test]
359 fn test_composite() {
360 let add_mapping = ColIndexMapping::with_shift_offset(3, 3);
361 let remaining_cols = vec![3, 5];
362 let col_prune_mapping = ColIndexMapping::with_remaining_columns(&remaining_cols, 6);
363 let composite = add_mapping.composite(&col_prune_mapping);
364 assert_eq!(composite.map(0), 0); assert_eq!(composite.try_map(1), None);
366 assert_eq!(composite.map(2), 1); }
368
369 #[test]
370 fn test_identity() {
371 let mapping = ColIndexMapping::identity(10);
372 assert!(mapping.is_identity());
373 }
374}