risingwave_frontend/expr/
correlated_input_ref.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use core::fmt;
16
17use risingwave_common::types::DataType;
18
19use super::{Expr, ExprImpl, InputRef};
20use crate::expr::ExprRewriter;
21
/// Number of subquery nesting levels between a correlated reference and the relation it
/// refers to.
pub type Depth = usize;
/// Id of the Apply operator that a correlated reference is related to.
pub type CorrelatedId = u32;

/// Locates the relation that a correlated reference points at.
///
/// * [`Position::Relative`] carries a `Depth`: the number of nesting levels of the subquery
///   relative to the referred relation. It should be non-zero.
/// * [`Position::Absolute`] carries a `CorrelatedId`: the id of the related Apply operator. It
///   should be non-zero.
///
/// `Debug` is derived so that values can be logged and compared with `assert_eq!`.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub enum Position {
    Relative(Depth),
    Absolute(CorrelatedId),
}
33
34/// A reference to a column outside the subquery.
35///
36/// `index` is the index in the referred relation.
37/// `position` has two mode Relative and Absolute.
38/// For binding we use relative position, for optimization we use absolute position.
39#[derive(Clone, Eq, PartialEq, Hash)]
40pub struct CorrelatedInputRef {
41    index: usize,
42    data_type: DataType,
43    position: Position,
44}
45
46impl CorrelatedInputRef {
47    pub fn new(index: usize, data_type: DataType, depth: usize) -> Self {
48        CorrelatedInputRef {
49            index,
50            data_type,
51            position: Position::Relative(depth),
52        }
53    }
54
55    /// Get a reference to the input ref's index.
56    pub fn index(&self) -> usize {
57        self.index
58    }
59
60    pub fn depth(&self) -> usize {
61        match self.position {
62            Position::Relative(depth) => depth,
63            Position::Absolute(_) => 0,
64        }
65    }
66
67    pub fn set_correlated_id(&mut self, correlated_id: CorrelatedId) {
68        self.position = Position::Absolute(correlated_id);
69    }
70
71    pub fn correlated_id(&self) -> CorrelatedId {
72        match self.position {
73            Position::Relative(_) => 0,
74            Position::Absolute(correlated_id) => correlated_id,
75        }
76    }
77}
78
79impl Expr for CorrelatedInputRef {
80    fn return_type(&self) -> DataType {
81        self.data_type.clone()
82    }
83
84    fn try_to_expr_proto(&self) -> Result<risingwave_pb::expr::ExprNode, String> {
85        Err(format!(
86            "CorrelatedInputRef {:?} has not been decorrelated",
87            self
88        ))
89    }
90}
91
92impl fmt::Debug for CorrelatedInputRef {
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        f.debug_struct("CorrelatedInputRef")
95            .field("index", &self.index)
96            .field("correlated_id", &self.correlated_id())
97            .finish()
98    }
99}
100
/// Expression rewriter that turns an `InputRef` or a `CorrelatedInputRef` into a
/// `CorrelatedInputRef` whose relative `depth` is shifted by `offset`.
pub struct InputRefDepthRewriter {
    /// Amount added to the relative depth of every rewritten reference.
    offset: usize,
}

impl InputRefDepthRewriter {
    /// Creates a rewriter that shifts relative depths by `offset`.
    pub fn new(offset: usize) -> Self {
        InputRefDepthRewriter { offset }
    }
}
112
113impl ExprRewriter for InputRefDepthRewriter {
114    fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
115        if self.offset == 0 {
116            input_ref.into()
117        } else {
118            CorrelatedInputRef::new(input_ref.index(), input_ref.return_type(), self.offset).into()
119        }
120    }
121
122    fn rewrite_correlated_input_ref(
123        &mut self,
124        correlated_input_ref: CorrelatedInputRef,
125    ) -> ExprImpl {
126        CorrelatedInputRef::new(
127            correlated_input_ref.index(),
128            correlated_input_ref.return_type(),
129            correlated_input_ref.depth() + self.offset,
130        )
131        .into()
132    }
133}