risingwave_frontend/optimizer/rule/
apply_offset_rewriter.rs1use itertools::Itertools;
16
17use crate::expr::{CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, InputRef};
18use crate::utils::ColIndexMapping;
19
20pub struct ApplyOffsetRewriter {
22 offset: usize,
23 index_mapping: ColIndexMapping,
24 has_correlated_input_ref: bool,
25 correlated_id: CorrelatedId,
26}
27
28impl ExprRewriter for ApplyOffsetRewriter {
29 fn rewrite_correlated_input_ref(
30 &mut self,
31 correlated_input_ref: CorrelatedInputRef,
32 ) -> ExprImpl {
33 let found = correlated_input_ref.correlated_id() == self.correlated_id;
34 self.has_correlated_input_ref |= found;
35 if found {
36 InputRef::new(
37 self.index_mapping.map(correlated_input_ref.index()),
38 correlated_input_ref.return_type(),
39 )
40 .into()
41 } else {
42 correlated_input_ref.into()
43 }
44 }
45
46 fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
47 InputRef::new(input_ref.index() + self.offset, input_ref.return_type()).into()
48 }
49}
50
51impl ApplyOffsetRewriter {
52 pub fn new(offset: usize, correlated_indices: &[usize], correlated_id: CorrelatedId) -> Self {
53 Self {
54 offset,
55 index_mapping: ApplyCorrelatedIndicesConverter::convert_to_index_mapping(
56 correlated_indices,
57 ),
58 has_correlated_input_ref: false,
59 correlated_id,
60 }
61 }
62
63 pub fn has_correlated_input_ref(&self) -> bool {
64 self.has_correlated_input_ref
65 }
66
67 pub fn reset_state(&mut self) {
68 self.has_correlated_input_ref = false;
69 }
70}
71
72pub struct ApplyCorrelatedIndicesConverter {}
73
74impl ApplyCorrelatedIndicesConverter {
75 pub fn convert_to_index_mapping(correlated_indices: &[usize]) -> ColIndexMapping {
76 let target_size = match correlated_indices.iter().max_by_key(|&&x| x) {
78 Some(target_max) => target_max + 1,
79 None => 0,
80 };
81 let col_mapping = ColIndexMapping::new(
82 correlated_indices.iter().copied().map(Some).collect_vec(),
83 target_size,
84 );
85 let mut map = vec![None; col_mapping.target_size()];
86 for (src, dst) in col_mapping.mapping_pairs() {
87 map[dst] = Some(src);
88 }
89 ColIndexMapping::new(map, col_mapping.source_size())
90 }
91}