use std::collections::{BTreeMap, HashSet};
use std::ops::Bound;
use std::time::Duration;

use either::Either;
use itertools::Itertools;
use multimap::MultiMap;
use risingwave_common::array::Op;
use risingwave_common::hash::{HashKey, NullBitmap};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
use tokio::time::Instant;

use self::builder::JoinChunkBuilder;
use super::barrier_align::*;
use super::join::hash_join::*;
use super::join::*;
use super::watermark::*;
use crate::executor::join::builder::JoinStreamChunkBuilder;
use crate::executor::prelude::*;

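/// Evict the join-side caches every `EVICT_EVERY_N_ROWS` input rows (see `evict_cache` below).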
const EVICT_EVERY_N_ROWS: u32 = 16;

fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    HashSet::<usize>::from_iter(vec1).is_subset(&vec2.into_iter().collect())
}

pub struct JoinParams {
    /// Indices of the join key columns.
    pub join_key_indices: Vec<usize>,
    /// Indices of the primary key columns, deduplicated against the join key.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        Self {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}

struct JoinSide<K: HashKey, S: StateStore> {
    /// Stores all rows of this side, backed by a state table with an in-memory cache.
    ht: JoinHashMap<K, S>,
    /// Indices of the join key columns.
    join_key_indices: Vec<usize>,
    /// Data types of all columns on this side.
    all_data_types: Vec<DataType>,
    /// Offset of this side's columns in the output schema.
    start_pos: usize,
    /// Mapping from this side's input column index to output column index.
    i2o_mapping: Vec<(usize, usize)>,
    /// The same mapping as `i2o_mapping`, indexed by input column for lookups.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// Whether a degree table is needed for this side (always `false` for AsOf join).
    need_degree_table: bool,
}

impl<K: HashKey, S: StateStore> std::fmt::Debug for JoinSide<K, S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("JoinSide")
            .field("join_key_indices", &self.join_key_indices)
            .field("col_types", &self.all_data_types)
            .field("start_pos", &self.start_pos)
            .field("i2o_mapping", &self.i2o_mapping)
            .field("need_degree_table", &self.need_degree_table)
            .finish()
    }
}

impl<K: HashKey, S: StateStore> JoinSide<K, S> {
    fn is_dirty(&self) -> bool {
        unimplemented!()
    }

    #[expect(dead_code)]
    fn clear_cache(&mut self) {
        assert!(
            !self.is_dirty(),
            "cannot clear cache while states of hash join are dirty"
        );
    }

    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.ht.init(epoch).await
    }
}

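/// `AsOfJoinExecutor` joins two input streams on an equality key plus an AsOf inequality
/// (`<`, `<=`, `>`, `>=`): each left row is joined with at most one right row, the one whose
/// inequality column is closest to the left value in the required direction. Both sides are
/// kept in state tables (via `JoinHashMap`) so previously emitted matches can be revised as
/// either input changes.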
pub struct AsOfJoinExecutor<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> {
    ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Left input executor (moved out when the stream starts).
    input_l: Option<Executor>,
    /// Right input executor (moved out when the stream starts).
    input_r: Option<Executor>,
    /// Data types of the output columns, after applying the output column mapping.
    actual_output_data_types: Vec<DataType>,
    /// State of the left join side.
    side_l: JoinSide<K, S>,
    /// State of the right join side.
    side_r: JoinSide<K, S>,

    metrics: Arc<StreamingMetrics>,
    /// Maximum number of rows in an output chunk.
    chunk_size: usize,
    /// Rows received since the last cache eviction; reset once it reaches `EVICT_EVERY_N_ROWS`.
    cnt_rows_received: u32,

    /// Buffered watermarks per join-key position, aligned across the two inputs.
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// Log a warning when a single join key matches more rows than this threshold.
    high_join_amplification_threshold: usize,
    /// The AsOf inequality: the column index on each side and the inequality type.
    asof_desc: AsOfDesc,
}

impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> std::fmt::Debug
    for AsOfJoinExecutor<K, S, T>
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AsOfJoinExecutor")
            .field("join_type", &T)
            .field("input_left", &self.input_l.as_ref().unwrap().identity())
            .field("input_right", &self.input_r.as_ref().unwrap().identity())
            .field("side_l", &self.side_l)
            .field("side_r", &self.side_r)
            .field("pk_indices", &self.info.pk_indices)
            .field("schema", &self.info.schema)
            .field("actual_output_data_types", &self.actual_output_data_types)
            .finish()
    }
}

impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> Execute
    for AsOfJoinExecutor<K, S, T>
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}

struct EqJoinArgs<'a, K: HashKey, S: StateStore> {
    ctx: &'a ActorContextRef,
    side_l: &'a mut JoinSide<K, S>,
    side_r: &'a mut JoinSide<K, S>,
    asof_desc: &'a AsOfDesc,
    actual_output_data_types: &'a [DataType],
    chunk: StreamChunk,
    chunk_size: usize,
    cnt_rows_received: &'a mut u32,
    high_join_amplification_threshold: usize,
}

impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor<K, S, T> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        asof_desc: AsOfDesc,
    ) -> Self {
        let side_l_column_n = input_l.schema().len();

        let schema_fields = [
            input_l.schema().fields.clone(),
            input_r.schema().fields.clone(),
        ]
        .concat();

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.pk_indices().to_vec();
        let state_pk_indices_r = input_r.pk_indices().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        let pk_contained_in_jk_l =
            is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone());
        let pk_contained_in_jk_r =
            is_subset(state_pk_indices_r.clone(), state_join_key_indices_r.clone());

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        let (left_to_output, right_to_output) = {
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        let watermark_buffers = BTreeMap::new();

        let inequal_key_idx_l = Some(asof_desc.left_idx);
        let inequal_key_idx_r = Some(asof_desc.right_idx);

        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    None,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    inequal_key_idx_l,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                start_pos: 0,
                need_degree_table: false,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    None,
                    null_matched,
                    pk_contained_in_jk_r,
                    inequal_key_idx_r,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                need_degree_table: false,
            },
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            asof_desc,
        }
    }

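    /// The main loop: aligns the two inputs on barriers, handles watermarks, dispatches left and
    /// right chunks to `eq_join_left` / `eq_join_right`, and flushes both state tables on every
    /// barrier.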
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);
        let actor_id = self.ctx.id;

        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        asof_desc: &self.asof_desc,
                        actual_output_data_types: &self.actual_output_data_types,
                        chunk,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        asof_desc: &self.asof_desc,
                        actual_output_data_types: &self.actual_output_data_types,
                        chunk,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    yield Message::Barrier(barrier);

                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    // If the vnode bitmap update may have staled the cached state, the buffered
                    // watermarks are no longer reliable, so drop them.
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                    }

                    // Report the number of cached join entries on each side.
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }

                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                }
            }
            start_time = Instant::now();
        }
    }

    async fn flush_data(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<(
        JoinHashMapPostCommit<'_, K, S>,
        JoinHashMapPostCommit<'_, K, S>,
    )> {
        let left = self.side_l.ht.flush(epoch).await?;
        let right = self.side_r.ht.flush(epoch).await?;
        Ok((left, right))
    }

    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        self.side_l.ht.try_flush().await?;
        self.side_r.ht.try_flush().await?;
        Ok(())
    }

    fn evict_cache(
        side_update: &mut JoinSide<K, S>,
        side_match: &mut JoinSide<K, S>,
        cnt_rows_received: &mut u32,
    ) {
        *cnt_rows_received += 1;
        if *cnt_rows_received == EVICT_EVERY_N_ROWS {
            side_update.ht.evict();
            side_match.ht.evict();
            *cnt_rows_received = 0;
        }
    }

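    /// Handles a watermark coming from one side: buffers it per join-key position, and once both
    /// sides have reported a watermark for that position, emits the aligned watermark for every
    /// output column the join key maps to. A watermark on the first join key column is also
    /// pushed into the hash table for state cleaning.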
    fn handle_watermark(
        &mut self,
        side: SideTypePrimitive,
        watermark: Watermark,
    ) -> StreamExecutorResult<Vec<Watermark>> {
        let (side_update, side_match) = if side == SideType::Left {
            (&mut self.side_l, &mut self.side_r)
        } else {
            (&mut self.side_r, &mut self.side_l)
        };

        if side_update.join_key_indices[0] == watermark.col_idx {
            side_match.ht.update_watermark(watermark.val.clone());
        }

        let wm_in_jk = side_update
            .join_key_indices
            .iter()
            .positions(|idx| *idx == watermark.col_idx);
        let mut watermarks_to_emit = vec![];
        for idx in wm_in_jk {
            let buffers = self
                .watermark_buffers
                .entry(idx)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                let empty_indices = vec![];
                let output_indices = side_update
                    .i2o_mapping_indexed
                    .get_vec(&side_update.join_key_indices[idx])
                    .unwrap_or(&empty_indices)
                    .iter()
                    .chain(
                        side_match
                            .i2o_mapping_indexed
                            .get_vec(&side_match.join_key_indices[idx])
                            .unwrap_or(&empty_indices),
                    );
                for output_idx in output_indices {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
            };
        }
        Ok(watermarks_to_emit)
    }

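    /// Looks up the matched-side state for `key`. Returns `None` if the key contains NULLs in
    /// positions that are not marked as null-safe, since such keys can never produce a match.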
    async fn hash_eq_match(
        key: &K,
        ht: &mut JoinHashMap<K, S>,
    ) -> StreamExecutorResult<Option<HashValueType>> {
        if !key.null_bitmap().is_subset(ht.null_matched()) {
            Ok(None)
        } else {
            ht.take_state(key).await.map(Some)
        }
    }

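    /// Processes a chunk from the left (outer) side. Each left row probes the right state and is
    /// joined with at most one right row among those sharing the join key: `Lt`/`Le` select the
    /// closest right row above the left inequality value, `Gt`/`Ge` the closest one below.
    /// Rows without a match are forwarded or dropped by `JoinChunkBuilder`, depending on whether
    /// the join is a left-outer AsOf join.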
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_left(args: EqJoinArgs<'_, K, S>) {
        let EqJoinArgs {
            ctx: _,
            side_l,
            side_r,
            asof_desc,
            actual_output_data_types,
            chunk,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold: _,
        } = args;

        let (side_update, side_match) = (side_l, side_r);

        let mut join_chunk_builder =
            JoinChunkBuilder::<T, { SideType::Left }>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(side_update, side_match, cnt_rows_received);

            let matched_rows = if !side_update.ht.check_inequal_key_null(&row) {
                Self::hash_eq_match(key, &mut side_match.ht).await?
            } else {
                None
            };
            let inequal_key = side_update.ht.serialize_inequal_key_from_row(row);

            if let Some(matched_rows) = matched_rows {
                let matched_row_by_inequality = match asof_desc.inequality_type {
                    AsOfInequalityType::Lt => matched_rows.lower_bound_by_inequality(
                        Bound::Excluded(&inequal_key),
                        &side_match.all_data_types,
                    ),
                    AsOfInequalityType::Le => matched_rows.lower_bound_by_inequality(
                        Bound::Included(&inequal_key),
                        &side_match.all_data_types,
                    ),
                    AsOfInequalityType::Gt => matched_rows.upper_bound_by_inequality(
                        Bound::Excluded(&inequal_key),
                        &side_match.all_data_types,
                    ),
                    AsOfInequalityType::Ge => matched_rows.upper_bound_by_inequality(
                        Bound::Included(&inequal_key),
                        &side_match.all_data_types,
                    ),
                };
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if let Some(matched_row_by_inequality) = matched_row_by_inequality {
                            let matched_row = matched_row_by_inequality?;

                            if let Some(chunk) =
                                join_chunk_builder.with_match_on_insert(&row, &matched_row)
                            {
                                yield chunk;
                            }
                        } else if let Some(chunk) =
                            join_chunk_builder.forward_if_not_matched(Op::Insert, row)
                        {
                            yield chunk;
                        }
                        side_update.ht.insert_row(key, row)?;
                    }
                    Op::Delete | Op::UpdateDelete => {
                        if let Some(matched_row_by_inequality) = matched_row_by_inequality {
                            let matched_row = matched_row_by_inequality?;

                            if let Some(chunk) =
                                join_chunk_builder.with_match_on_delete(&row, &matched_row)
                            {
                                yield chunk;
                            }
                        } else if let Some(chunk) =
                            join_chunk_builder.forward_if_not_matched(Op::Delete, row)
                        {
                            yield chunk;
                        }
                        side_update.ht.delete_row(key, row)?;
                    }
                }
                side_match.ht.update_state(key, matched_rows);
            } else {
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if let Some(chunk) =
                            join_chunk_builder.forward_if_not_matched(Op::Insert, row)
                        {
                            yield chunk;
                        }
                    }
                    Op::Delete | Op::UpdateDelete => {
                        if let Some(chunk) =
                            join_chunk_builder.forward_if_not_matched(Op::Delete, row)
                        {
                            yield chunk;
                        }
                    }
                }
            }
        }
        if let Some(chunk) = join_chunk_builder.take() {
            yield chunk;
        }
    }

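    /// Processes a chunk from the right (matched) side. Updating the right side may change which
    /// right row is the AsOf match for existing left rows, so for every affected left row in the
    /// relevant inequality-key range this emits a `Delete` of the old match and an `Insert` of
    /// the new one (or of a NULL-padded row for a left-outer AsOf join). The per-key inequality
    /// index of the right state is used to find the previously matched row and its replacement.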
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_right(args: EqJoinArgs<'_, K, S>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            asof_desc,
            actual_output_data_types,
            chunk,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
        } = args;

        let (side_update, side_match) = (side_r, side_l);

        let mut join_chunk_builder = JoinStreamChunkBuilder::new(
            chunk_size,
            actual_output_data_types.to_vec(),
            side_update.i2o_mapping.clone(),
            side_match.i2o_mapping.clone(),
        );

        let join_matched_rows_metrics = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            let Some((op, row)) = r else {
                continue;
            };
            let mut join_matched_rows_cnt = 0;

            Self::evict_cache(side_update, side_match, cnt_rows_received);

            let matched_rows = if !side_update.ht.check_inequal_key_null(&row) {
                Self::hash_eq_match(key, &mut side_match.ht).await?
            } else {
                None
            };
            let inequal_key = side_update.ht.serialize_inequal_key_from_row(row);

            if let Some(matched_rows) = matched_rows {
                let update_rows = Self::hash_eq_match(key, &mut side_update.ht)
                    .await?
                    .expect("None is not expected because we have checked null in key when getting matched_rows");
                let right_inequality_index = update_rows.inequality_index();
                let (row_to_delete_r, row_to_insert_r) =
                    if let Some(pks) = right_inequality_index.get(&inequal_key) {
                        assert!(!pks.is_empty());
                        let row_pk = side_update.ht.serialize_pk_from_row(row);
                        match op {
                            Op::Insert | Op::UpdateInsert => {
                                let smallest_pk = pks.first_key_sorted().unwrap();
                                if smallest_pk > &row_pk {
                                    if let Some(to_delete_row) = update_rows
                                        .get_by_indexed_pk(smallest_pk, &side_update.all_data_types)
                                    {
                                        (
                                            Some(Either::Left(to_delete_row?.row)),
                                            Some(Either::Right(row)),
                                        )
                                    } else {
                                        (None, None)
                                    }
                                } else {
                                    (None, None)
                                }
                            }
                            Op::Delete | Op::UpdateDelete => {
                                let smallest_pk = pks.first_key_sorted().unwrap();
                                if smallest_pk == &row_pk {
                                    if let Some(second_smallest_pk) = pks.second_key_sorted() {
                                        if let Some(to_insert_row) = update_rows.get_by_indexed_pk(
                                            second_smallest_pk,
                                            &side_update.all_data_types,
                                        ) {
                                            (
                                                Some(Either::Right(row)),
                                                Some(Either::Left(to_insert_row?.row)),
                                            )
                                        } else {
                                            (None, None)
                                        }
                                    } else {
                                        (Some(Either::Right(row)), None)
                                    }
                                } else {
                                    (None, None)
                                }
                            }
                        }
                    } else {
                        match op {
                            Op::Insert | Op::UpdateInsert => (None, Some(Either::Right(row))),
                            Op::Delete | Op::UpdateDelete => (Some(Either::Right(row)), None),
                        }
                    };

                if row_to_delete_r.is_none() && row_to_insert_r.is_none() {
                    // The effective right row for this inequality key does not change,
                    // so there is nothing to emit.
                } else {
                    let prev_inequality_key =
                        right_inequality_index.upper_bound_key(Bound::Excluded(&inequal_key));
                    let next_inequality_key =
                        right_inequality_index.lower_bound_key(Bound::Excluded(&inequal_key));
                    let affected_row_r = match asof_desc.inequality_type {
                        AsOfInequalityType::Lt | AsOfInequalityType::Le => next_inequality_key
                            .and_then(|k| {
                                update_rows.get_first_by_inequality(k, &side_update.all_data_types)
                            }),
                        AsOfInequalityType::Gt | AsOfInequalityType::Ge => prev_inequality_key
                            .and_then(|k| {
                                update_rows.get_first_by_inequality(k, &side_update.all_data_types)
                            }),
                    }
                    .transpose()?
                    .map(|r| Either::Left(r.row));

                    let (row_to_delete_r, row_to_insert_r) =
                        match (&row_to_delete_r, &row_to_insert_r) {
                            (Some(_), Some(_)) => (row_to_delete_r, row_to_insert_r),
                            (None, Some(_)) => (affected_row_r, row_to_insert_r),
                            (Some(_), None) => (row_to_delete_r, affected_row_r),
                            (None, None) => unreachable!(),
                        };
                    let range = match asof_desc.inequality_type {
                        AsOfInequalityType::Lt => (
                            prev_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Included),
                            Bound::Excluded(&inequal_key),
                        ),
                        AsOfInequalityType::Le => (
                            prev_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Excluded),
                            Bound::Included(&inequal_key),
                        ),
                        AsOfInequalityType::Gt => (
                            Bound::Excluded(&inequal_key),
                            next_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Included),
                        ),
                        AsOfInequalityType::Ge => (
                            Bound::Included(&inequal_key),
                            next_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Excluded),
                        ),
                    };

                    let rows_l =
                        matched_rows.range_by_inequality(range, &side_match.all_data_types);
                    for row_l in rows_l {
                        join_matched_rows_cnt += 1;
                        let row_l = row_l?.row;
                        if let Some(row_to_delete_r) = &row_to_delete_r {
                            if let Some(chunk) =
                                join_chunk_builder.append_row(Op::Delete, row_to_delete_r, &row_l)
                            {
                                yield chunk;
                            }
                        } else if is_as_of_left_outer(T) {
                            if let Some(chunk) =
                                join_chunk_builder.append_row_matched(Op::Delete, &row_l)
                            {
                                yield chunk;
                            }
                        }
                        if let Some(row_to_insert_r) = &row_to_insert_r {
                            if let Some(chunk) =
                                join_chunk_builder.append_row(Op::Insert, row_to_insert_r, &row_l)
                            {
                                yield chunk;
                            }
                        } else if is_as_of_left_outer(T) {
                            if let Some(chunk) =
                                join_chunk_builder.append_row_matched(Op::Insert, &row_l)
                            {
                                yield chunk;
                            }
                        }
                    }
                }
                side_match.ht.update_state(key, matched_rows);
                side_update.ht.update_state(key, update_rows);

                match op {
                    Op::Insert | Op::UpdateInsert => {
                        side_update.ht.insert_row(key, row)?;
                    }
                    Op::Delete | Op::UpdateDelete => {
                        side_update.ht.delete_row(key, row)?;
                    }
                }
            } else {
                // The join key contains a NULL that cannot match, or the inequality key is NULL,
                // so this right row can never be joined and is not maintained in the state.
            }
            join_matched_rows_metrics.observe(join_matched_rows_cnt as _);
            if join_matched_rows_cnt > high_join_amplification_threshold {
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = join_matched_rows_cnt,
                    update_table_id = side_update.ht.table_id(),
                    match_table_id = side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = ctx.id,
                    fragment_id = ctx.fragment_id,
                    "large rows matched for join key when AsOf join updating right side",
                );
            }
        }
        if let Some(chunk) = join_chunk_builder.take() {
            yield chunk;
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;

    use risingwave_common::array::*;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::hash::Key64;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};

    async fn create_in_memory_state_table(
        mem_state: MemoryStateStore,
        data_types: &[DataType],
        order_types: &[OrderType],
        pk_indices: &[usize],
        table_id: u32,
    ) -> StateTable<MemoryStateStore> {
        let column_descs = data_types
            .iter()
            .enumerate()
            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
            .collect_vec();
        StateTable::from_table_catalog(
            &gen_pbtable(
                TableId::new(table_id),
                column_descs,
                order_types.to_vec(),
                pk_indices.to_vec(),
                0,
            ),
            mem_state.clone(),
            None,
        )
        .await
    }

    async fn create_executor<const T: AsOfJoinTypePrimitive>(
        asof_desc: AsOfDesc,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let (tx_l, source_l) = MockSource::channel();
        let source_l = source_l.into_executor(schema.clone(), vec![1]);
        let (tx_r, source_r) = MockSource::channel();
        let source_r = source_r.into_executor(schema, vec![1]);
        let params_l = JoinParams::new(vec![0], vec![1]);
        let params_r = JoinParams::new(vec![0], vec![1]);

        let mem_state = MemoryStateStore::new();

        let state_l = create_in_memory_state_table(
            mem_state.clone(),
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            &[0, asof_desc.left_idx, 1],
            0,
        )
        .await;

        let state_r = create_in_memory_state_table(
            mem_state,
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            &[0, asof_desc.right_idx, 1],
            1,
        )
        .await;

        let schema: Schema = [source_l.schema().fields(), source_r.schema().fields()]
            .concat()
            .into_iter()
            .collect();
        let schema_len = schema.len();
        let info = ExecutorInfo::new(schema, vec![1], "HashJoinExecutor".to_owned(), 0);

        let executor = AsOfJoinExecutor::<Key64, MemoryStateStore, T>::new(
            ActorContext::for_test(123),
            info,
            source_l,
            source_r,
            params_l,
            params_r,
            vec![false],
            (0..schema_len).collect_vec(),
            state_l,
            state_r,
            Arc::new(AtomicU64::new(0)),
            Arc::new(StreamingMetrics::unused()),
            1024,
            2048,
            asof_desc,
        );
        (tx_l, tx_r, executor.boxed().execute())
    }

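    // Added illustration (not part of the original test suite): a minimal sanity check for the
    // `is_subset` helper above, which decides whether a side's primary key is fully contained in
    // its join key. The test name and cases are our own.
    #[test]
    fn test_is_subset_helper() {
        assert!(is_subset(vec![0, 1], vec![1, 0, 2]));
        assert!(is_subset(vec![], vec![0]));
        assert!(!is_subset(vec![0, 3], vec![0, 1, 2]));
    }
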
    #[tokio::test]
    async fn test_as_of_inner_join() -> StreamExecutorResult<()> {
        let asof_desc = AsOfDesc {
            left_idx: 0,
            right_idx: 2,
            inequality_type: AsOfInequalityType::Lt,
        };

        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 7
             + 2 5 8
             + 3 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 3 8 1
             - 3 8 1",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 1 7
             + 2 2 1
             + 2 3 4
             + 2 4 2
             + 6 1 9
             + 6 2 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             - 2 3 4",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I I
             + 2 3 3",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I I
             - 2 5 8",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I I I
             + 6 3 1
             + 6 4 1",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I I I
             - 6 1 9",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_executor::<{ AsOfJoinType::Inner }>(asof_desc).await;

        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                 + 2 5 8 2 1 7
                 - 2 5 8 2 1 7
                 + 2 5 8 2 3 4"
            )
        );

        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                 - 2 5 8 2 3 4
                 + 2 5 8 2 1 7"
            )
        );

        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                 - 2 5 8 2 1 7
                 + 2 5 8 2 3 3"
            )
        );

        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                 - 2 5 8 2 3 3"
            )
        );

        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                 + 6 3 1 6 1 9
                 + 6 4 1 6 1 9"
            )
        );

        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                 - 6 3 1 6 1 9
                 + 6 3 1 6 2 9
                 - 6 4 1 6 1 9
                 + 6 4 1 6 2 9"
            )
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_as_of_left_outer_join() -> StreamExecutorResult<()> {
        let asof_desc = AsOfDesc {
            left_idx: 1,
            right_idx: 2,
            inequality_type: AsOfInequalityType::Ge,
        };

        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 7
             + 2 5 8
             + 3 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 3 8 1
             - 3 8 1",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 3 4
             + 2 2 5
             + 2 1 5
             + 6 1 8
             + 6 2 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             - 2 3 4
             - 2 1 5
             - 2 2 5",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I I
             + 6 8 9",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I I
             - 6 1 8",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_executor::<{ AsOfJoinType::LeftOuter }>(asof_desc).await;

        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                 + 1 4 7 . . .
                 + 2 5 8 . . .
                 + 3 6 9 . . ."
            )
        );

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                 + 3 8 1 . . .
                 - 3 8 1 . . ."
            )
        );

        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                 - 2 5 8 . . .
                 + 2 5 8 2 3 4
                 - 2 5 8 2 3 4
                 + 2 5 8 2 2 5
                 - 2 5 8 2 2 5
                 + 2 5 8 2 1 5"
            )
        );

        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                 - 2 5 8 2 1 5
                 + 2 5 8 2 2 5
                 - 2 5 8 2 2 5
                 + 2 5 8 . . ."
            )
        );

        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                 + 6 8 9 6 1 8"
            )
        );

        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                 - 6 8 9 6 1 8
                 + 6 8 9 . . ."
            )
        );
        Ok(())
    }
}