risingwave_stream/executor/join/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_expr::bail;
16use risingwave_pb::plan_common::{AsOfJoinDesc, AsOfJoinInequalityType};
17
18use crate::error::StreamResult;
19
20pub mod builder;
21pub mod hash_join;
22pub mod join_row_set;
23pub mod row;
24
/// Primitive representation of a join operation: a plain `bool`.
pub(crate) type JoinOpPrimitive = bool;

/// Named values of [`JoinOpPrimitive`]: `Insert` is `true`, `Delete` is `false`.
#[allow(non_snake_case, non_upper_case_globals)]
pub(crate) mod JoinOp {
    pub const Delete: super::JoinOpPrimitive = false;
    pub const Insert: super::JoinOpPrimitive = true;
}
34
/// `JoinType` and `SideType` mimic an enum with plain integer constants, because enums are
/// currently not supported as const generic parameters.
// TODO: Use enum to replace this once [feature(adt_const_params)](https://github.com/rust-lang/rust/issues/95174) get completed.
pub type JoinTypePrimitive = u8;

/// Join kinds, encoded as [`JoinTypePrimitive`] values `0..=7`.
#[allow(non_snake_case, non_upper_case_globals)]
pub mod JoinType {
    pub const Inner: super::JoinTypePrimitive = 0;

    // Outer joins.
    pub const LeftOuter: super::JoinTypePrimitive = 1;
    pub const RightOuter: super::JoinTypePrimitive = 2;
    pub const FullOuter: super::JoinTypePrimitive = 3;

    // Semi / anti joins.
    pub const LeftSemi: super::JoinTypePrimitive = 4;
    pub const LeftAnti: super::JoinTypePrimitive = 5;
    pub const RightSemi: super::JoinTypePrimitive = 6;
    pub const RightAnti: super::JoinTypePrimitive = 7;
}
52
/// Primitive representation of an AsOf join kind; see the constants in [`AsOfJoinType`].
pub type AsOfJoinTypePrimitive = u8;

/// AsOf join kinds, encoded as [`AsOfJoinTypePrimitive`] values.
#[allow(non_snake_case, non_upper_case_globals)]
pub mod AsOfJoinType {
    pub const Inner: super::AsOfJoinTypePrimitive = 0;
    pub const LeftOuter: super::AsOfJoinTypePrimitive = 1;
}
61
/// Primitive representation of a join side; see the constants in [`SideType`].
pub type SideTypePrimitive = u8;

/// The two sides of a join, encoded as [`SideTypePrimitive`] values.
#[allow(non_snake_case, non_upper_case_globals)]
pub mod SideType {
    pub const Left: super::SideTypePrimitive = 0;
    pub const Right: super::SideTypePrimitive = 1;
}
69
/// Kind of inequality used by an AsOf join.
///
/// The variants mirror the `Le` / `Lt` / `Ge` / `Gt` values of the protobuf
/// `AsOfJoinInequalityType` that [`AsOfDesc::from_protobuf`] converts from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AsOfInequalityType {
    Le,
    Lt,
    Ge,
    Gt,
}

/// Description of an AsOf join's inequality, built from the protobuf `AsOfJoinDesc`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AsOfDesc {
    /// Taken from the protobuf `left_idx`.
    // NOTE(review): presumably the index of the left input's AsOf column — confirm.
    pub left_idx: usize,
    /// Taken from the protobuf `right_idx`.
    // NOTE(review): presumably the index of the right input's AsOf column — confirm.
    pub right_idx: usize,
    /// Which inequality the AsOf join uses.
    pub inequality_type: AsOfInequalityType,
}
82
83impl AsOfDesc {
84    pub fn from_protobuf(desc_proto: &AsOfJoinDesc) -> StreamResult<Self> {
85        let typ = match desc_proto.inequality_type() {
86            AsOfJoinInequalityType::AsOfInequalityTypeLt => AsOfInequalityType::Lt,
87            AsOfJoinInequalityType::AsOfInequalityTypeLe => AsOfInequalityType::Le,
88            AsOfJoinInequalityType::AsOfInequalityTypeGt => AsOfInequalityType::Gt,
89            AsOfJoinInequalityType::AsOfInequalityTypeGe => AsOfInequalityType::Ge,
90            AsOfJoinInequalityType::AsOfInequalityTypeUnspecified => {
91                bail!("unspecified AsOf join inequality type")
92            }
93        };
94        Ok(Self {
95            left_idx: desc_proto.left_idx as usize,
96            right_idx: desc_proto.right_idx as usize,
97            inequality_type: typ,
98        })
99    }
100}
101
102pub const fn is_outer_side(join_type: JoinTypePrimitive, side_type: SideTypePrimitive) -> bool {
103    join_type == JoinType::FullOuter
104        || (join_type == JoinType::LeftOuter && side_type == SideType::Left)
105        || (join_type == JoinType::RightOuter && side_type == SideType::Right)
106}
107
108pub const fn outer_side_null(join_type: JoinTypePrimitive, side_type: SideTypePrimitive) -> bool {
109    join_type == JoinType::FullOuter
110        || (join_type == JoinType::LeftOuter && side_type == SideType::Right)
111        || (join_type == JoinType::RightOuter && side_type == SideType::Left)
112}
113
114/// Send the update only once if the join type is semi/anti and the update is the same side as the
115/// join
116pub const fn forward_exactly_once(
117    join_type: JoinTypePrimitive,
118    side_type: SideTypePrimitive,
119) -> bool {
120    ((join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti)
121        && side_type == SideType::Left)
122        || ((join_type == JoinType::RightSemi || join_type == JoinType::RightAnti)
123            && side_type == SideType::Right)
124}
125
126pub const fn only_forward_matched_side(
127    join_type: JoinTypePrimitive,
128    side_type: SideTypePrimitive,
129) -> bool {
130    ((join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti)
131        && side_type == SideType::Right)
132        || ((join_type == JoinType::RightSemi || join_type == JoinType::RightAnti)
133            && side_type == SideType::Left)
134}
135
136pub const fn is_semi(join_type: JoinTypePrimitive) -> bool {
137    join_type == JoinType::LeftSemi || join_type == JoinType::RightSemi
138}
139
140pub const fn is_anti(join_type: JoinTypePrimitive) -> bool {
141    join_type == JoinType::LeftAnti || join_type == JoinType::RightAnti
142}
143
144pub const fn is_left_semi_or_anti(join_type: JoinTypePrimitive) -> bool {
145    join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti
146}
147
148pub const fn is_right_semi_or_anti(join_type: JoinTypePrimitive) -> bool {
149    join_type == JoinType::RightSemi || join_type == JoinType::RightAnti
150}
151
152pub const fn need_left_degree(join_type: JoinTypePrimitive) -> bool {
153    join_type == JoinType::FullOuter
154        || join_type == JoinType::LeftOuter
155        || join_type == JoinType::LeftAnti
156        || join_type == JoinType::LeftSemi
157}
158
159pub const fn need_right_degree(join_type: JoinTypePrimitive) -> bool {
160    join_type == JoinType::FullOuter
161        || join_type == JoinType::RightOuter
162        || join_type == JoinType::RightAnti
163        || join_type == JoinType::RightSemi
164}
165
166pub const fn is_as_of_left_outer(join_type: AsOfJoinTypePrimitive) -> bool {
167    join_type == AsOfJoinType::LeftOuter
168}