risingwave_stream/common/
column_mapping.rs1use std::collections::HashMap;
16
17#[derive(Clone)]
18pub struct StateTableColumnMapping {
19 included_upstream_indices: Vec<usize>,
21 mapping: HashMap<usize, usize>,
23}
24
25impl StateTableColumnMapping {
26 pub fn new(
28 included_upstream_indices: Vec<usize>,
29 table_value_indices: Option<Vec<usize>>,
30 ) -> Self {
31 let mapping = table_value_indices
32 .unwrap_or_else(|| (0..included_upstream_indices.len()).collect())
33 .into_iter()
34 .map(|value_idx| included_upstream_indices[value_idx])
35 .enumerate()
36 .map(|(i, upstream_idx)| (upstream_idx, i))
37 .collect();
38 Self {
39 included_upstream_indices,
40 mapping,
41 }
42 }
43
44 pub fn upstream_to_state_table(&self, idx: usize) -> Option<usize> {
46 self.mapping.get(&idx).copied()
47 }
48
49 pub fn upstream_columns(&self) -> &[usize] {
51 &self.included_upstream_indices
52 }
53}
54
55#[cfg(test)]
56mod tests {
57 use super::*;
58
59 #[test]
60 fn test_column_mapping() {
61 let mapping = StateTableColumnMapping::new(vec![2, 3, 0, 1], None);
62 assert_eq!(mapping.upstream_to_state_table(2), Some(0));
63 assert_eq!(mapping.upstream_to_state_table(3), Some(1));
64 assert_eq!(mapping.upstream_to_state_table(0), Some(2));
65 assert_eq!(mapping.upstream_to_state_table(1), Some(3));
66 assert_eq!(mapping.upstream_to_state_table(4), None);
67 assert_eq!(mapping.upstream_columns(), &[2, 3, 0, 1]);
68 }
69
70 #[test]
71 fn test_column_mapping_with_value_indices() {
72 let mapping = StateTableColumnMapping::new(vec![2, 3, 0, 1], Some(vec![0, 1, 3]));
73 assert_eq!(mapping.upstream_to_state_table(2), Some(0));
74 assert_eq!(mapping.upstream_to_state_table(3), Some(1));
75 assert_eq!(mapping.upstream_to_state_table(0), None); assert_eq!(mapping.upstream_to_state_table(1), Some(2));
77 assert_eq!(mapping.upstream_to_state_table(4), None);
78 assert_eq!(mapping.upstream_columns(), &[2, 3, 0, 1]);
79 }
80}