1use std::collections::{BTreeMap, HashSet};
15use std::ops::Bound;
16use std::time::Duration;
17
18use either::Either;
19use itertools::Itertools;
20use multimap::MultiMap;
21use risingwave_common::array::Op;
22use risingwave_common::hash::{HashKey, NullBitmap};
23use risingwave_common::util::epoch::EpochPair;
24use risingwave_common::util::iter_util::ZipEqDebug;
25use tokio::time::Instant;
26
27use self::builder::JoinChunkBuilder;
28use super::barrier_align::*;
29use super::join::hash_join::*;
30use super::join::*;
31use super::watermark::*;
32use crate::executor::join::builder::JoinStreamChunkBuilder;
33use crate::executor::prelude::*;
34
/// Evict the join hash-table caches once every this many processed input rows,
/// to bound cache growth between barriers (see `evict_cache`).
const EVICT_EVERY_N_ROWS: u32 = 16;
37
/// Returns `true` if every element of `vec1` also appears in `vec2`
/// (set containment; duplicates and order are irrelevant).
///
/// Used at executor construction time to decide whether a side's primary key
/// is fully contained in its join key. The previous implementation collected
/// BOTH vectors into `HashSet`s; building a set only for the "superset" side
/// and short-circuiting with `all` saves one allocation and exits early on
/// the first missing element.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.into_iter().all(|x| superset.contains(&x))
}
41
/// Per-side join parameters: which input columns form the join key and which
/// form the (deduplicated) primary key of the side's state.
pub struct JoinParams {
    /// Indices of the join key columns in this side's input schema.
    pub join_key_indices: Vec<usize>,
    /// Indices of the primary-key columns used for state deduplication.
    /// NOTE(review): the exact dedup-against-join-key semantics are decided
    /// by the caller/planner — confirm there.
    pub deduped_pk_indices: Vec<usize>,
}
48
49impl JoinParams {
50 pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
51 Self {
52 join_key_indices,
53 deduped_pk_indices,
54 }
55 }
56}
57
/// State and metadata for one side of the AsOf join.
struct JoinSide<K: HashKey, S: StateStore> {
    /// Hash table caching/persisting all rows of this side, keyed by join key.
    ht: JoinHashMap<K, S>,
    /// Indices of the join key columns in this side's input schema.
    join_key_indices: Vec<usize>,
    /// Data types of every column of this side's input.
    all_data_types: Vec<DataType>,
    /// Column offset of this side in the concatenated (left ++ right) schema:
    /// 0 for the left side, the left column count for the right side.
    start_pos: usize,
    /// (input column, output column) pairs for this side.
    i2o_mapping: Vec<(usize, usize)>,
    /// Same mapping as `i2o_mapping`, indexed by input column for fast lookup
    /// (used when routing watermarks to output columns).
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// Whether a degree table is maintained; set to `false` for both sides
    /// by `AsOfJoinExecutor::new`.
    need_degree_table: bool,
}
73
// Manual `Debug`: `JoinHashMap` is intentionally omitted from the output.
impl<K: HashKey, S: StateStore> std::fmt::Debug for JoinSide<K, S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("JoinSide")
            .field("join_key_indices", &self.join_key_indices)
            .field("col_types", &self.all_data_types)
            .field("start_pos", &self.start_pos)
            .field("i2o_mapping", &self.i2o_mapping)
            .field("need_degree_table", &self.need_degree_table)
            .finish()
    }
}
85
impl<K: HashKey, S: StateStore> JoinSide<K, S> {
    /// Whether this side holds unflushed (dirty) state.
    /// NOTE(review): not implemented yet — calling this always panics.
    fn is_dirty(&self) -> bool {
        unimplemented!()
    }

    #[expect(dead_code)]
    fn clear_cache(&mut self) {
        // Guard against dropping unflushed state. NOTE(review): because
        // `is_dirty` is `unimplemented!()`, this method currently panics if
        // ever invoked; the actual cache-clearing body is still missing.
        assert!(
            !self.is_dirty(),
            "cannot clear cache while states of hash join are dirty"
        );

    }

    /// Initializes the underlying hash table with the first epoch.
    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.ht.init(epoch).await
    }
}
107
/// Streaming AsOf-join executor. `T` is the AsOf join type (inner / left
/// outer); rows from both inputs are matched on an equi join key, then the
/// single AsOf match is selected by the inequality described in `asof_desc`.
pub struct AsOfJoinExecutor<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> {
    ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Left input executor; taken out (`Option::take`) when the stream starts.
    input_l: Option<Executor>,
    /// Right input executor; taken out when the stream starts.
    input_r: Option<Executor>,
    /// Output data types after applying the output-indices projection.
    actual_output_data_types: Vec<DataType>,
    /// State of the left side.
    side_l: JoinSide<K, S>,
    /// State of the right side.
    side_r: JoinSide<K, S>,

    metrics: Arc<StreamingMetrics>,
    /// Maximum number of rows per emitted chunk.
    chunk_size: usize,
    /// Rows seen since the last cache eviction (see `evict_cache`).
    cnt_rows_received: u32,

    /// Watermark buffers keyed by position of the column in the join key,
    /// aligning watermarks arriving from the two sides.
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// Threshold above which a warning is logged for join amplification.
    high_join_amplification_threshold: usize,
    /// The AsOf inequality: which column on each side and which comparison.
    asof_desc: AsOfDesc,
}
138
// Manual `Debug`: inputs are shown by identity only; runtime state (metrics,
// watermark buffers, counters) is omitted.
impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> std::fmt::Debug
    for AsOfJoinExecutor<K, S, T>
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AsOfJoinExecutor")
            .field("join_type", &T)
            .field("input_left", &self.input_l.as_ref().unwrap().identity())
            .field("input_right", &self.input_r.as_ref().unwrap().identity())
            .field("side_l", &self.side_l)
            .field("side_r", &self.side_r)
            .field("pk_indices", &self.info.pk_indices)
            .field("schema", &self.info.schema)
            .field("actual_output_data_types", &self.actual_output_data_types)
            .finish()
    }
}
155
impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> Execute
    for AsOfJoinExecutor<K, S, T>
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        // Delegate to the `#[try_stream]`-generated state machine.
        self.into_stream().boxed()
    }
}
163
/// Borrowed argument bundle passed to `eq_join_left` / `eq_join_right`,
/// grouping everything one chunk-processing pass needs.
struct EqJoinArgs<'a, K: HashKey, S: StateStore> {
    ctx: &'a ActorContextRef,
    side_l: &'a mut JoinSide<K, S>,
    side_r: &'a mut JoinSide<K, S>,
    asof_desc: &'a AsOfDesc,
    actual_output_data_types: &'a [DataType],
    /// The input chunk being joined (owned: consumed by the pass).
    chunk: StreamChunk,
    chunk_size: usize,
    cnt_rows_received: &'a mut u32,
    high_join_amplification_threshold: usize,
}
176
177impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor<K, S, T> {
    /// Creates an AsOf-join executor.
    ///
    /// Derives output data types from the concatenated input schemas plus
    /// `output_indices`, builds the per-side input→output column mappings,
    /// and constructs one `JoinHashMap` per side backed by the given state
    /// tables. Panics (via `assert_eq!`) if the two sides' join-key data
    /// types disagree.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        asof_desc: AsOfDesc,
    ) -> Self {
        let side_l_column_n = input_l.schema().len();

        // Concatenated (left ++ right) schema, before output projection.
        let schema_fields = [
            input_l.schema().fields.clone(),
            input_r.schema().fields.clone(),
        ]
        .concat();

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        // Project to the columns the downstream actually receives.
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.pk_indices().to_vec();
        let state_pk_indices_r = input_r.pk_indices().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        // If the pk is contained in the join key, the hash table can use a
        // cheaper encoding (passed into `JoinHashMap::new` below).
        let pk_contained_in_jk_l =
            is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone());
        let pk_contained_in_jk_r =
            is_subset(state_pk_indices_r.clone(), state_join_key_indices_r.clone());

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        // Equi-join requires identical key types on both sides.
        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        // Input→output mappings; semi/anti variants only expose one side.
        let (left_to_output, right_to_output) = {
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        let watermark_buffers = BTreeMap::new();

        // Each side's hash table additionally indexes the AsOf inequality column.
        let inequal_key_idx_l = Some(asof_desc.left_idx);
        let inequal_key_idx_r = Some(asof_desc.right_idx);

        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    None,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    inequal_key_idx_l,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                start_pos: 0,
                need_degree_table: false,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    None,
                    null_matched,
                    pk_contained_in_jk_r,
                    inequal_key_idx_r,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                need_degree_table: false,
            },
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            asof_desc,
        }
    }
327
    /// Main event loop: aligns the two inputs on barriers, joins data chunks,
    /// forwards watermarks, and flushes/commits state at each barrier.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);
        let actor_id = self.ctx.id;

        // The first barrier carries the initial epoch: yield it before
        // initializing state so downstream sees the barrier promptly.
        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // Pre-resolve labeled metric handles once, outside the hot loop.
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, "barrier"]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, "right"]);

        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            // Time spent waiting on upstream input.
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // Accumulate only time spent matching, excluding the time
                    // downstream takes to consume each yielded chunk.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        asof_desc: &self.asof_desc,
                        actual_output_data_types: &self.actual_output_data_types,
                        chunk,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        asof_desc: &self.asof_desc,
                        actual_output_data_types: &self.actual_output_data_types,
                        chunk,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    // Flush both sides BEFORE yielding the barrier.
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    yield Message::Barrier(barrier);

                    // Post-commit work (e.g. vnode bitmap updates) after the
                    // barrier has been yielded downstream.
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        // Vnode bitmap changed (scaling): buffered watermarks
                        // are no longer valid for the new partitioning.
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                    }

                    // Report cache sizes once per barrier.
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }

                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                }
            }
            start_time = Instant::now();
        }
    }
486
487 async fn flush_data(
488 &mut self,
489 epoch: EpochPair,
490 ) -> StreamExecutorResult<(
491 JoinHashMapPostCommit<'_, K, S>,
492 JoinHashMapPostCommit<'_, K, S>,
493 )> {
494 let left = self.side_l.ht.flush(epoch).await?;
497 let right = self.side_r.ht.flush(epoch).await?;
498 Ok((left, right))
499 }
500
501 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
502 self.side_l.ht.try_flush().await?;
505 self.side_r.ht.try_flush().await?;
506 Ok(())
507 }
508
509 fn evict_cache(
511 side_update: &mut JoinSide<K, S>,
512 side_match: &mut JoinSide<K, S>,
513 cnt_rows_received: &mut u32,
514 ) {
515 *cnt_rows_received += 1;
516 if *cnt_rows_received == EVICT_EVERY_N_ROWS {
517 side_update.ht.evict();
518 side_match.ht.evict();
519 *cnt_rows_received = 0;
520 }
521 }
522
    /// Handles a watermark arriving from `side`, returning the watermarks to
    /// emit downstream (possibly none, if the other side hasn't caught up).
    fn handle_watermark(
        &mut self,
        side: SideTypePrimitive,
        watermark: Watermark,
    ) -> StreamExecutorResult<Vec<Watermark>> {
        // `side_update` is the side the watermark came from; `side_match` is
        // the opposite side, whose state can now be cleaned up.
        let (side_update, side_match) = if side == SideType::Left {
            (&mut self.side_l, &mut self.side_r)
        } else {
            (&mut self.side_r, &mut self.side_l)
        };

        // Only a watermark on the FIRST join-key column advances state
        // cleanup on the opposite side's hash table.
        if side_update.join_key_indices[0] == watermark.col_idx {
            side_match.ht.update_watermark(watermark.val.clone());
        }

        // All positions of this column within the join key (it may appear
        // more than once).
        let wm_in_jk = side_update
            .join_key_indices
            .iter()
            .positions(|idx| *idx == watermark.col_idx);
        let mut watermarks_to_emit = vec![];
        for idx in wm_in_jk {
            // Align with the watermark for the same join-key position coming
            // from the other side; emit only the combined minimum.
            let buffers = self
                .watermark_buffers
                .entry(idx)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                let empty_indices = vec![];
                // The join-key column may map to output columns via either
                // side's input→output mapping; emit on all of them.
                let output_indices = side_update
                    .i2o_mapping_indexed
                    .get_vec(&side_update.join_key_indices[idx])
                    .unwrap_or(&empty_indices)
                    .iter()
                    .chain(
                        side_match
                            .i2o_mapping_indexed
                            .get_vec(&side_match.join_key_indices[idx])
                            .unwrap_or(&empty_indices),
                    );
                for output_idx in output_indices {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
            };
        }
        Ok(watermarks_to_emit)
    }
570
571 async fn hash_eq_match(
574 key: &K,
575 ht: &mut JoinHashMap<K, S>,
576 ) -> StreamExecutorResult<Option<HashValueType>> {
577 if !key.null_bitmap().is_subset(ht.null_matched()) {
578 Ok(None)
579 } else {
580 ht.take_state(key).await.map(Some)
581 }
582 }
583
    /// Processes a chunk from the LEFT (outer) side.
    ///
    /// For each left row, finds its equi-key matches on the right and selects
    /// THE single AsOf match via the inequality bound; emits joined rows (or
    /// forwards unmatched rows per the join type) and maintains left state.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_left(args: EqJoinArgs<'_, K, S>) {
        let EqJoinArgs {
            ctx: _,
            side_l,
            side_r,
            asof_desc,
            actual_output_data_types,
            chunk,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold: _,
        } = args;

        // Left chunk updates the left side and probes the right side.
        let (side_update, side_match) = (side_l, side_r);

        let mut join_chunk_builder =
            JoinChunkBuilder::<T, { SideType::Left }>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // Skip invisible rows.
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(side_update, side_match, cnt_rows_received);

            // A NULL inequality key never AsOf-matches: skip the probe.
            let matched_rows = if !side_update.ht.check_inequal_key_null(&row) {
                Self::hash_eq_match(key, &mut side_match.ht).await?
            } else {
                None
            };
            let inequal_key = side_update.ht.serialize_inequal_key_from_row(row);

            if let Some(matched_rows) = matched_rows {
                // Pick THE AsOf match: the closest right row satisfying the
                // inequality w.r.t. this left row's inequality key.
                let matched_row_by_inequality = match asof_desc.inequality_type {
                    AsOfInequalityType::Lt => matched_rows.lower_bound_by_inequality(
                        Bound::Excluded(&inequal_key),
                        &side_match.all_data_types,
                    ),
                    AsOfInequalityType::Le => matched_rows.lower_bound_by_inequality(
                        Bound::Included(&inequal_key),
                        &side_match.all_data_types,
                    ),
                    AsOfInequalityType::Gt => matched_rows.upper_bound_by_inequality(
                        Bound::Excluded(&inequal_key),
                        &side_match.all_data_types,
                    ),
                    AsOfInequalityType::Ge => matched_rows.upper_bound_by_inequality(
                        Bound::Included(&inequal_key),
                        &side_match.all_data_types,
                    ),
                };
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if let Some(matched_row_by_inequality) = matched_row_by_inequality {
                            let matched_row = matched_row_by_inequality?;

                            if let Some(chunk) =
                                join_chunk_builder.with_match_on_insert(&row, &matched_row)
                            {
                                yield chunk;
                            }
                        } else if let Some(chunk) =
                            join_chunk_builder.forward_if_not_matched(Op::Insert, row)
                        {
                            // No AsOf match: forwarded only for outer joins.
                            yield chunk;
                        }
                        side_update.ht.insert_row(key, row)?;
                    }
                    Op::Delete | Op::UpdateDelete => {
                        if let Some(matched_row_by_inequality) = matched_row_by_inequality {
                            let matched_row = matched_row_by_inequality?;

                            if let Some(chunk) =
                                join_chunk_builder.with_match_on_delete(&row, &matched_row)
                            {
                                yield chunk;
                            }
                        } else if let Some(chunk) =
                            join_chunk_builder.forward_if_not_matched(Op::Delete, row)
                        {
                            yield chunk;
                        }
                        side_update.ht.delete_row(key, row)?;
                    }
                }
                // Put the taken state entry back into the match-side table.
                side_match.ht.update_state(key, matched_rows);
            } else {
                // Key has NULLs or no entry was probed: behave as unmatched.
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if let Some(chunk) =
                            join_chunk_builder.forward_if_not_matched(Op::Insert, row)
                        {
                            yield chunk;
                        }
                    }
                    Op::Delete | Op::UpdateDelete => {
                        if let Some(chunk) =
                            join_chunk_builder.forward_if_not_matched(Op::Delete, row)
                        {
                            yield chunk;
                        }
                    }
                }
            }
        }
        // Emit any rows still buffered in the builder.
        if let Some(chunk) = join_chunk_builder.take() {
            yield chunk;
        }
    }
703
    /// Processes a chunk from the RIGHT side.
    ///
    /// A right-side change can alter which right row is "the" AsOf match for
    /// a RANGE of left rows, so this pass first decides, via the right side's
    /// inequality index, which right row stops / starts being the effective
    /// match (`row_to_delete_r` / `row_to_insert_r`), then emits retractions
    /// and insertions for every affected left row in that range.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_right(args: EqJoinArgs<'_, K, S>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            asof_desc,
            actual_output_data_types,
            chunk,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
        } = args;

        // Right chunk updates the right side and probes the left side.
        let (side_update, side_match) = (side_r, side_l);

        let mut join_chunk_builder = JoinStreamChunkBuilder::new(
            chunk_size,
            actual_output_data_types.to_vec(),
            side_update.i2o_mapping.clone(),
            side_match.i2o_mapping.clone(),
        );

        let join_matched_rows_metrics = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // Skip invisible rows.
            let Some((op, row)) = r else {
                continue;
            };
            let mut join_matched_rows_cnt = 0;

            Self::evict_cache(side_update, side_match, cnt_rows_received);

            // A NULL inequality key never AsOf-matches: skip the probe.
            let matched_rows = if !side_update.ht.check_inequal_key_null(&row) {
                Self::hash_eq_match(key, &mut side_match.ht).await?
            } else {
                None
            };
            let inequal_key = side_update.ht.serialize_inequal_key_from_row(row);

            if let Some(matched_rows) = matched_rows {
                // Also take the right side's own state: we need its
                // inequality index to see which right rows share this key.
                let update_rows = Self::hash_eq_match(key, &mut side_update.ht).await?.expect("None is not expected because we have checked null in key when getting matched_rows");
                let right_inequality_index = update_rows.inequality_index();
                // Decide which right row (if any) stops being the effective
                // match (`row_to_delete_r`) and which starts
                // (`row_to_insert_r`). Among rows with the same inequality
                // key, the one with the smallest pk is the effective one.
                let (row_to_delete_r, row_to_insert_r) =
                    if let Some(pks) = right_inequality_index.get(&inequal_key) {
                        assert!(!pks.is_empty());
                        let row_pk = side_update.ht.serialize_pk_from_row(row);
                        match op {
                            Op::Insert | Op::UpdateInsert => {
                                // The new row takes effect only if it becomes
                                // the smallest pk for this inequality key.
                                let smallest_pk = pks.first_key_sorted().unwrap();
                                if smallest_pk > &row_pk {
                                    if let Some(to_delete_row) = update_rows
                                        .get_by_indexed_pk(smallest_pk, &side_update.all_data_types)
                                    {
                                        (
                                            Some(Either::Left(to_delete_row?.row)),
                                            Some(Either::Right(row)),
                                        )
                                    } else {
                                        (None, None)
                                    }
                                } else {
                                    (None, None)
                                }
                            }
                            Op::Delete | Op::UpdateDelete => {
                                // Deleting matters only if the deleted row WAS
                                // the effective (smallest-pk) one; the runner-up
                                // (if any) takes over.
                                let smallest_pk = pks.first_key_sorted().unwrap();
                                if smallest_pk == &row_pk {
                                    if let Some(second_smallest_pk) = pks.second_key_sorted() {
                                        if let Some(to_insert_row) = update_rows.get_by_indexed_pk(
                                            second_smallest_pk,
                                            &side_update.all_data_types,
                                        ) {
                                            (
                                                Some(Either::Right(row)),
                                                Some(Either::Left(to_insert_row?.row)),
                                            )
                                        } else {
                                            (None, None)
                                        }
                                    } else {
                                        (Some(Either::Right(row)), None)
                                    }
                                } else {
                                    (None, None)
                                }
                            }
                        }
                    } else {
                        // First/last right row for this inequality key.
                        match op {
                            Op::Insert | Op::UpdateInsert => (None, Some(Either::Right(row))),
                            Op::Delete | Op::UpdateDelete => (Some(Either::Right(row)), None),
                        }
                    };

                if row_to_delete_r.is_none() && row_to_insert_r.is_none() {
                    // The effective match is unchanged: nothing to emit.
                } else {
                    // Neighboring inequality keys bound the range of left
                    // rows whose AsOf match is affected by this change.
                    let prev_inequality_key =
                        right_inequality_index.upper_bound_key(Bound::Excluded(&inequal_key));
                    let next_inequality_key =
                        right_inequality_index.lower_bound_key(Bound::Excluded(&inequal_key));
                    // The right row that matches left rows OUTSIDE the
                    // changed key (used when only one of delete/insert is set).
                    let affected_row_r = match asof_desc.inequality_type {
                        AsOfInequalityType::Lt | AsOfInequalityType::Le => next_inequality_key
                            .and_then(|k| {
                                update_rows.get_first_by_inequality(k, &side_update.all_data_types)
                            }),
                        AsOfInequalityType::Gt | AsOfInequalityType::Ge => prev_inequality_key
                            .and_then(|k| {
                                update_rows.get_first_by_inequality(k, &side_update.all_data_types)
                            }),
                    }
                    .transpose()?
                    .map(|r| Either::Left(r.row));

                    // Fill the missing half of the (delete, insert) pair from
                    // the neighboring effective row.
                    let (row_to_delete_r, row_to_insert_r) =
                        match (&row_to_delete_r, &row_to_insert_r) {
                            (Some(_), Some(_)) => (row_to_delete_r, row_to_insert_r),
                            (None, Some(_)) => (affected_row_r, row_to_insert_r),
                            (Some(_), None) => (row_to_delete_r, affected_row_r),
                            (None, None) => unreachable!(),
                        };
                    // Left rows whose inequality key falls in this range have
                    // their AsOf match changed by this right-side update.
                    let range = match asof_desc.inequality_type {
                        AsOfInequalityType::Lt => (
                            prev_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Included),
                            Bound::Excluded(&inequal_key),
                        ),
                        AsOfInequalityType::Le => (
                            prev_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Excluded),
                            Bound::Included(&inequal_key),
                        ),
                        AsOfInequalityType::Gt => (
                            Bound::Excluded(&inequal_key),
                            next_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Included),
                        ),
                        AsOfInequalityType::Ge => (
                            Bound::Included(&inequal_key),
                            next_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Excluded),
                        ),
                    };

                    let rows_l =
                        matched_rows.range_by_inequality(range, &side_match.all_data_types);
                    for row_l in rows_l {
                        join_matched_rows_cnt += 1;
                        let row_l = row_l?.row;
                        // Retract the old match (or the left-outer NULL row).
                        if let Some(row_to_delete_r) = &row_to_delete_r {
                            if let Some(chunk) =
                                join_chunk_builder.append_row(Op::Delete, row_to_delete_r, &row_l)
                            {
                                yield chunk;
                            }
                        } else if is_as_of_left_outer(T) {
                            if let Some(chunk) =
                                join_chunk_builder.append_row_matched(Op::Delete, &row_l)
                            {
                                yield chunk;
                            }
                        }
                        // Emit the new match (or the left-outer NULL row).
                        if let Some(row_to_insert_r) = &row_to_insert_r {
                            if let Some(chunk) =
                                join_chunk_builder.append_row(Op::Insert, row_to_insert_r, &row_l)
                            {
                                yield chunk;
                            }
                        } else if is_as_of_left_outer(T) {
                            if let Some(chunk) =
                                join_chunk_builder.append_row_matched(Op::Insert, &row_l)
                            {
                                yield chunk;
                            }
                        }
                    }
                }
                // Put both taken state entries back.
                side_match.ht.update_state(key, matched_rows);
                side_update.ht.update_state(key, update_rows);

                match op {
                    Op::Insert | Op::UpdateInsert => {
                        side_update.ht.insert_row(key, row)?;
                    }
                    Op::Delete | Op::UpdateDelete => {
                        side_update.ht.delete_row(key, row)?;
                    }
                }
            } else {
                // NULL inequality key: the right row can never match, and
                // NOTE(review): it is not persisted either — confirm intended.
            }
            join_matched_rows_metrics.observe(join_matched_rows_cnt as _);
            if join_matched_rows_cnt > high_join_amplification_threshold {
                // Amplification warning: one right-side row fanned out to
                // many left rows.
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = join_matched_rows_cnt,
                    update_table_id = side_update.ht.table_id(),
                    match_table_id = side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = ctx.id,
                    fragment_id = ctx.fragment_id,
                    "large rows matched for join key when AsOf join updating right side",
                );
            }
        }
        // Emit any rows still buffered in the builder.
        if let Some(chunk) = join_chunk_builder.take() {
            yield chunk;
        }
    }
936}
937
938#[cfg(test)]
939mod tests {
940 use std::sync::atomic::AtomicU64;
941
942 use risingwave_common::array::*;
943 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
944 use risingwave_common::hash::Key64;
945 use risingwave_common::util::epoch::test_epoch;
946 use risingwave_common::util::sort_util::OrderType;
947 use risingwave_storage::memory::MemoryStateStore;
948
949 use super::*;
950 use crate::common::table::test_utils::gen_pbtable;
951 use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
952
    /// Builds a `StateTable` over the in-memory state store for tests, with
    /// unnamed columns of the given types and the given pk layout.
    async fn create_in_memory_state_table(
        mem_state: MemoryStateStore,
        data_types: &[DataType],
        order_types: &[OrderType],
        pk_indices: &[usize],
        table_id: u32,
    ) -> StateTable<MemoryStateStore> {
        let column_descs = data_types
            .iter()
            .enumerate()
            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
            .collect_vec();
        StateTable::from_table_catalog(
            &gen_pbtable(
                TableId::new(table_id),
                column_descs,
                order_types.to_vec(),
                pk_indices.to_vec(),
                0,
            ),
            mem_state.clone(),
            None,
        )
        .await
    }
978
    /// Wires up an `AsOfJoinExecutor` of join type `T` over two mock sources
    /// with schema (Int64, Int64, Int64), joined on column 0, with state pk
    /// on column 1. Returns the two input senders and the output stream.
    async fn create_executor<const T: AsOfJoinTypePrimitive>(
        asof_desc: AsOfDesc,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let (tx_l, source_l) = MockSource::channel();
        let source_l = source_l.into_executor(schema.clone(), vec![1]);
        let (tx_r, source_r) = MockSource::channel();
        let source_r = source_r.into_executor(schema, vec![1]);
        let params_l = JoinParams::new(vec![0], vec![1]);
        let params_r = JoinParams::new(vec![0], vec![1]);

        let mem_state = MemoryStateStore::new();

        // State pk: (join key, inequality column, input pk).
        let state_l = create_in_memory_state_table(
            mem_state.clone(),
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            &[0, asof_desc.left_idx, 1],
            0,
        )
        .await;

        let state_r = create_in_memory_state_table(
            mem_state,
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            &[0, asof_desc.right_idx, 1],
            1,
        )
        .await;

        // Output schema is the plain concatenation of both inputs.
        let schema: Schema = [source_l.schema().fields(), source_r.schema().fields()]
            .concat()
            .into_iter()
            .collect();
        let schema_len = schema.len();
        let info = ExecutorInfo::new(schema, vec![1], "HashJoinExecutor".to_owned(), 0);

        let executor = AsOfJoinExecutor::<Key64, MemoryStateStore, T>::new(
            ActorContext::for_test(123),
            info,
            source_l,
            source_r,
            params_l,
            params_r,
            vec![false],
            (0..schema_len).collect_vec(),
            state_l,
            state_r,
            Arc::new(AtomicU64::new(0)),
            Arc::new(StreamingMetrics::unused()),
            1024,
            2048,
            asof_desc,
        );
        (tx_l, tx_r, executor.boxed().execute())
    }
1050
    /// Inner AsOf join with `left.col0 < right.col2` (Lt): for each left row
    /// the match is the right row with the same key whose inequality column
    /// is strictly greater; right-side churn retracts and re-emits matches.
    #[tokio::test]
    async fn test_as_of_inner_join() -> StreamExecutorResult<()> {
        let asof_desc = AsOfDesc {
            left_idx: 0,
            right_idx: 2,
            inequality_type: AsOfInequalityType::Lt,
        };

        let chunk_l1 = StreamChunk::from_pretty(
            " I I I
             + 1 4 7
             + 2 5 8
             + 3 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I I
             + 3 8 1
             - 3 8 1",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I I
             + 2 1 7
             + 2 2 1
             + 2 3 4
             + 2 4 2
             + 6 1 9
             + 6 2 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I I
             - 2 3 4",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            " I I I
             + 2 3 3",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            " I I I
             - 2 5 8",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            " I I I
             + 6 3 1
             + 6 4 1",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            " I I I
             - 6 1 9",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_executor::<{ AsOfJoinType::Inner }>(asof_desc).await;

        // Init barrier.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // Left rows with no right side yet: inner join emits nothing.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Right rows arrive: each insert may displace the previous match.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                 + 2 5 8 2 1 7
                 - 2 5 8 2 1 7
                 + 2 5 8 2 3 4"
            )
        );

        // Deleting the current match falls back to the next candidate.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                 - 2 5 8 2 3 4
                 + 2 5 8 2 1 7"
            )
        );

        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                 - 2 5 8 2 1 7
                 + 2 5 8 2 3 3"
            )
        );

        // Deleting a left row retracts its joined output.
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                 - 2 5 8 2 3 3"
            )
        );

        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                 + 6 3 1 6 1 9
                 + 6 4 1 6 1 9"
            )
        );

        // One right-side delete re-matches multiple left rows.
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                 - 6 3 1 6 1 9
                 + 6 3 1 6 2 9
                 - 6 4 1 6 1 9
                 + 6 4 1 6 2 9"
            )
        );

        Ok(())
    }
1198
    /// Left-outer AsOf join with `left.col1 >= right.col2` (Ge): unmatched
    /// left rows are emitted padded with NULLs, and right-side churn swaps
    /// between real matches and the NULL-padded row.
    #[tokio::test]
    async fn test_as_of_left_outer_join() -> StreamExecutorResult<()> {
        let asof_desc = AsOfDesc {
            left_idx: 1,
            right_idx: 2,
            inequality_type: AsOfInequalityType::Ge,
        };

        let chunk_l1 = StreamChunk::from_pretty(
            " I I I
             + 1 4 7
             + 2 5 8
             + 3 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I I
             + 3 8 1
             - 3 8 1",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I I
             + 2 3 4
             + 2 2 5
             + 2 1 5
             + 6 1 8
             + 6 2 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I I
             - 2 3 4
             - 2 1 5
             - 2 2 5",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            " I I I
             + 6 8 9",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            " I I I
             - 6 1 8",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_executor::<{ AsOfJoinType::LeftOuter }>(asof_desc).await;

        // Init barrier.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // Left-outer: unmatched left rows are emitted with NULL right side.
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                 + 1 4 7 . . .
                 + 2 5 8 . . .
                 + 3 6 9 . . ."
            )
        );

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                 + 3 8 1 . . .
                 - 3 8 1 . . ."
            )
        );

        // Each right insert retracts the previous output and emits the new
        // match (the NULL row is retracted on the first real match).
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                 - 2 5 8 . . .
                 + 2 5 8 2 3 4
                 - 2 5 8 2 3 4
                 + 2 5 8 2 2 5
                 - 2 5 8 2 2 5
                 + 2 5 8 2 1 5"
            )
        );

        // Deleting all matches falls back to the NULL-padded row.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                 - 2 5 8 2 1 5
                 + 2 5 8 2 2 5
                 - 2 5 8 2 2 5
                 + 2 5 8 . . ."
            )
        );

        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                 + 6 8 9 6 1 8"
            )
        );

        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                 - 6 8 9 6 1 8
                 + 6 8 9 . . ."
            )
        );
        Ok(())
    }
1333}