risingwave_frontend/optimizer/rule/
apply_offset_rewriter.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16
17use crate::expr::{CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, InputRef};
18use crate::utils::ColIndexMapping;
19
20/// Convert `CorrelatedInputRef` to `InputRef` and shift `InputRef` with offset.
21pub struct ApplyOffsetRewriter {
22    offset: usize,
23    index_mapping: ColIndexMapping,
24    has_correlated_input_ref: bool,
25    correlated_id: CorrelatedId,
26}
27
28impl ExprRewriter for ApplyOffsetRewriter {
29    fn rewrite_correlated_input_ref(
30        &mut self,
31        correlated_input_ref: CorrelatedInputRef,
32    ) -> ExprImpl {
33        let found = correlated_input_ref.correlated_id() == self.correlated_id;
34        self.has_correlated_input_ref |= found;
35        if found {
36            InputRef::new(
37                self.index_mapping.map(correlated_input_ref.index()),
38                correlated_input_ref.return_type(),
39            )
40            .into()
41        } else {
42            correlated_input_ref.into()
43        }
44    }
45
46    fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
47        InputRef::new(input_ref.index() + self.offset, input_ref.return_type()).into()
48    }
49}
50
51impl ApplyOffsetRewriter {
52    pub fn new(offset: usize, correlated_indices: &[usize], correlated_id: CorrelatedId) -> Self {
53        Self {
54            offset,
55            index_mapping: ApplyCorrelatedIndicesConverter::convert_to_index_mapping(
56                correlated_indices,
57            ),
58            has_correlated_input_ref: false,
59            correlated_id,
60        }
61    }
62
63    pub fn has_correlated_input_ref(&self) -> bool {
64        self.has_correlated_input_ref
65    }
66
67    pub fn reset_state(&mut self) {
68        self.has_correlated_input_ref = false;
69    }
70}
71
72pub struct ApplyCorrelatedIndicesConverter {}
73
74impl ApplyCorrelatedIndicesConverter {
75    pub fn convert_to_index_mapping(correlated_indices: &[usize]) -> ColIndexMapping {
76        // Inverse anyway.
77        let target_size = match correlated_indices.iter().max_by_key(|&&x| x) {
78            Some(target_max) => target_max + 1,
79            None => 0,
80        };
81        let col_mapping = ColIndexMapping::new(
82            correlated_indices.iter().copied().map(Some).collect_vec(),
83            target_size,
84        );
85        let mut map = vec![None; col_mapping.target_size()];
86        for (src, dst) in col_mapping.mapping_pairs() {
87            map[dst] = Some(src);
88        }
89        ColIndexMapping::new(map, col_mapping.source_size())
90    }
91}