risingwave_stream/from_proto/
hash_join.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::config::streaming::JoinEncodingType;
18use risingwave_common::hash::{HashKey, HashKeyDispatcher};
19use risingwave_common::types::DataType;
20use risingwave_expr::expr::{NonStrictExpression, build_non_strict_from_prost};
21use risingwave_pb::plan_common::JoinType as JoinTypeProto;
22use risingwave_pb::stream_plan::{
23    HashJoinNode, HashJoinWatermarkHandleDesc, JoinKeyWatermarkIndex,
24};
25
26use super::*;
27use crate::common::table::state_table::{StateTable, StateTableBuilder};
28use crate::executor::hash_join::*;
29use crate::executor::monitor::StreamingMetrics;
30use crate::executor::{ActorContextRef, CpuEncoding, JoinType, MemoryEncoding};
31use crate::task::AtomicU64Ref;
32
33pub struct HashJoinExecutorBuilder;
34
35impl ExecutorBuilder for HashJoinExecutorBuilder {
36    type Node = HashJoinNode;
37
38    async fn new_boxed_executor(
39        params: ExecutorParams,
40        node: &Self::Node,
41        store: impl StateStore,
42    ) -> StreamResult<Executor> {
43        let is_append_only = node.is_append_only;
44        let vnodes = Arc::new(params.vnode_bitmap.expect("vnodes not set for hash join"));
45
46        let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap();
47
48        let table_l = node.get_left_table()?;
49        let degree_table_l = node.get_left_degree_table()?;
50
51        let table_r = node.get_right_table()?;
52        let degree_table_r = node.get_right_degree_table()?;
53
54        let params_l = JoinParams::new(
55            node.get_left_key()
56                .iter()
57                .map(|key| *key as usize)
58                .collect_vec(),
59            node.get_left_deduped_input_pk_indices()
60                .iter()
61                .map(|key| *key as usize)
62                .collect_vec(),
63        );
64        let params_r = JoinParams::new(
65            node.get_right_key()
66                .iter()
67                .map(|key| *key as usize)
68                .collect_vec(),
69            node.get_right_deduped_input_pk_indices()
70                .iter()
71                .map(|key| *key as usize)
72                .collect_vec(),
73        );
74        let null_safe = node.get_null_safe().clone();
75        let output_indices = node
76            .get_output_indices()
77            .iter()
78            .map(|&x| x as usize)
79            .collect_vec();
80
81        let condition = match node.get_condition() {
82            Ok(cond_prost) => Some(build_non_strict_from_prost(
83                cond_prost,
84                params.eval_error_report.clone(),
85            )?),
86            Err(_) => None,
87        };
88        trace!("Join non-equi condition: {:?}", condition);
89        let mut inequality_pairs = vec![];
90        if let Some(desc) = node.watermark_handle_desc.as_ref() {
91            inequality_pairs = desc
92                .inequality_pairs
93                .iter()
94                .map(|inequality_pair| InequalityPairInfo {
95                    left_idx: inequality_pair.left_idx as usize,
96                    right_idx: inequality_pair.right_idx as usize,
97                    clean_left_state: inequality_pair.clean_left_state,
98                    clean_right_state: inequality_pair.clean_right_state,
99                    op: inequality_pair.op(),
100                })
101                .collect();
102        }
103
104        let watermark_indices_in_jk = resolve_clean_watermark_indices_in_jk(
105            node.watermark_handle_desc.as_ref(),
106            params_l.join_key_indices.len(),
107        );
108
109        let join_key_data_types = params_l
110            .join_key_indices
111            .iter()
112            .map(|idx| source_l.schema().fields[*idx].data_type())
113            .collect_vec();
114
115        let state_table_l = StateTableBuilder::new(table_l, store.clone(), Some(vnodes.clone()))
116            .enable_preload_all_rows_by_config(&params.config)
117            .build()
118            .await;
119        let degree_state_table_l =
120            StateTableBuilder::new(degree_table_l, store.clone(), Some(vnodes.clone()))
121                .enable_preload_all_rows_by_config(&params.config)
122                .build()
123                .await;
124
125        let state_table_r = StateTableBuilder::new(table_r, store.clone(), Some(vnodes.clone()))
126            .enable_preload_all_rows_by_config(&params.config)
127            .build()
128            .await;
129        let degree_state_table_r = StateTableBuilder::new(degree_table_r, store, Some(vnodes))
130            .enable_preload_all_rows_by_config(&params.config)
131            .build()
132            .await;
133
134        // Previously, the `join_encoding_type` is persisted in the plan node.
135        // Now it's always `Unspecified` and we should refer to the job's config override.
136        #[allow(deprecated)]
137        let join_encoding_type = node
138            .get_join_encoding_type()
139            .map_or(params.config.developer.join_encoding_type, Into::into);
140
141        let args = HashJoinExecutorDispatcherArgs {
142            ctx: params.actor_context,
143            info: params.info.clone(),
144            source_l,
145            source_r,
146            params_l,
147            params_r,
148            null_safe,
149            output_indices,
150            cond: condition,
151            inequality_pairs,
152            state_table_l,
153            degree_state_table_l,
154            state_table_r,
155            degree_state_table_r,
156            lru_manager: params.watermark_epoch,
157            is_append_only,
158            metrics: params.executor_stats,
159            join_type_proto: node.get_join_type()?,
160            join_key_data_types,
161            chunk_size: params.config.developer.chunk_size,
162            high_join_amplification_threshold: (params.config.developer)
163                .high_join_amplification_threshold,
164            join_encoding_type,
165            watermark_indices_in_jk,
166        };
167
168        let exec = args.dispatch()?;
169        Ok((params.info, exec).into())
170    }
171}
172
173struct HashJoinExecutorDispatcherArgs<S: StateStore> {
174    ctx: ActorContextRef,
175    info: ExecutorInfo,
176    source_l: Executor,
177    source_r: Executor,
178    params_l: JoinParams,
179    params_r: JoinParams,
180    null_safe: Vec<bool>,
181    output_indices: Vec<usize>,
182    cond: Option<NonStrictExpression>,
183    inequality_pairs: Vec<InequalityPairInfo>,
184    state_table_l: StateTable<S>,
185    degree_state_table_l: StateTable<S>,
186    state_table_r: StateTable<S>,
187    degree_state_table_r: StateTable<S>,
188    lru_manager: AtomicU64Ref,
189    is_append_only: bool,
190    metrics: Arc<StreamingMetrics>,
191    join_type_proto: JoinTypeProto,
192    join_key_data_types: Vec<DataType>,
193    chunk_size: usize,
194    high_join_amplification_threshold: usize,
195    join_encoding_type: JoinEncodingType,
196    watermark_indices_in_jk: Vec<(usize, bool)>,
197}
198
199fn resolve_clean_watermark_indices_in_jk(
200    desc: Option<&HashJoinWatermarkHandleDesc>,
201    join_key_len: usize,
202) -> Vec<(usize, bool)> {
203    if let Some(desc) = desc {
204        return desc
205            .watermark_indices_in_jk
206            .iter()
207            .map(
208                |JoinKeyWatermarkIndex {
209                     index,
210                     do_state_cleaning,
211                 }| (*index as usize, *do_state_cleaning),
212            )
213            .collect_vec();
214    }
215    // For backward compatibility, if there are no `watermark_indices_in_jk`,
216    // we assume the first join key can be used for state cleaning.
217    if join_key_len > 0 {
218        vec![(0, true)]
219    } else {
220        vec![]
221    }
222}
223
224impl<S: StateStore> HashKeyDispatcher for HashJoinExecutorDispatcherArgs<S> {
225    type Output = StreamResult<Box<dyn Execute>>;
226
227    fn dispatch_impl<K: HashKey>(self) -> Self::Output {
228        /// This macro helps to fill the const generic type parameter.
229        macro_rules! build {
230            ($join_type:ident, $join_encoding:ident) => {
231                Ok(
232                    HashJoinExecutor::<K, S, { JoinType::$join_type }, $join_encoding>::new(
233                        self.ctx,
234                        self.info,
235                        self.source_l,
236                        self.source_r,
237                        self.params_l,
238                        self.params_r,
239                        self.null_safe,
240                        self.output_indices,
241                        self.cond,
242                        self.inequality_pairs,
243                        self.state_table_l,
244                        self.degree_state_table_l,
245                        self.state_table_r,
246                        self.degree_state_table_r,
247                        self.lru_manager,
248                        self.is_append_only,
249                        self.metrics,
250                        self.chunk_size,
251                        self.high_join_amplification_threshold,
252                        self.watermark_indices_in_jk.clone(),
253                    )
254                    .boxed(),
255                )
256            };
257        }
258
259        macro_rules! build_match {
260            ($($join_type:ident),*) => {
261                match (self.join_type_proto, self.join_encoding_type) {
262                    (JoinTypeProto::AsofInner, _)
263                    | (JoinTypeProto::AsofLeftOuter, _)
264                    | (JoinTypeProto::Unspecified, _) => unreachable!(),
265                    $(
266                        (JoinTypeProto::$join_type, JoinEncodingType::Memory) => build!($join_type, MemoryEncoding),
267                        (JoinTypeProto::$join_type, JoinEncodingType::Cpu) => build!($join_type, CpuEncoding),
268                    )*
269                }
270            };
271        }
272        build_match! {
273            Inner,
274            LeftOuter,
275            RightOuter,
276            FullOuter,
277            LeftSemi,
278            LeftAnti,
279            RightSemi,
280            RightAnti
281        }
282    }
283
284    fn data_types(&self) -> &[DataType] {
285        &self.join_key_data_types
286    }
287}
288
#[cfg(test)]
mod tests {
    use risingwave_pb::stream_plan::{HashJoinWatermarkHandleDesc, JoinKeyWatermarkIndex};

    use super::resolve_clean_watermark_indices_in_jk;

    /// With a descriptor, its entries are returned in order and unchanged.
    #[test]
    fn test_resolve_clean_watermark_indices_in_jk_from_desc() {
        let watermark_indices_in_jk = vec![
            JoinKeyWatermarkIndex {
                index: 2,
                do_state_cleaning: true,
            },
            JoinKeyWatermarkIndex {
                index: 1,
                do_state_cleaning: false,
            },
        ];
        let desc = HashJoinWatermarkHandleDesc {
            watermark_indices_in_jk,
            inequality_pairs: vec![],
        };

        let resolved = resolve_clean_watermark_indices_in_jk(Some(&desc), 3);

        assert_eq!(resolved, [(2, true), (1, false)]);
    }

    /// Without a descriptor, the first join key is used for state cleaning.
    #[test]
    fn test_resolve_clean_watermark_indices_in_jk_default_first() {
        let resolved = resolve_clean_watermark_indices_in_jk(None, 2);

        assert_eq!(resolved, [(0, true)]);
    }

    /// Without a descriptor and without join keys, nothing is returned.
    #[test]
    fn test_resolve_clean_watermark_indices_in_jk_empty_on_no_keys() {
        let resolved = resolve_clean_watermark_indices_in_jk(None, 0);

        assert!(resolved.is_empty());
    }
}
328}