risingwave_stream/common/
column_mapping.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
/// Maps upstream chunk column indices to positions among the value columns of a state table.
///
/// The mapping is built once in [`StateTableColumnMapping::new`] and is read-only afterwards.
#[derive(Clone, Debug)]
pub struct StateTableColumnMapping {
    /// index: state table column index, value: upstream column index
    included_upstream_indices: Vec<usize>,
    /// key: upstream column index, value: state table value index
    mapping: HashMap<usize, usize>,
}

impl StateTableColumnMapping {
    /// Creates a new column mapping with the upstream columns included in state table.
    ///
    /// * `included_upstream_indices` - for each state table column, the index of the upstream
    ///   column stored in it.
    /// * `table_value_indices` - the state table columns that are value columns, as indices into
    ///   `included_upstream_indices`. The position of an entry in this list is the index later
    ///   returned by [`Self::upstream_to_state_table`]. When `None`, every column in
    ///   `included_upstream_indices` is used, in order.
    ///
    /// If the same upstream column index is selected more than once, the last occurrence wins.
    ///
    /// # Panics
    ///
    /// Panics if any entry of `table_value_indices` is not a valid index into
    /// `included_upstream_indices`.
    pub fn new(
        included_upstream_indices: Vec<usize>,
        table_value_indices: Option<Vec<usize>>,
    ) -> Self {
        let mapping: HashMap<usize, usize> = match table_value_indices {
            Some(value_indices) => value_indices
                .into_iter()
                .enumerate()
                // Out-of-range value indices are an invariant violation, hence the plain index.
                .map(|(pos, value_idx)| (included_upstream_indices[value_idx], pos))
                .collect(),
            // Without explicit value indices every included column is a value column, so the
            // columns can be walked directly instead of materializing a `0..len` vector first.
            None => included_upstream_indices
                .iter()
                .copied()
                .enumerate()
                .map(|(pos, upstream_idx)| (upstream_idx, pos))
                .collect(),
        };
        Self {
            included_upstream_indices,
            mapping,
        }
    }

    /// Convert upstream chunk column index to state table column index.
    ///
    /// Returns `None` when the upstream column is not among the mapped value columns.
    pub fn upstream_to_state_table(&self, idx: usize) -> Option<usize> {
        self.mapping.get(&idx).copied()
    }

    /// Return slice of all upstream columns.
    ///
    /// This is the full `included_upstream_indices` list passed to [`Self::new`], regardless of
    /// which columns were selected as value columns.
    pub fn upstream_columns(&self) -> &[usize] {
        &self.included_upstream_indices
    }
}
54
#[cfg(test)]
mod tests {
    use super::*;

    /// Looks up upstream column indices `0..=4` in one pass so each test can compare the whole
    /// result against a single expected array.
    fn lookup_all(mapping: &StateTableColumnMapping) -> Vec<Option<usize>> {
        (0..=4)
            .map(|upstream_idx| mapping.upstream_to_state_table(upstream_idx))
            .collect()
    }

    /// Without explicit value indices, every included column maps to its own position.
    #[test]
    fn test_column_mapping() {
        let mapping = StateTableColumnMapping::new(vec![2, 3, 0, 1], None);
        // Upstream index 4 is never included, so it has no state table column.
        assert_eq!(
            lookup_all(&mapping),
            [Some(2), Some(3), Some(0), Some(1), None]
        );
        assert_eq!(mapping.upstream_columns(), &[2, 3, 0, 1]);
    }

    /// With explicit value indices, only the selected columns are mapped.
    #[test]
    fn test_column_mapping_with_value_indices() {
        let mapping = StateTableColumnMapping::new(vec![2, 3, 0, 1], Some(vec![0, 1, 3]));
        // Upstream index 0 is included but not in value indices; index 4 is not included at all.
        assert_eq!(
            lookup_all(&mapping),
            [None, Some(2), Some(0), Some(1), None]
        );
        assert_eq!(mapping.upstream_columns(), &[2, 3, 0, 1]);
    }
}