risingwave_stream/executor/join/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_expr::bail;
16use risingwave_pb::plan_common::{AsOfJoinDesc, AsOfJoinInequalityType};
17
18use crate::error::StreamResult;
19
20pub mod asof_join;
21pub mod builder;
22pub mod hash_join;
23pub mod join_row_set;
24pub mod row;
25
26pub(crate) type JoinOpPrimitive = bool;
27
28#[allow(non_snake_case, non_upper_case_globals)]
29pub(crate) mod JoinOp {
30    use super::JoinOpPrimitive;
31
32    pub const Insert: JoinOpPrimitive = true;
33    pub const Delete: JoinOpPrimitive = false;
34}
35
36/// The `JoinType` and `SideType` are to mimic a enum, because currently
37/// enum is not supported in const generic.
38// TODO: Use enum to replace this once [feature(adt_const_params)](https://github.com/rust-lang/rust/issues/95174) get completed.
39pub type JoinTypePrimitive = u8;
40
41#[allow(non_snake_case, non_upper_case_globals)]
42pub mod JoinType {
43    use super::JoinTypePrimitive;
44    pub const Inner: JoinTypePrimitive = 0;
45    pub const LeftOuter: JoinTypePrimitive = 1;
46    pub const RightOuter: JoinTypePrimitive = 2;
47    pub const FullOuter: JoinTypePrimitive = 3;
48    pub const LeftSemi: JoinTypePrimitive = 4;
49    pub const LeftAnti: JoinTypePrimitive = 5;
50    pub const RightSemi: JoinTypePrimitive = 6;
51    pub const RightAnti: JoinTypePrimitive = 7;
52}
53
54pub type AsOfJoinTypePrimitive = u8;
55
56#[allow(non_snake_case, non_upper_case_globals)]
57pub mod AsOfJoinType {
58    use super::AsOfJoinTypePrimitive;
59    pub const Inner: AsOfJoinTypePrimitive = 0;
60    pub const LeftOuter: AsOfJoinTypePrimitive = 1;
61}
62
63pub type SideTypePrimitive = u8;
64#[allow(non_snake_case, non_upper_case_globals)]
65pub mod SideType {
66    use super::SideTypePrimitive;
67    pub const Left: SideTypePrimitive = 0;
68    pub const Right: SideTypePrimitive = 1;
69}
70
71pub enum AsOfInequalityType {
72    Le,
73    Lt,
74    Ge,
75    Gt,
76}
77
78pub struct AsOfDesc {
79    pub left_idx: usize,
80    pub right_idx: usize,
81    pub inequality_type: AsOfInequalityType,
82}
83
84impl AsOfDesc {
85    pub fn from_protobuf(desc_proto: &AsOfJoinDesc) -> StreamResult<Self> {
86        let typ = match desc_proto.inequality_type() {
87            AsOfJoinInequalityType::AsOfInequalityTypeLt => AsOfInequalityType::Lt,
88            AsOfJoinInequalityType::AsOfInequalityTypeLe => AsOfInequalityType::Le,
89            AsOfJoinInequalityType::AsOfInequalityTypeGt => AsOfInequalityType::Gt,
90            AsOfJoinInequalityType::AsOfInequalityTypeGe => AsOfInequalityType::Ge,
91            AsOfJoinInequalityType::AsOfInequalityTypeUnspecified => {
92                bail!("unspecified AsOf join inequality type")
93            }
94        };
95        Ok(Self {
96            left_idx: desc_proto.left_idx as usize,
97            right_idx: desc_proto.right_idx as usize,
98            inequality_type: typ,
99        })
100    }
101}
102
103pub const fn is_outer_side(join_type: JoinTypePrimitive, side_type: SideTypePrimitive) -> bool {
104    join_type == JoinType::FullOuter
105        || (join_type == JoinType::LeftOuter && side_type == SideType::Left)
106        || (join_type == JoinType::RightOuter && side_type == SideType::Right)
107}
108
109pub const fn outer_side_null(join_type: JoinTypePrimitive, side_type: SideTypePrimitive) -> bool {
110    join_type == JoinType::FullOuter
111        || (join_type == JoinType::LeftOuter && side_type == SideType::Right)
112        || (join_type == JoinType::RightOuter && side_type == SideType::Left)
113}
114
115/// Send the update only once if the join type is semi/anti and the update is the same side as the
116/// join
117pub const fn forward_exactly_once(
118    join_type: JoinTypePrimitive,
119    side_type: SideTypePrimitive,
120) -> bool {
121    ((join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti)
122        && side_type == SideType::Left)
123        || ((join_type == JoinType::RightSemi || join_type == JoinType::RightAnti)
124            && side_type == SideType::Right)
125}
126
127pub const fn only_forward_matched_side(
128    join_type: JoinTypePrimitive,
129    side_type: SideTypePrimitive,
130) -> bool {
131    ((join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti)
132        && side_type == SideType::Right)
133        || ((join_type == JoinType::RightSemi || join_type == JoinType::RightAnti)
134            && side_type == SideType::Left)
135}
136
137pub const fn is_semi(join_type: JoinTypePrimitive) -> bool {
138    join_type == JoinType::LeftSemi || join_type == JoinType::RightSemi
139}
140
141pub const fn is_anti(join_type: JoinTypePrimitive) -> bool {
142    join_type == JoinType::LeftAnti || join_type == JoinType::RightAnti
143}
144
145pub const fn is_left_semi_or_anti(join_type: JoinTypePrimitive) -> bool {
146    join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti
147}
148
149pub const fn is_right_semi_or_anti(join_type: JoinTypePrimitive) -> bool {
150    join_type == JoinType::RightSemi || join_type == JoinType::RightAnti
151}
152
153pub const fn need_left_degree(join_type: JoinTypePrimitive) -> bool {
154    join_type == JoinType::FullOuter
155        || join_type == JoinType::LeftOuter
156        || join_type == JoinType::LeftAnti
157        || join_type == JoinType::LeftSemi
158}
159
160pub const fn need_right_degree(join_type: JoinTypePrimitive) -> bool {
161    join_type == JoinType::FullOuter
162        || join_type == JoinType::RightOuter
163        || join_type == JoinType::RightAnti
164        || join_type == JoinType::RightSemi
165}
166
167pub const fn is_as_of_left_outer(join_type: AsOfJoinTypePrimitive) -> bool {
168    join_type == AsOfJoinType::LeftOuter
169}