// risingwave_stream/executor/join/mod.rs
use risingwave_expr::bail;
use risingwave_pb::plan_common::{AsOfJoinDesc, AsOfJoinInequalityType};

use crate::error::StreamResult;

pub mod builder;
pub mod hash_join;
pub mod join_row_set;
pub mod row;

/// Underlying representation of a join operation flag; the valid values are the
/// constants in [`JoinOp`].
pub(crate) type JoinOpPrimitive = bool;

/// Named constants for [`JoinOpPrimitive`]: `Insert` is `true`, `Delete` is `false`.
// The module and its constants deliberately use enum-like casing, hence the lint allowances.
#[allow(non_snake_case, non_upper_case_globals)]
pub(crate) mod JoinOp {
    use super::JoinOpPrimitive;

    /// Flag value for an insert operation.
    pub const Insert: JoinOpPrimitive = true;
    /// Flag value for a delete operation.
    pub const Delete: JoinOpPrimitive = false;
}

/// Integer representation of a join type; the valid values are the constants in [`JoinType`].
// NOTE(review): presumably kept as a plain integer (rather than an enum) so that it can be used
// as a const generic parameter — confirm against the executors that consume it.
pub type JoinTypePrimitive = u8;

/// Named constants for [`JoinTypePrimitive`], numbered `0` through `7`.
// The module and its constants deliberately use enum-like casing, hence the lint allowances.
#[allow(non_snake_case, non_upper_case_globals)]
pub mod JoinType {
    use super::JoinTypePrimitive;

    /// Inner join.
    pub const Inner: JoinTypePrimitive = 0;
    /// Left outer join.
    pub const LeftOuter: JoinTypePrimitive = 1;
    /// Right outer join.
    pub const RightOuter: JoinTypePrimitive = 2;
    /// Full outer join.
    pub const FullOuter: JoinTypePrimitive = 3;
    /// Left semi join.
    pub const LeftSemi: JoinTypePrimitive = 4;
    /// Left anti join.
    pub const LeftAnti: JoinTypePrimitive = 5;
    /// Right semi join.
    pub const RightSemi: JoinTypePrimitive = 6;
    /// Right anti join.
    pub const RightAnti: JoinTypePrimitive = 7;
}

/// Integer representation of an AS OF join type; the valid values are the constants in
/// [`AsOfJoinType`].
pub type AsOfJoinTypePrimitive = u8;

/// Named constants for [`AsOfJoinTypePrimitive`].
// The module and its constants deliberately use enum-like casing, hence the lint allowances.
#[allow(non_snake_case, non_upper_case_globals)]
pub mod AsOfJoinType {
    use super::AsOfJoinTypePrimitive;

    /// Inner AS OF join.
    pub const Inner: AsOfJoinTypePrimitive = 0;
    /// Left outer AS OF join.
    pub const LeftOuter: AsOfJoinTypePrimitive = 1;
}

/// Integer representation of a join side; the valid values are the constants in [`SideType`].
pub type SideTypePrimitive = u8;

/// Named constants for [`SideTypePrimitive`]: `Left` is `0`, `Right` is `1`.
// The module and its constants deliberately use enum-like casing, hence the lint allowances.
#[allow(non_snake_case, non_upper_case_globals)]
pub mod SideType {
    use super::SideTypePrimitive;

    /// The left side of the join.
    pub const Left: SideTypePrimitive = 0;
    /// The right side of the join.
    pub const Right: SideTypePrimitive = 1;
}

/// Kind of inequality used by an AS OF join condition.
///
/// The variant names follow the usual comparison abbreviations (`Le` = "less than or equal",
/// `Lt` = "less than", `Ge` = "greater than or equal", `Gt` = "greater than").
// NOTE(review): which operand is the left-hand side of the comparison is not visible here —
// confirm against the code that evaluates the condition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AsOfInequalityType {
    Le,
    Lt,
    Ge,
    Gt,
}

77pub struct AsOfDesc {
78 pub left_idx: usize,
79 pub right_idx: usize,
80 pub inequality_type: AsOfInequalityType,
81}
82
83impl AsOfDesc {
84 pub fn from_protobuf(desc_proto: &AsOfJoinDesc) -> StreamResult<Self> {
85 let typ = match desc_proto.inequality_type() {
86 AsOfJoinInequalityType::AsOfInequalityTypeLt => AsOfInequalityType::Lt,
87 AsOfJoinInequalityType::AsOfInequalityTypeLe => AsOfInequalityType::Le,
88 AsOfJoinInequalityType::AsOfInequalityTypeGt => AsOfInequalityType::Gt,
89 AsOfJoinInequalityType::AsOfInequalityTypeGe => AsOfInequalityType::Ge,
90 AsOfJoinInequalityType::AsOfInequalityTypeUnspecified => {
91 bail!("unspecified AsOf join inequality type")
92 }
93 };
94 Ok(Self {
95 left_idx: desc_proto.left_idx as usize,
96 right_idx: desc_proto.right_idx as usize,
97 inequality_type: typ,
98 })
99 }
100}
101
102pub const fn is_outer_side(join_type: JoinTypePrimitive, side_type: SideTypePrimitive) -> bool {
103 join_type == JoinType::FullOuter
104 || (join_type == JoinType::LeftOuter && side_type == SideType::Left)
105 || (join_type == JoinType::RightOuter && side_type == SideType::Right)
106}
107
108pub const fn outer_side_null(join_type: JoinTypePrimitive, side_type: SideTypePrimitive) -> bool {
109 join_type == JoinType::FullOuter
110 || (join_type == JoinType::LeftOuter && side_type == SideType::Right)
111 || (join_type == JoinType::RightOuter && side_type == SideType::Left)
112}
113
114pub const fn forward_exactly_once(
117 join_type: JoinTypePrimitive,
118 side_type: SideTypePrimitive,
119) -> bool {
120 ((join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti)
121 && side_type == SideType::Left)
122 || ((join_type == JoinType::RightSemi || join_type == JoinType::RightAnti)
123 && side_type == SideType::Right)
124}
125
126pub const fn only_forward_matched_side(
127 join_type: JoinTypePrimitive,
128 side_type: SideTypePrimitive,
129) -> bool {
130 ((join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti)
131 && side_type == SideType::Right)
132 || ((join_type == JoinType::RightSemi || join_type == JoinType::RightAnti)
133 && side_type == SideType::Left)
134}
135
136pub const fn is_semi(join_type: JoinTypePrimitive) -> bool {
137 join_type == JoinType::LeftSemi || join_type == JoinType::RightSemi
138}
139
140pub const fn is_anti(join_type: JoinTypePrimitive) -> bool {
141 join_type == JoinType::LeftAnti || join_type == JoinType::RightAnti
142}
143
144pub const fn is_left_semi_or_anti(join_type: JoinTypePrimitive) -> bool {
145 join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti
146}
147
148pub const fn is_right_semi_or_anti(join_type: JoinTypePrimitive) -> bool {
149 join_type == JoinType::RightSemi || join_type == JoinType::RightAnti
150}
151
152pub const fn need_left_degree(join_type: JoinTypePrimitive) -> bool {
153 join_type == JoinType::FullOuter
154 || join_type == JoinType::LeftOuter
155 || join_type == JoinType::LeftAnti
156 || join_type == JoinType::LeftSemi
157}
158
159pub const fn need_right_degree(join_type: JoinTypePrimitive) -> bool {
160 join_type == JoinType::FullOuter
161 || join_type == JoinType::RightOuter
162 || join_type == JoinType::RightAnti
163 || join_type == JoinType::RightSemi
164}
165
166pub const fn is_as_of_left_outer(join_type: AsOfJoinTypePrimitive) -> bool {
167 join_type == AsOfJoinType::LeftOuter
168}