// risingwave_frontend/optimizer/property/watermark_columns.rs

use std::collections::BTreeMap;

use risingwave_common::util::column_index_mapping::ColIndexMapping;

use crate::utils::IndexSet;

/// Identifier of a watermark group.
///
/// Columns that are stored with the same id in [`WatermarkColumns`] are
/// reported together by `WatermarkColumns::grouped`.
pub type WatermarkGroupId = u32;

/// The set of watermark columns of a plan node, keyed by column index.
///
/// Each entry maps a column index to the [`WatermarkGroupId`] it belongs to.
/// A `BTreeMap` is used, so every traversal of the columns is in ascending
/// column-index order and the derived `Eq`/`Hash` are order-independent of
/// insertion.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct WatermarkColumns {
    /// Column index -> watermark group id.
    col_idx_to_wtmk_group_id: BTreeMap<usize, WatermarkGroupId>,
}
28
29impl WatermarkColumns {
30 pub fn new() -> Self {
32 Self::default()
33 }
34
35 pub fn is_empty(&self) -> bool {
37 self.col_idx_to_wtmk_group_id.is_empty()
38 }
39
40 pub fn n_indices(&self) -> usize {
42 self.col_idx_to_wtmk_group_id.len()
43 }
44
45 pub fn insert(&mut self, col_idx: usize, group: WatermarkGroupId) {
47 self.col_idx_to_wtmk_group_id.insert(col_idx, group);
48 }
49
50 pub fn contains(&self, col_idx: usize) -> bool {
52 self.col_idx_to_wtmk_group_id.contains_key(&col_idx)
53 }
54
55 pub fn get_group(&self, col_idx: usize) -> Option<WatermarkGroupId> {
57 self.col_idx_to_wtmk_group_id.get(&col_idx).copied()
58 }
59
60 pub fn index_set(&self) -> IndexSet {
62 self.col_idx_to_wtmk_group_id.keys().copied().collect()
63 }
64
65 pub fn indices(&self) -> impl Iterator<Item = usize> + '_ {
67 self.col_idx_to_wtmk_group_id.keys().copied()
68 }
69
70 pub fn grouped(&self) -> BTreeMap<WatermarkGroupId, IndexSet> {
72 let mut groups = BTreeMap::new();
73 for (col_idx, group_id) in &self.col_idx_to_wtmk_group_id {
74 groups
75 .entry(*group_id)
76 .or_insert_with(IndexSet::empty)
77 .insert(*col_idx);
78 }
79 groups
80 }
81
82 pub fn iter(&self) -> impl Iterator<Item = (usize, WatermarkGroupId)> + '_ {
84 self.col_idx_to_wtmk_group_id
85 .iter()
86 .map(|(&col_idx, &group_id)| (col_idx, group_id))
87 }
88
89 pub fn right_shift_clone(&self, column_shift: usize) -> Self {
91 let col_idx_to_wtmk_group_id = self
92 .col_idx_to_wtmk_group_id
93 .iter()
94 .map(|(&col_idx, &group_id)| (col_idx + column_shift, group_id))
95 .collect();
96 Self {
97 col_idx_to_wtmk_group_id,
98 }
99 }
100
101 pub fn retain_clone(&self, col_indices: &[usize]) -> Self {
103 let mut new = Self::new();
104 for &col_idx in col_indices {
105 if let Some(group_id) = self.get_group(col_idx) {
106 new.insert(col_idx, group_id);
107 }
108 }
109 new
110 }
111
112 pub fn map_clone(&self, col_mapping: &ColIndexMapping) -> Self {
114 let col_idx_to_wtmk_group_id = self
115 .col_idx_to_wtmk_group_id
116 .iter()
117 .filter_map(|(&col_idx, &group_id)| {
118 col_mapping
119 .try_map(col_idx)
120 .map(|new_col_idx| (new_col_idx, group_id))
121 })
122 .collect();
123 Self {
124 col_idx_to_wtmk_group_id,
125 }
126 }
127}