risingwave_frontend/expr/
correlated_input_ref.rs1use core::fmt;
16
17use risingwave_common::types::DataType;
18
19use super::{Expr, ExprImpl, InputRef};
20use crate::expr::ExprRewriter;
21
22pub type Depth = usize;
23pub type CorrelatedId = u32;
24
25#[derive(Clone, Eq, PartialEq, Hash)]
29pub enum Position {
30 Relative(Depth),
31 Absolute(CorrelatedId),
32}
33
34#[derive(Clone, Eq, PartialEq, Hash)]
40pub struct CorrelatedInputRef {
41 index: usize,
42 data_type: DataType,
43 position: Position,
44}
45
46impl CorrelatedInputRef {
47 pub fn new(index: usize, data_type: DataType, depth: usize) -> Self {
48 CorrelatedInputRef {
49 index,
50 data_type,
51 position: Position::Relative(depth),
52 }
53 }
54
55 pub fn index(&self) -> usize {
57 self.index
58 }
59
60 pub fn depth(&self) -> usize {
61 match self.position {
62 Position::Relative(depth) => depth,
63 Position::Absolute(_) => 0,
64 }
65 }
66
67 pub fn set_correlated_id(&mut self, correlated_id: CorrelatedId) {
68 self.position = Position::Absolute(correlated_id);
69 }
70
71 pub fn correlated_id(&self) -> CorrelatedId {
72 match self.position {
73 Position::Relative(_) => 0,
74 Position::Absolute(correlated_id) => correlated_id,
75 }
76 }
77}
78
79impl Expr for CorrelatedInputRef {
80 fn return_type(&self) -> DataType {
81 self.data_type.clone()
82 }
83
84 fn try_to_expr_proto(&self) -> Result<risingwave_pb::expr::ExprNode, String> {
85 Err(format!(
86 "CorrelatedInputRef {:?} has not been decorrelated",
87 self
88 ))
89 }
90}
91
92impl fmt::Debug for CorrelatedInputRef {
93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94 f.debug_struct("CorrelatedInputRef")
95 .field("index", &self.index)
96 .field("correlated_id", &self.correlated_id())
97 .finish()
98 }
99}
100
101pub struct InputRefDepthRewriter {
104 offset: usize,
105}
106
107impl InputRefDepthRewriter {
108 pub fn new(offset: usize) -> Self {
109 Self { offset }
110 }
111}
112
113impl ExprRewriter for InputRefDepthRewriter {
114 fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
115 if self.offset == 0 {
116 input_ref.into()
117 } else {
118 CorrelatedInputRef::new(input_ref.index(), input_ref.return_type(), self.offset).into()
119 }
120 }
121
122 fn rewrite_correlated_input_ref(
123 &mut self,
124 correlated_input_ref: CorrelatedInputRef,
125 ) -> ExprImpl {
126 CorrelatedInputRef::new(
127 correlated_input_ref.index(),
128 correlated_input_ref.return_type(),
129 correlated_input_ref.depth() + self.offset,
130 )
131 .into()
132 }
133}