1use std::collections::{BTreeMap, HashSet};
16use std::marker::PhantomData;
17use std::ops::Bound;
18use std::time::Duration;
19
20use either::Either;
21use itertools::Itertools;
22use multimap::MultiMap;
23use risingwave_common::array::Op;
24use risingwave_common::hash::{HashKey, NullBitmap};
25use risingwave_common::util::epoch::EpochPair;
26use risingwave_common::util::iter_util::ZipEqDebug;
27use tokio::time::Instant;
28
29use self::builder::JoinChunkBuilder;
30use super::barrier_align::*;
31use super::join::hash_join::*;
32use super::join::*;
33use super::watermark::*;
34use crate::executor::join::builder::JoinStreamChunkBuilder;
35use crate::executor::join::row::JoinEncoding;
36use crate::executor::prelude::*;
37
/// Returns `true` when every element of `vec1` also occurs in `vec2`.
///
/// Both inputs are treated as sets: order and duplicates are irrelevant, and an empty `vec1`
/// is a subset of anything.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.iter().all(|idx| superset.contains(idx))
}
41
/// Per-side join configuration passed to `AsOfJoinExecutor::new`.
pub struct JoinParams {
    /// Indices, within this side's input schema, of the equi-join key columns.
    pub join_key_indices: Vec<usize>,
    /// De-duplicated primary-key column indices; forwarded to this side's `JoinHashMap`.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join-key indices and de-duplicated pk indices of one join side.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        Self {
            deduped_pk_indices,
            join_key_indices,
        }
    }
}
57
/// Row state and column metadata of one input side (left or right) of the AsOf join.
struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Hash table of this side's rows keyed by join key, backed by this side's state table.
    ht: JoinHashMap<K, S, E>,
    /// Indices of the equi-join key columns in this side's input schema.
    join_key_indices: Vec<usize>,
    /// Data types of all columns of this side's input schema.
    all_data_types: Vec<DataType>,
    /// Offset of this side's first column in the concatenated `left ++ right` schema
    /// (0 for the left side, the left column count for the right side).
    start_pos: usize,
    /// Input-column to output-column pairs, as produced by
    /// `JoinStreamChunkBuilder::get_i2o_mapping`.
    i2o_mapping: Vec<(usize, usize)>,
    /// `i2o_mapping` indexed by input column; used to translate watermarks to output columns.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// Whether a degree table is kept; both sides are constructed with `false`.
    need_degree_table: bool,
    // `ht` already mentions `E`, so this marker is not required for the type parameter.
    _marker: std::marker::PhantomData<E>,
}
74
75impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("JoinSide")
78 .field("join_key_indices", &self.join_key_indices)
79 .field("col_types", &self.all_data_types)
80 .field("start_pos", &self.start_pos)
81 .field("i2o_mapping", &self.i2o_mapping)
82 .field("need_degree_table", &self.need_degree_table)
83 .finish()
84 }
85}
86
impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
    /// Whether this side has state changes that must not be discarded.
    ///
    /// Not implemented: any call panics.
    fn is_dirty(&self) -> bool {
        unimplemented!()
    }

    /// Intended to drop this side's cached state.
    ///
    /// NOTE(review): unused (`#[expect(dead_code)]`). Because `is_dirty` is unimplemented, any
    /// call panics in the assertion; the body performs no clearing yet.
    #[expect(dead_code)]
    fn clear_cache(&mut self) {
        assert!(
            !self.is_dirty(),
            "cannot clear cache while states of hash join are dirty"
        );
        // No cache clearing is performed here.
    }

    /// Initializes this side's hash table with the executor's first epoch.
    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.ht.init(epoch).await
    }
}
108
/// Streaming AsOf join executor.
///
/// Each left row is matched with at most one right row: among right rows with the same
/// equi-join key, the nearest one whose inequality column satisfies `asof_desc` (ties on the
/// inequality value are broken by the smallest right pk). `T` selects the join type (inner or
/// left outer); `E` is the row encoding used by the join state.
pub struct AsOfJoinExecutor<
    K: HashKey,
    S: StateStore,
    const T: AsOfJoinTypePrimitive,
    E: JoinEncoding,
> {
    ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Left input; taken out (left as `None`) by `into_stream`.
    input_l: Option<Executor>,
    /// Right input; taken out (left as `None`) by `into_stream`.
    input_r: Option<Executor>,
    /// Data types of the emitted columns, i.e. the `left ++ right` types projected by
    /// `output_indices`.
    actual_output_data_types: Vec<DataType>,
    /// State of the left side.
    side_l: JoinSide<K, S, E>,
    /// State of the right side.
    side_r: JoinSide<K, S, E>,

    metrics: Arc<StreamingMetrics>,
    /// Chunk size handed to the output chunk builders.
    chunk_size: usize,
    /// Rows processed since the last cache eviction (shared by both sides).
    cnt_rows_received: u32,

    /// Per join-key position, buffers aligning the left and right watermarks.
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// Matched-row count above which a warning is logged (checked on right-side updates).
    high_join_amplification_threshold: usize,
    /// Rows between cache evictions; clamped to at least 1 in `new`.
    join_cache_evict_interval_rows: u32,
    /// The inequality condition: compared column of each side and the comparison operator.
    asof_desc: AsOfDesc,
}
146
147impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
148 for AsOfJoinExecutor<K, S, T, E>
149{
150 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151 f.debug_struct("AsOfJoinExecutor")
152 .field("join_type", &T)
153 .field("input_left", &self.input_l.as_ref().unwrap().identity())
154 .field("input_right", &self.input_r.as_ref().unwrap().identity())
155 .field("side_l", &self.side_l)
156 .field("side_r", &self.side_r)
157 .field("stream_key", &self.info.stream_key)
158 .field("schema", &self.info.schema)
159 .field("actual_output_data_types", &self.actual_output_data_types)
160 .field(
161 "join_cache_evict_interval_rows",
162 &self.join_cache_evict_interval_rows,
163 )
164 .finish()
165 }
166}
167
168impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive, E: JoinEncoding> Execute
169 for AsOfJoinExecutor<K, S, T, E>
170{
171 fn execute(self: Box<Self>) -> BoxedMessageStream {
172 self.into_stream().boxed()
173 }
174}
175
/// Inputs of `eq_join_left` / `eq_join_right`: borrowed executor fields plus the chunk to join.
///
/// Presumably bundled so the join streams borrow individual executor fields rather than the
/// whole executor.
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    ctx: &'a ActorContextRef,
    side_l: &'a mut JoinSide<K, S, E>,
    side_r: &'a mut JoinSide<K, S, E>,
    asof_desc: &'a AsOfDesc,
    actual_output_data_types: &'a [DataType],
    /// The input chunk being joined.
    chunk: StreamChunk,
    chunk_size: usize,
    /// Shared row counter driving periodic cache eviction (see `evict_cache`).
    cnt_rows_received: &'a mut u32,
    high_join_amplification_threshold: usize,
    join_cache_evict_interval_rows: u32,
}
189
190impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive, E: JoinEncoding>
191 AsOfJoinExecutor<K, S, T, E>
192{
193 #[allow(clippy::too_many_arguments)]
194 pub fn new(
195 ctx: ActorContextRef,
196 info: ExecutorInfo,
197 input_l: Executor,
198 input_r: Executor,
199 params_l: JoinParams,
200 params_r: JoinParams,
201 null_safe: Vec<bool>,
202 output_indices: Vec<usize>,
203 state_table_l: StateTable<S>,
204 state_table_r: StateTable<S>,
205 watermark_epoch: AtomicU64Ref,
206 metrics: Arc<StreamingMetrics>,
207 chunk_size: usize,
208 high_join_amplification_threshold: usize,
209 asof_desc: AsOfDesc,
210 ) -> Self {
211 let join_cache_evict_interval_rows = ctx
212 .config
213 .developer
214 .join_hash_map_evict_interval_rows
215 .max(1);
216 let side_l_column_n = input_l.schema().len();
217
218 let schema_fields = [
219 input_l.schema().fields.clone(),
220 input_r.schema().fields.clone(),
221 ]
222 .concat();
223
224 let original_output_data_types = schema_fields
225 .iter()
226 .map(|field| field.data_type())
227 .collect_vec();
228 let actual_output_data_types = output_indices
229 .iter()
230 .map(|&idx| original_output_data_types[idx].clone())
231 .collect_vec();
232
233 let state_all_data_types_l = input_l.schema().data_types();
235 let state_all_data_types_r = input_r.schema().data_types();
236
237 let state_pk_indices_l = input_l.stream_key().to_vec();
238 let state_pk_indices_r = input_r.stream_key().to_vec();
239
240 let state_join_key_indices_l = params_l.join_key_indices;
241 let state_join_key_indices_r = params_r.join_key_indices;
242
243 let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
245 let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());
246
247 let join_key_data_types_l = state_join_key_indices_l
248 .iter()
249 .map(|idx| state_all_data_types_l[*idx].clone())
250 .collect_vec();
251
252 let join_key_data_types_r = state_join_key_indices_r
253 .iter()
254 .map(|idx| state_all_data_types_r[*idx].clone())
255 .collect_vec();
256
257 assert_eq!(join_key_data_types_l, join_key_data_types_r);
258
259 let null_matched = K::Bitmap::from_bool_vec(null_safe);
260
261 let (left_to_output, right_to_output) = {
262 let (left_len, right_len) = if is_left_semi_or_anti(T) {
263 (state_all_data_types_l.len(), 0usize)
264 } else if is_right_semi_or_anti(T) {
265 (0usize, state_all_data_types_r.len())
266 } else {
267 (state_all_data_types_l.len(), state_all_data_types_r.len())
268 };
269 JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
270 };
271
272 let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
273 let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());
274
275 let watermark_buffers = BTreeMap::new();
279
280 let inequal_key_idx_l = Some(asof_desc.left_idx);
281 let inequal_key_idx_r = Some(asof_desc.right_idx);
282
283 Self {
284 ctx: ctx.clone(),
285 info,
286 input_l: Some(input_l),
287 input_r: Some(input_r),
288 actual_output_data_types,
289 side_l: JoinSide {
290 ht: JoinHashMap::new(
291 watermark_epoch.clone(),
292 join_key_data_types_l,
293 state_join_key_indices_l.clone(),
294 state_all_data_types_l.clone(),
295 state_table_l,
296 params_l.deduped_pk_indices,
297 None,
298 null_matched.clone(),
299 pk_contained_in_jk_l,
300 inequal_key_idx_l,
301 metrics.clone(),
302 ctx.id,
303 ctx.fragment_id,
304 "left",
305 ),
306 join_key_indices: state_join_key_indices_l,
307 all_data_types: state_all_data_types_l,
308 i2o_mapping: left_to_output,
309 i2o_mapping_indexed: l2o_indexed,
310 start_pos: 0,
311 need_degree_table: false,
312 _marker: PhantomData,
313 },
314 side_r: JoinSide {
315 ht: JoinHashMap::new(
316 watermark_epoch,
317 join_key_data_types_r,
318 state_join_key_indices_r.clone(),
319 state_all_data_types_r.clone(),
320 state_table_r,
321 params_r.deduped_pk_indices,
322 None,
323 null_matched,
324 pk_contained_in_jk_r,
325 inequal_key_idx_r,
326 metrics.clone(),
327 ctx.id,
328 ctx.fragment_id,
329 "right",
330 ),
331 join_key_indices: state_join_key_indices_r,
332 all_data_types: state_all_data_types_r,
333 start_pos: side_l_column_n,
334 i2o_mapping: right_to_output,
335 i2o_mapping_indexed: r2o_indexed,
336 need_degree_table: false,
337 _marker: PhantomData,
338 },
339 metrics,
340 chunk_size,
341 cnt_rows_received: 0,
342 watermark_buffers,
343 high_join_amplification_threshold,
344 join_cache_evict_interval_rows,
345 asof_desc,
346 }
347 }
348
    /// Main loop of the executor, driven by `execute`.
    ///
    /// Aligns the two inputs on barriers, forwards the first barrier and then initializes both
    /// sides' state with its epoch. Every subsequent aligned message is dispatched:
    ///
    /// * watermarks go through `handle_watermark`, and the resulting watermarks are yielded;
    /// * chunks are joined by `eq_join_left` / `eq_join_right`, their output is yielded, and
    ///   `try_flush_data` is called afterwards;
    /// * barriers flush both sides, are yielded, and then run the post-commit hooks.
    ///
    /// Input-waiting time and per-side matching time are recorded in the actor metrics.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);
        let actor_id = self.ctx.id;

        // The first barrier is forwarded before the state is initialized with its epoch.
        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // Metric handles, labelled once by actor/fragment (and by side where applicable).
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            // Time since the previous message finished processing counts as input waiting.
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // The timer is paused around each `yield`, so the metric covers matching
                    // work only, not the time downstream spends consuming the output.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        asof_desc: &self.asof_desc,
                        actual_output_data_types: &self.actual_output_data_types,
                        chunk,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Same timing scheme as the left branch.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        asof_desc: &self.asof_desc,
                        actual_output_data_types: &self.actual_output_data_types,
                        chunk,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    yield Message::Barrier(barrier);

                    // The post-commit hooks run only after the barrier has been forwarded.
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    // Only the left hook's result decides whether buffered watermarks are
                    // dropped. NOTE(review): presumably `Some(true)` signals that the cache was
                    // invalidated by a vnode-bitmap update; confirm against
                    // `JoinHashMapPostCommit::post_yield_barrier`.
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                    }

                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }

                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                }
            }
            start_time = Instant::now();
        }
    }
513
514 async fn flush_data(
515 &mut self,
516 epoch: EpochPair,
517 ) -> StreamExecutorResult<(
518 JoinHashMapPostCommit<'_, K, S, E>,
519 JoinHashMapPostCommit<'_, K, S, E>,
520 )> {
521 let left = self.side_l.ht.flush(epoch).await?;
524 let right = self.side_r.ht.flush(epoch).await?;
525 Ok((left, right))
526 }
527
528 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
529 self.side_l.ht.try_flush().await?;
532 self.side_r.ht.try_flush().await?;
533 Ok(())
534 }
535
536 fn evict_cache(
538 side_update: &mut JoinSide<K, S, E>,
539 side_match: &mut JoinSide<K, S, E>,
540 cnt_rows_received: &mut u32,
541 join_cache_evict_interval_rows: u32,
542 ) {
543 *cnt_rows_received += 1;
544 if *cnt_rows_received >= join_cache_evict_interval_rows {
545 side_update.ht.evict();
546 side_match.ht.evict();
547 *cnt_rows_received = 0;
548 }
549 }
550
551 fn handle_watermark(
552 &mut self,
553 side: SideTypePrimitive,
554 watermark: Watermark,
555 ) -> StreamExecutorResult<Vec<Watermark>> {
556 let (side_update, side_match) = if side == SideType::Left {
557 (&mut self.side_l, &mut self.side_r)
558 } else {
559 (&mut self.side_r, &mut self.side_l)
560 };
561
562 if side_update.join_key_indices[0] == watermark.col_idx {
564 side_match.ht.update_watermark(watermark.val.clone());
565 }
566
567 let wm_in_jk = side_update
569 .join_key_indices
570 .iter()
571 .positions(|idx| *idx == watermark.col_idx);
572 let mut watermarks_to_emit = vec![];
573 for idx in wm_in_jk {
574 let buffers = self
575 .watermark_buffers
576 .entry(idx)
577 .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
578 if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
579 let empty_indices = vec![];
580 let output_indices = side_update
581 .i2o_mapping_indexed
582 .get_vec(&side_update.join_key_indices[idx])
583 .unwrap_or(&empty_indices)
584 .iter()
585 .chain(
586 side_match
587 .i2o_mapping_indexed
588 .get_vec(&side_match.join_key_indices[idx])
589 .unwrap_or(&empty_indices),
590 );
591 for output_idx in output_indices {
592 watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
593 }
594 };
595 }
596 Ok(watermarks_to_emit)
597 }
598
599 async fn hash_eq_match(
602 key: &K,
603 ht: &mut JoinHashMap<K, S, E>,
604 ) -> StreamExecutorResult<Option<HashValueType<E>>> {
605 if !key.null_bitmap().is_subset(ht.null_matched()) {
606 Ok(None)
607 } else {
608 ht.take_state(key).await.map(Some)
609 }
610 }
611
612 #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
613 async fn eq_join_left(args: EqJoinArgs<'_, K, S, E>) {
614 let EqJoinArgs {
615 ctx: _,
616 side_l,
617 side_r,
618 asof_desc,
619 actual_output_data_types,
620 chunk,
622 chunk_size,
623 cnt_rows_received,
624 high_join_amplification_threshold: _,
625 join_cache_evict_interval_rows,
626 } = args;
627
628 let (side_update, side_match) = (side_l, side_r);
629
630 let mut join_chunk_builder =
631 JoinChunkBuilder::<T, { SideType::Left }>::new(JoinStreamChunkBuilder::new(
632 chunk_size,
633 actual_output_data_types.to_vec(),
634 side_update.i2o_mapping.clone(),
635 side_match.i2o_mapping.clone(),
636 ));
637
638 let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
639 for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
640 let Some((op, row)) = r else {
641 continue;
642 };
643 Self::evict_cache(
644 side_update,
645 side_match,
646 cnt_rows_received,
647 join_cache_evict_interval_rows,
648 );
649
650 let matched_rows = if !side_update.ht.check_inequal_key_null(&row) {
651 Self::hash_eq_match(key, &mut side_match.ht).await?
652 } else {
653 None
654 };
655 let inequal_key = side_update.ht.serialize_inequal_key_from_row(row);
656
657 if let Some(matched_rows) = matched_rows {
658 let matched_row_by_inequality = match asof_desc.inequality_type {
659 AsOfInequalityType::Lt => matched_rows.lower_bound_by_inequality(
660 Bound::Excluded(&inequal_key),
661 &side_match.all_data_types,
662 ),
663 AsOfInequalityType::Le => matched_rows.lower_bound_by_inequality(
664 Bound::Included(&inequal_key),
665 &side_match.all_data_types,
666 ),
667 AsOfInequalityType::Gt => matched_rows.upper_bound_by_inequality(
668 Bound::Excluded(&inequal_key),
669 &side_match.all_data_types,
670 ),
671 AsOfInequalityType::Ge => matched_rows.upper_bound_by_inequality(
672 Bound::Included(&inequal_key),
673 &side_match.all_data_types,
674 ),
675 };
676 match op {
677 Op::Insert | Op::UpdateInsert => {
678 if let Some(matched_row_by_inequality) = matched_row_by_inequality {
679 let matched_row = matched_row_by_inequality?;
680
681 if let Some(chunk) =
682 join_chunk_builder.with_match_on_insert(&row, &matched_row)
683 {
684 yield chunk;
685 }
686 } else if let Some(chunk) =
687 join_chunk_builder.forward_if_not_matched(Op::Insert, row)
688 {
689 yield chunk;
690 }
691 side_update.ht.insert_row(key, row)?;
692 }
693 Op::Delete | Op::UpdateDelete => {
694 if let Some(matched_row_by_inequality) = matched_row_by_inequality {
695 let matched_row = matched_row_by_inequality?;
696
697 if let Some(chunk) =
698 join_chunk_builder.with_match_on_delete(&row, &matched_row)
699 {
700 yield chunk;
701 }
702 } else if let Some(chunk) =
703 join_chunk_builder.forward_if_not_matched(Op::Delete, row)
704 {
705 yield chunk;
706 }
707 side_update.ht.delete_row(key, row)?;
708 }
709 }
710 side_match.ht.update_state(key, matched_rows);
712 } else {
713 match op {
716 Op::Insert | Op::UpdateInsert => {
717 if let Some(chunk) =
718 join_chunk_builder.forward_if_not_matched(Op::Insert, row)
719 {
720 yield chunk;
721 }
722 }
723 Op::Delete | Op::UpdateDelete => {
724 if let Some(chunk) =
725 join_chunk_builder.forward_if_not_matched(Op::Delete, row)
726 {
727 yield chunk;
728 }
729 }
730 }
731 }
732 }
733 if let Some(chunk) = join_chunk_builder.take() {
734 yield chunk;
735 }
736 }
737
    /// Joins a chunk from the right input against the left side's state.
    ///
    /// A right row may be the AsOf match of many left rows. For each right row this finds
    /// which right row the affected left rows match before and after the change, and emits
    /// retract/insert pairs for exactly those left rows:
    ///
    /// 1. Among right rows sharing the same inequality value, only the one with the smallest
    ///    pk is eligible; this determines `row_to_delete_r` / `row_to_insert_r`.
    /// 2. If only one of them is set, the other is the first row at the neighbouring
    ///    inequality value (`affected_row_r`): the match the left rows previously used or
    ///    fall back to.
    /// 3. The affected left rows are those whose inequality value lies between that
    ///    neighbouring value and this row's value (`range`).
    ///
    /// For left outer joins, a missing right row is represented by a NULL-padded left row
    /// (`append_row_matched`). Rows whose join key cannot match, or whose inequality key is
    /// NULL, are neither joined nor stored. Rows whose matched count exceeds
    /// `high_join_amplification_threshold` are logged.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_right(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            asof_desc,
            actual_output_data_types,
            chunk,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            join_cache_evict_interval_rows,
        } = args;

        let (side_update, side_match) = (side_r, side_l);

        let mut join_chunk_builder = JoinStreamChunkBuilder::new(
            chunk_size,
            actual_output_data_types.to_vec(),
            side_update.i2o_mapping.clone(),
            side_match.i2o_mapping.clone(),
        );

        let join_matched_rows_metrics = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // Invisible rows are skipped.
            let Some((op, row)) = r else {
                continue;
            };
            // Number of left rows whose output was touched by this right row.
            let mut join_matched_rows_cnt = 0;

            Self::evict_cache(
                side_update,
                side_match,
                cnt_rows_received,
                join_cache_evict_interval_rows,
            );

            let matched_rows = if !side_update.ht.check_inequal_key_null(&row) {
                Self::hash_eq_match(key, &mut side_match.ht).await?
            } else {
                None
            };
            let inequal_key = side_update.ht.serialize_inequal_key_from_row(row);

            if let Some(matched_rows) = matched_rows {
                // Both sides share the same null-safe bitmap, so this lookup cannot be `None`.
                let update_rows = Self::hash_eq_match(key, &mut side_update.ht).await?.expect("None is not expected because we have checked null in key when getting matched_rows");
                // Inequality value -> pks of right rows with that value, before this row is
                // applied.
                let right_inequality_index = update_rows.inequality_index();
                // Which right row stops / starts being the representative of `inequal_key`.
                let (row_to_delete_r, row_to_insert_r) =
                    if let Some(pks) = right_inequality_index.get(&inequal_key) {
                        assert!(!pks.is_empty());
                        let row_pk = side_update.ht.serialize_pk_from_row(row);
                        match op {
                            Op::Insert | Op::UpdateInsert => {
                                // The new row only takes over if its pk sorts before the
                                // current representative's.
                                let smallest_pk = pks.first_key_sorted().unwrap();
                                if smallest_pk > &row_pk {
                                    if let Some(to_delete_row) = update_rows
                                        .get_by_indexed_pk(smallest_pk, &side_update.all_data_types)
                                    {
                                        (
                                            Some(Either::Left(to_delete_row?.row)),
                                            Some(Either::Right(row)),
                                        )
                                    } else {
                                        (None, None)
                                    }
                                } else {
                                    (None, None)
                                }
                            }
                            Op::Delete | Op::UpdateDelete => {
                                // Only deleting the representative changes anything; the row
                                // with the next smallest pk (if any) takes its place.
                                let smallest_pk = pks.first_key_sorted().unwrap();
                                if smallest_pk == &row_pk {
                                    if let Some(second_smallest_pk) = pks.second_key_sorted() {
                                        if let Some(to_insert_row) = update_rows.get_by_indexed_pk(
                                            second_smallest_pk,
                                            &side_update.all_data_types,
                                        ) {
                                            (
                                                Some(Either::Right(row)),
                                                Some(Either::Left(to_insert_row?.row)),
                                            )
                                        } else {
                                            (None, None)
                                        }
                                    } else {
                                        (Some(Either::Right(row)), None)
                                    }
                                } else {
                                    (None, None)
                                }
                            }
                        }
                    } else {
                        // No other right row has this inequality value.
                        match op {
                            Op::Insert | Op::UpdateInsert => (None, Some(Either::Right(row))),
                            Op::Delete | Op::UpdateDelete => (Some(Either::Right(row)), None),
                        }
                    };

                if row_to_delete_r.is_none() && row_to_insert_r.is_none() {
                    // The representative for `inequal_key` is unchanged: no output.
                } else {
                    // Nearest existing inequality values strictly below / above this row's.
                    let prev_inequality_key =
                        right_inequality_index.upper_bound_key(Bound::Excluded(&inequal_key));
                    let next_inequality_key =
                        right_inequality_index.lower_bound_key(Bound::Excluded(&inequal_key));
                    // The right row the affected left rows match when `inequal_key` is absent.
                    let affected_row_r = match asof_desc.inequality_type {
                        AsOfInequalityType::Lt | AsOfInequalityType::Le => next_inequality_key
                            .and_then(|k| {
                                update_rows.get_first_by_inequality(k, &side_update.all_data_types)
                            }),
                        AsOfInequalityType::Gt | AsOfInequalityType::Ge => prev_inequality_key
                            .and_then(|k| {
                                update_rows.get_first_by_inequality(k, &side_update.all_data_types)
                            }),
                    }
                    .transpose()?
                    .map(|r| Either::Left(r.row));

                    // Fill the missing half with the neighbouring row (which may be `None`).
                    let (row_to_delete_r, row_to_insert_r) =
                        match (&row_to_delete_r, &row_to_insert_r) {
                            (Some(_), Some(_)) => (row_to_delete_r, row_to_insert_r),
                            (None, Some(_)) => (affected_row_r, row_to_insert_r),
                            (Some(_), None) => (row_to_delete_r, affected_row_r),
                            (None, None) => unreachable!(),
                        };
                    // Left inequality values whose AsOf match is the right row at
                    // `inequal_key`.
                    let range = match asof_desc.inequality_type {
                        AsOfInequalityType::Lt => (
                            prev_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Included),
                            Bound::Excluded(&inequal_key),
                        ),
                        AsOfInequalityType::Le => (
                            prev_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Excluded),
                            Bound::Included(&inequal_key),
                        ),
                        AsOfInequalityType::Gt => (
                            Bound::Excluded(&inequal_key),
                            next_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Included),
                        ),
                        AsOfInequalityType::Ge => (
                            Bound::Included(&inequal_key),
                            next_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Excluded),
                        ),
                    };

                    let rows_l =
                        matched_rows.range_by_inequality(range, &side_match.all_data_types);
                    for row_l in rows_l {
                        join_matched_rows_cnt += 1;
                        let row_l = row_l?.row;
                        // Retract the old pairing (or the NULL-padded row for left outer).
                        if let Some(row_to_delete_r) = &row_to_delete_r {
                            if let Some(chunk) =
                                join_chunk_builder.append_row(Op::Delete, row_to_delete_r, &row_l)
                            {
                                yield chunk;
                            }
                        } else if is_as_of_left_outer(T)
                            && let Some(chunk) =
                                join_chunk_builder.append_row_matched(Op::Delete, &row_l)
                        {
                            yield chunk;
                        }
                        // Emit the new pairing (or the NULL-padded row for left outer).
                        if let Some(row_to_insert_r) = &row_to_insert_r {
                            if let Some(chunk) =
                                join_chunk_builder.append_row(Op::Insert, row_to_insert_r, &row_l)
                            {
                                yield chunk;
                            }
                        } else if is_as_of_left_outer(T)
                            && let Some(chunk) =
                                join_chunk_builder.append_row_matched(Op::Insert, &row_l)
                        {
                            yield chunk;
                        }
                    }
                }
                // Return the taken states to both hash tables, then apply this row.
                side_match.ht.update_state(key, matched_rows);
                side_update.ht.update_state(key, update_rows);

                match op {
                    Op::Insert | Op::UpdateInsert => {
                        side_update.ht.insert_row(key, row)?;
                    }
                    Op::Delete | Op::UpdateDelete => {
                        side_update.ht.delete_row(key, row)?;
                    }
                }
            } else {
                // The row can never match, so it is neither joined nor stored.
            }
            join_matched_rows_metrics.observe(join_matched_rows_cnt as _);
            if join_matched_rows_cnt > high_join_amplification_threshold {
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = join_matched_rows_cnt,
                    update_table_id = %side_update.ht.table_id(),
                    match_table_id = %side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = %ctx.id,
                    fragment_id = %ctx.fragment_id,
                    "large rows matched for join key when AsOf join updating right side",
                );
            }
        }
        if let Some(chunk) = join_chunk_builder.take() {
            yield chunk;
        }
    }
974}
975
976#[cfg(test)]
977mod tests {
978 use std::sync::atomic::AtomicU64;
979
980 use risingwave_common::array::*;
981 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
982 use risingwave_common::hash::Key64;
983 use risingwave_common::util::epoch::test_epoch;
984 use risingwave_common::util::sort_util::OrderType;
985 use risingwave_storage::memory::MemoryStateStore;
986
987 use super::*;
988 use crate::common::table::test_utils::gen_pbtable;
989 use crate::executor::MemoryEncoding;
990 use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
991
992 async fn create_in_memory_state_table(
993 mem_state: MemoryStateStore,
994 data_types: &[DataType],
995 order_types: &[OrderType],
996 pk_indices: &[usize],
997 table_id: u32,
998 ) -> StateTable<MemoryStateStore> {
999 let column_descs = data_types
1000 .iter()
1001 .enumerate()
1002 .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1003 .collect_vec();
1004 StateTable::from_table_catalog(
1005 &gen_pbtable(
1006 TableId::new(table_id),
1007 column_descs,
1008 order_types.to_vec(),
1009 pk_indices.to_vec(),
1010 0,
1011 ),
1012 mem_state.clone(),
1013 None,
1014 )
1015 .await
1016 }
1017
1018 async fn create_executor<const T: AsOfJoinTypePrimitive>(
1019 asof_desc: AsOfDesc,
1020 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1021 let schema = Schema {
1022 fields: vec![
1023 Field::unnamed(DataType::Int64), Field::unnamed(DataType::Int64),
1025 Field::unnamed(DataType::Int64),
1026 ],
1027 };
1028 let (tx_l, source_l) = MockSource::channel();
1029 let source_l = source_l.into_executor(schema.clone(), vec![1]);
1030 let (tx_r, source_r) = MockSource::channel();
1031 let source_r = source_r.into_executor(schema, vec![1]);
1032 let params_l = JoinParams::new(vec![0], vec![1]);
1033 let params_r = JoinParams::new(vec![0], vec![1]);
1034
1035 let mem_state = MemoryStateStore::new();
1036
1037 let state_l = create_in_memory_state_table(
1038 mem_state.clone(),
1039 &[DataType::Int64, DataType::Int64, DataType::Int64],
1040 &[
1041 OrderType::ascending(),
1042 OrderType::ascending(),
1043 OrderType::ascending(),
1044 ],
1045 &[0, asof_desc.left_idx, 1],
1046 0,
1047 )
1048 .await;
1049
1050 let state_r = create_in_memory_state_table(
1051 mem_state,
1052 &[DataType::Int64, DataType::Int64, DataType::Int64],
1053 &[
1054 OrderType::ascending(),
1055 OrderType::ascending(),
1056 OrderType::ascending(),
1057 ],
1058 &[0, asof_desc.right_idx, 1],
1059 1,
1060 )
1061 .await;
1062
1063 let schema: Schema = [source_l.schema().fields(), source_r.schema().fields()]
1064 .concat()
1065 .into_iter()
1066 .collect();
1067 let schema_len = schema.len();
1068 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1069
1070 let executor = AsOfJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1071 ActorContext::for_test(123),
1072 info,
1073 source_l,
1074 source_r,
1075 params_l,
1076 params_r,
1077 vec![false],
1078 (0..schema_len).collect_vec(),
1079 state_l,
1080 state_r,
1081 Arc::new(AtomicU64::new(0)),
1082 Arc::new(StreamingMetrics::unused()),
1083 1024,
1084 2048,
1085 asof_desc,
1086 );
1087 (tx_l, tx_r, executor.boxed().execute())
1088 }
1089
    /// AsOf inner join on key column 0 with `left.col0 < right.col2`: each left row matches
    /// the right row with the smallest column-2 value strictly greater than its column 0,
    /// with ties broken by the smallest right pk (column 1).
    #[tokio::test]
    async fn test_as_of_inner_join() -> StreamExecutorResult<()> {
        let asof_desc = AsOfDesc {
            left_idx: 0,
            right_idx: 2,
            inequality_type: AsOfInequalityType::Lt,
        };

        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 7
             + 2 5 8
             + 3 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 3 8 1
             - 3 8 1",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 1 7
             + 2 2 1
             + 2 3 4
             + 2 4 2
             + 6 1 9
             + 6 2 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             - 2 3 4",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I I
             + 2 3 3",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I I
             - 2 5 8",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I I I
             + 6 3 1
             + 6 4 1",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I I I
             - 6 1 9",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_executor::<{ AsOfJoinType::Inner }>(asof_desc).await;

        // First barrier: the executor initializes its state.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // Left rows with no right state: inner join emits nothing.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Left row (2,5,8) first matches (2,1,7), then switches to (2,3,4): 4 is the smallest
        // right value greater than 2. (2,2,1) and (2,4,2) do not qualify.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 2 5 8 2 1 7
                - 2 5 8 2 1 7
                + 2 5 8 2 3 4"
            )
        );

        // Deleting the current match falls back to the next larger value, (2,1,7).
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 3 4
                + 2 5 8 2 1 7"
            )
        );

        // A closer right value (3) takes over the match.
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 1 7
                + 2 5 8 2 3 3"
            )
        );

        // Deleting the left row retracts its current pairing.
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 3 3"
            )
        );

        // Two right rows share value 9; the one with the smaller pk (1) is chosen.
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 6 3 1 6 1 9
                + 6 4 1 6 1 9"
            )
        );

        // Deleting that representative hands the match to the row with the next pk.
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 6 3 1 6 1 9
                + 6 3 1 6 2 9
                - 6 4 1 6 1 9
                + 6 4 1 6 2 9"
            )
        );

        Ok(())
    }
1237
    /// AsOf left outer join on key column 0 with `left.col1 >= right.col2`: each left row
    /// matches the right row with the largest column-2 value not above its column 1 (ties
    /// broken by the smallest right pk). Unmatched left rows are emitted NULL-padded.
    #[tokio::test]
    async fn test_as_of_left_outer_join() -> StreamExecutorResult<()> {
        let asof_desc = AsOfDesc {
            left_idx: 1,
            right_idx: 2,
            inequality_type: AsOfInequalityType::Ge,
        };

        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 7
             + 2 5 8
             + 3 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 3 8 1
             - 3 8 1",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 3 4
             + 2 2 5
             + 2 1 5
             + 6 1 8
             + 6 2 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             - 2 3 4
             - 2 1 5
             - 2 2 5",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I I
             + 6 8 9",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I I
             - 6 1 8",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_executor::<{ AsOfJoinType::LeftOuter }>(asof_desc).await;

        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // No right state yet: every left row is emitted NULL-padded.
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 7 . . .
                + 2 5 8 . . .
                + 3 6 9 . . ."
            )
        );

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // NOTE(review): the trailing `D` in `from_pretty` marks rows expected to be invisible
        // in the output chunk; confirm against `StreamChunk::from_pretty`.
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 3 8 1 . . . D
                - 3 8 1 . . . D"
            )
        );

        // Left (2,5,8) (value 5): the NULL-padded row is replaced by (2,3,4), then by (2,2,5)
        // (a larger value <= 5), then by (2,1,5) (same value, smaller pk).
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 . . .
                + 2 5 8 2 3 4
                - 2 5 8 2 3 4
                + 2 5 8 2 2 5
                - 2 5 8 2 2 5
                + 2 5 8 2 1 5"
            )
        );

        // Deleting (2,3,4) does not affect the match. Deleting (2,1,5) hands it to (2,2,5).
        // Deleting (2,2,5) leaves no candidate, so the NULL-padded row returns.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 1 5
                + 2 5 8 2 2 5
                - 2 5 8 2 2 5
                + 2 5 8 . . ."
            )
        );

        // Left value 8 matches right value 8 ((6,1,8)); 9 is too large.
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 6 8 9 6 1 8"
            )
        );

        // Removing the only candidate falls back to the NULL-padded row.
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 6 8 9 6 1 8
                + 6 8 9 . . ."
            )
        );
        Ok(())
    }
1372}