// risingwave_stream/common/column_mapping.rs
use std::collections::HashMap;
/// Translates upstream column indices into positions among the state table's
/// value columns.
#[derive(Clone)]
pub struct StateTableColumnMapping {
    /// Upstream column indices, stored exactly as handed to [`Self::new`].
    included_upstream_indices: Vec<usize>,
    /// Lookup from an upstream column index to its position in the value-index
    /// list supplied to [`Self::new`].
    mapping: HashMap<usize, usize>,
}

impl StateTableColumnMapping {
    /// Builds the mapping.
    ///
    /// Each entry of `table_value_indices` is an index *into*
    /// `included_upstream_indices`; the position of that entry within
    /// `table_value_indices` becomes the value returned by
    /// [`Self::upstream_to_state_table`] for the referenced upstream column.
    /// When `table_value_indices` is `None`, every included column is used in
    /// its original order (i.e. `0..included_upstream_indices.len()`).
    ///
    /// If the same upstream column is referenced more than once, the last
    /// occurrence wins.
    ///
    /// # Panics
    ///
    /// Panics if any entry of `table_value_indices` is out of bounds for
    /// `included_upstream_indices`.
    pub fn new(
        included_upstream_indices: Vec<usize>,
        table_value_indices: Option<Vec<usize>>,
    ) -> Self {
        let value_indices: Vec<usize> = match table_value_indices {
            Some(indices) => indices,
            None => (0..included_upstream_indices.len()).collect(),
        };

        let mut mapping = HashMap::with_capacity(value_indices.len());
        for (position, value_idx) in value_indices.into_iter().enumerate() {
            let upstream_idx = included_upstream_indices[value_idx];
            mapping.insert(upstream_idx, position);
        }

        Self {
            included_upstream_indices,
            mapping,
        }
    }

    /// Returns the mapped position for the upstream column `idx`, or `None`
    /// when that column was not selected at construction time.
    pub fn upstream_to_state_table(&self, idx: usize) -> Option<usize> {
        let position = self.mapping.get(&idx);
        position.copied()
    }

    /// Returns the upstream column indices in the order given to [`Self::new`].
    pub fn upstream_columns(&self) -> &[usize] {
        self.included_upstream_indices.as_slice()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Looks up upstream indices `0..=4` and returns the results in that order,
    /// so a whole mapping can be compared in a single assertion.
    fn lookup_all(mapping: &StateTableColumnMapping) -> Vec<Option<usize>> {
        (0..=4)
            .map(|upstream_idx| mapping.upstream_to_state_table(upstream_idx))
            .collect()
    }

    /// Without explicit value indices, every included column is mapped to its
    /// position in the included list.
    #[test]
    fn test_column_mapping() {
        let mapping = StateTableColumnMapping::new(vec![2, 3, 0, 1], None);

        // Upstream 0 -> 2, 1 -> 3, 2 -> 0, 3 -> 1; upstream 4 is not included.
        let expected = vec![Some(2), Some(3), Some(0), Some(1), None];
        assert_eq!(lookup_all(&mapping), expected);
        assert_eq!(mapping.upstream_columns(), &[2, 3, 0, 1]);
    }

    /// With explicit value indices, only the referenced columns are mapped,
    /// numbered by their position in the value-index list.
    #[test]
    fn test_column_mapping_with_value_indices() {
        let mapping = StateTableColumnMapping::new(vec![2, 3, 0, 1], Some(vec![0, 1, 3]));

        // Upstream 0 (included slot 2) is skipped; upstream 1 takes position 2.
        let expected = vec![None, Some(2), Some(0), Some(1), None];
        assert_eq!(lookup_all(&mapping), expected);
        assert_eq!(mapping.upstream_columns(), &[2, 3, 0, 1]);
    }
}