risingwave_frontend/optimizer/property/
watermark_columns.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use risingwave_common::util::column_index_mapping::ColIndexMapping;
18
19use crate::utils::IndexSet;
20
/// Identifier of a watermark group. Columns tagged with the same id are
/// bucketed together by `WatermarkColumns::grouped`.
pub type WatermarkGroupId = u32;

/// The set of output watermark columns of a plan node.
///
/// Each watermark column index is mapped to the id of the watermark group it
/// belongs to. The mapping is stored in a `BTreeMap`, so any iteration over it
/// visits column indices in ascending order.
#[derive(Debug, Clone, Default)]
#[derive(PartialEq, Eq, Hash)]
pub struct WatermarkColumns {
    /// Column index -> watermark group id of that column.
    col_idx_to_wtmk_group_id: BTreeMap<usize, WatermarkGroupId>,
}
28
29impl WatermarkColumns {
30    /// Create an empty `WatermarkColumns`.
31    pub fn new() -> Self {
32        Self::default()
33    }
34
35    /// Check if the `WatermarkColumns` is empty.
36    pub fn is_empty(&self) -> bool {
37        self.col_idx_to_wtmk_group_id.is_empty()
38    }
39
40    /// Get the number of watermark columns.
41    pub fn n_indices(&self) -> usize {
42        self.col_idx_to_wtmk_group_id.len()
43    }
44
45    /// Insert a column index with a watermark group ID.
46    pub fn insert(&mut self, col_idx: usize, group: WatermarkGroupId) {
47        self.col_idx_to_wtmk_group_id.insert(col_idx, group);
48    }
49
50    /// Check if the `WatermarkColumns` contains a column index.
51    pub fn contains(&self, col_idx: usize) -> bool {
52        self.col_idx_to_wtmk_group_id.contains_key(&col_idx)
53    }
54
55    /// Get the watermark group ID of a column index.
56    pub fn get_group(&self, col_idx: usize) -> Option<WatermarkGroupId> {
57        self.col_idx_to_wtmk_group_id.get(&col_idx).copied()
58    }
59
60    /// Get all watermark columns as a `IndexSet`.
61    pub fn index_set(&self) -> IndexSet {
62        self.col_idx_to_wtmk_group_id.keys().copied().collect()
63    }
64
65    /// Iterate over all column indices, in ascending order.
66    pub fn indices(&self) -> impl Iterator<Item = usize> + '_ {
67        self.col_idx_to_wtmk_group_id.keys().copied()
68    }
69
70    /// Get all watermark groups and their corresponding column indices.
71    pub fn grouped(&self) -> BTreeMap<WatermarkGroupId, IndexSet> {
72        let mut groups = BTreeMap::new();
73        for (col_idx, group_id) in &self.col_idx_to_wtmk_group_id {
74            groups
75                .entry(*group_id)
76                .or_insert_with(IndexSet::empty)
77                .insert(*col_idx);
78        }
79        groups
80    }
81
82    /// Iterate over all column indices and their corresponding watermark group IDs, in ascending order.
83    pub fn iter(&self) -> impl Iterator<Item = (usize, WatermarkGroupId)> + '_ {
84        self.col_idx_to_wtmk_group_id
85            .iter()
86            .map(|(&col_idx, &group_id)| (col_idx, group_id))
87    }
88
89    /// Clone and shift the column indices to the right (larger) by `column_shift`.
90    pub fn right_shift_clone(&self, column_shift: usize) -> Self {
91        let col_idx_to_wtmk_group_id = self
92            .col_idx_to_wtmk_group_id
93            .iter()
94            .map(|(&col_idx, &group_id)| (col_idx + column_shift, group_id))
95            .collect();
96        Self {
97            col_idx_to_wtmk_group_id,
98        }
99    }
100
101    /// Clone and retain only the columns with indices in `col_indices`.
102    pub fn retain_clone(&self, col_indices: &[usize]) -> Self {
103        let mut new = Self::new();
104        for &col_idx in col_indices {
105            if let Some(group_id) = self.get_group(col_idx) {
106                new.insert(col_idx, group_id);
107            }
108        }
109        new
110    }
111
112    /// Clone and map the column indices using `col_mapping`.
113    pub fn map_clone(&self, col_mapping: &ColIndexMapping) -> Self {
114        let col_idx_to_wtmk_group_id = self
115            .col_idx_to_wtmk_group_id
116            .iter()
117            .filter_map(|(&col_idx, &group_id)| {
118                col_mapping
119                    .try_map(col_idx)
120                    .map(|new_col_idx| (new_col_idx, group_id))
121            })
122            .collect();
123        Self {
124            col_idx_to_wtmk_group_id,
125        }
126    }
127}