1use std::cmp::Ordering;
22use std::collections::BTreeMap;
23use std::ops::Bound;
24use std::time::Duration;
25
26use either::Either;
27use itertools::Itertools;
28use multimap::MultiMap;
29use risingwave_common::array::Op;
30use risingwave_common::metrics::LabelGuardedHistogram;
31use risingwave_common::row::RowExt;
32use risingwave_common::util::epoch::EpochPair;
33use risingwave_common::util::sort_util::cmp_rows_ascending;
34use tokio::time::Instant;
35
36use super::barrier_align::*;
37use super::join::asof_join::*;
38use super::join::builder::JoinStreamChunkBuilder;
39use super::join::row::JoinRow;
40use super::join::*;
41use super::watermark::*;
42use crate::executor::join::builder::JoinChunkBuilder;
43use crate::executor::prelude::*;
44
/// Per-side parameters describing how one input participates in the AS OF join.
#[derive(Debug, Clone)]
pub struct JoinParams {
    /// Indices of the equi-join key columns within this side's input schema.
    pub join_key_indices: Vec<usize>,
    /// Primary-key column indices of this side's input, handed to the join state
    /// (`AsOfJoinHashMap::new`). Presumably already deduplicated against the join key
    /// columns, as the name suggests — confirm against the planner.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Creates join parameters from the join key indices and the deduplicated pk indices.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        Self {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
60
61struct JoinSide<S: StateStore, E: AsOfRowEncoding> {
62 ht: AsOfJoinHashMap<S, E>,
64 join_key_indices: Vec<usize>,
66 all_data_types: Vec<DataType>,
68 i2o_mapping: Vec<(usize, usize)>,
70 i2o_mapping_indexed: MultiMap<usize, usize>,
71 inequal_key_idx: usize,
73}
74
75impl<S: StateStore, E: AsOfRowEncoding> std::fmt::Debug for JoinSide<S, E> {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("JoinSide")
78 .field("join_key_indices", &self.join_key_indices)
79 .field("col_types", &self.all_data_types)
80 .field("i2o_mapping", &self.i2o_mapping)
81 .finish()
82 }
83}
84
85impl<S: StateStore, E: AsOfRowEncoding> JoinSide<S, E> {
86 pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
87 self.ht.init(epoch).await
88 }
89}
90
91pub struct AsOfJoinExecutor<S: StateStore, const T: AsOfJoinTypePrimitive, E: AsOfRowEncoding> {
97 ctx: ActorContextRef,
98 info: ExecutorInfo,
99
100 input_l: Option<Executor>,
102 input_r: Option<Executor>,
104 actual_output_data_types: Vec<DataType>,
106 side_l: JoinSide<S, E>,
108 side_r: JoinSide<S, E>,
110
111 null_safe: Vec<bool>,
113
114 metrics: Arc<StreamingMetrics>,
115 chunk_size: usize,
117 watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,
119 asof_desc: AsOfDesc,
121 cnt_rows_received: u32,
123 join_cache_evict_interval_rows: u32,
125 high_join_amplification_threshold: usize,
127}
128
129impl<S: StateStore, const T: AsOfJoinTypePrimitive, E: AsOfRowEncoding> std::fmt::Debug
130 for AsOfJoinExecutor<S, T, E>
131{
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 f.debug_struct("AsOfJoinExecutor")
134 .field("join_type", &T)
135 .field("input_left", &self.input_l.as_ref().unwrap().identity())
136 .field("input_right", &self.input_r.as_ref().unwrap().identity())
137 .field("side_l", &self.side_l)
138 .field("side_r", &self.side_r)
139 .field("stream_key", &self.info.stream_key)
140 .field("schema", &self.info.schema)
141 .field("actual_output_data_types", &self.actual_output_data_types)
142 .finish()
143 }
144}
145
146impl<S: StateStore, const T: AsOfJoinTypePrimitive, E: AsOfRowEncoding> Execute
147 for AsOfJoinExecutor<S, T, E>
148{
149 fn execute(self: Box<Self>) -> BoxedMessageStream {
150 self.into_stream().boxed()
151 }
152}
153
154struct EqJoinArgs<'a, S: StateStore, E: AsOfRowEncoding> {
155 ctx: &'a ActorContextRef,
156 side_l: &'a mut JoinSide<S, E>,
157 side_r: &'a mut JoinSide<S, E>,
158 null_safe: &'a [bool],
159 asof_desc: &'a AsOfDesc,
160 actual_output_data_types: &'a [DataType],
161 chunk: StreamChunk,
162 chunk_size: usize,
163 cnt_rows_received: &'a mut u32,
164 join_cache_evict_interval_rows: u32,
165 high_join_amplification_threshold: usize,
166 join_matched_join_keys: &'a LabelGuardedHistogram,
169}
170
171impl<S: StateStore, const T: AsOfJoinTypePrimitive, E: AsOfRowEncoding> AsOfJoinExecutor<S, T, E> {
172 #[expect(clippy::too_many_arguments)]
173 pub fn new(
174 ctx: ActorContextRef,
175 info: ExecutorInfo,
176 input_l: Executor,
177 input_r: Executor,
178 params_l: JoinParams,
179 params_r: JoinParams,
180 null_safe: Vec<bool>,
181 output_indices: Vec<usize>,
182 state_table_l: StateTable<S>,
183 state_table_r: StateTable<S>,
184 watermark_epoch: AtomicU64Ref,
185 metrics: Arc<StreamingMetrics>,
186 chunk_size: usize,
187 asof_desc: AsOfDesc,
188 use_cache: bool,
189 high_join_amplification_threshold: usize,
190 ) -> Self {
191 let join_cache_evict_interval_rows = ctx
192 .config
193 .developer
194 .join_hash_map_evict_interval_rows
195 .max(1);
196 let cache_epoch = if use_cache {
197 Some(watermark_epoch)
198 } else {
199 None
200 };
201 let schema_fields = [
202 input_l.schema().fields.clone(),
203 input_r.schema().fields.clone(),
204 ]
205 .concat();
206
207 let original_output_data_types = schema_fields
208 .iter()
209 .map(|field| field.data_type())
210 .collect_vec();
211 let actual_output_data_types = output_indices
212 .iter()
213 .map(|&idx| original_output_data_types[idx].clone())
214 .collect_vec();
215
216 let state_all_data_types_l = input_l.schema().data_types();
217 let state_all_data_types_r = input_r.schema().data_types();
218
219 let state_join_key_indices_l = params_l.join_key_indices;
220 let state_join_key_indices_r = params_r.join_key_indices;
221
222 let join_key_data_types_l = state_join_key_indices_l
223 .iter()
224 .map(|idx| state_all_data_types_l[*idx].clone())
225 .collect_vec();
226
227 let join_key_data_types_r = state_join_key_indices_r
228 .iter()
229 .map(|idx| state_all_data_types_r[*idx].clone())
230 .collect_vec();
231
232 assert_eq!(join_key_data_types_l, join_key_data_types_r);
233
234 let (left_to_output, right_to_output) = {
235 let (left_len, right_len) = if is_left_semi_or_anti(T) {
236 (state_all_data_types_l.len(), 0usize)
237 } else if is_right_semi_or_anti(T) {
238 (0usize, state_all_data_types_r.len())
239 } else {
240 (state_all_data_types_l.len(), state_all_data_types_r.len())
241 };
242 JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
243 };
244
245 let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
246 let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());
247
248 let watermark_buffers = BTreeMap::new();
249
250 let inequal_key_idx_l = asof_desc.left_idx;
251 let inequal_key_idx_r = asof_desc.right_idx;
252
253 Self {
254 ctx: ctx.clone(),
255 info,
256 input_l: Some(input_l),
257 input_r: Some(input_r),
258 actual_output_data_types,
259 null_safe,
260 side_l: JoinSide {
261 ht: AsOfJoinHashMap::new(
262 state_join_key_indices_l.clone(),
263 state_table_l,
264 params_l.deduped_pk_indices,
265 inequal_key_idx_l,
266 state_all_data_types_l.clone(),
267 cache_epoch.clone(),
268 metrics.clone(),
269 ctx.id,
270 ctx.fragment_id,
271 "left",
272 ),
273 join_key_indices: state_join_key_indices_l,
274 all_data_types: state_all_data_types_l,
275 i2o_mapping: left_to_output,
276 i2o_mapping_indexed: l2o_indexed,
277 inequal_key_idx: inequal_key_idx_l,
278 },
279 side_r: JoinSide {
280 ht: AsOfJoinHashMap::new(
281 state_join_key_indices_r.clone(),
282 state_table_r,
283 params_r.deduped_pk_indices,
284 inequal_key_idx_r,
285 state_all_data_types_r.clone(),
286 cache_epoch,
287 metrics.clone(),
288 ctx.id,
289 ctx.fragment_id,
290 "right",
291 ),
292 join_key_indices: state_join_key_indices_r,
293 all_data_types: state_all_data_types_r,
294 i2o_mapping: right_to_output,
295 i2o_mapping_indexed: r2o_indexed,
296 inequal_key_idx: inequal_key_idx_r,
297 },
298 metrics,
299 chunk_size,
300 watermark_buffers,
301 asof_desc,
302 cnt_rows_received: 0,
303 join_cache_evict_interval_rows,
304 high_join_amplification_threshold,
305 }
306 }
307
308 fn evict_cache(
310 side_l: &mut JoinSide<S, E>,
311 side_r: &mut JoinSide<S, E>,
312 cnt_rows_received: &mut u32,
313 join_cache_evict_interval_rows: u32,
314 ) {
315 *cnt_rows_received += 1;
316 if *cnt_rows_received >= join_cache_evict_interval_rows {
317 side_l.ht.evict_cache();
318 side_r.ht.evict_cache();
319 *cnt_rows_received = 0;
320 }
321 }
322
323 #[try_stream(ok = Message, error = StreamExecutorError)]
324 async fn into_stream(mut self) {
325 let input_l = self.input_l.take().unwrap();
326 let input_r = self.input_r.take().unwrap();
327 let aligned_stream = barrier_align(
328 input_l.execute(),
329 input_r.execute(),
330 self.ctx.id,
331 self.ctx.fragment_id,
332 self.metrics.clone(),
333 "Join",
334 );
335 pin_mut!(aligned_stream);
336 let actor_id = self.ctx.id;
337
338 let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
339 let first_epoch = barrier.epoch;
340 yield Message::Barrier(barrier);
341 self.side_l.init(first_epoch).await?;
342 self.side_r.init(first_epoch).await?;
343
344 let actor_id_str = self.ctx.id.to_string();
345 let fragment_id_str = self.ctx.fragment_id.to_string();
346
347 let join_actor_input_waiting_duration_ns = self
348 .metrics
349 .join_actor_input_waiting_duration_ns
350 .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
351 let left_join_match_duration_ns = self
352 .metrics
353 .join_match_duration_ns
354 .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
355 let right_join_match_duration_ns = self
356 .metrics
357 .join_match_duration_ns
358 .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
359
360 let barrier_join_match_duration_ns = self
361 .metrics
362 .join_match_duration_ns
363 .with_guarded_label_values(&[
364 actor_id_str.as_str(),
365 fragment_id_str.as_str(),
366 "barrier",
367 ]);
368
369 let left_join_cached_entry_count = self
370 .metrics
371 .join_cached_entry_count
372 .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
373
374 let right_join_cached_entry_count = self
375 .metrics
376 .join_cached_entry_count
377 .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
378
379 let right_table_id_str = self.side_r.ht.table_id().to_string();
381 let join_matched_join_keys = self
382 .metrics
383 .join_matched_join_keys
384 .with_guarded_label_values(&[
385 actor_id_str.as_str(),
386 fragment_id_str.as_str(),
387 right_table_id_str.as_str(),
388 ]);
389
390 let mut start_time = Instant::now();
391
392 while let Some(msg) = aligned_stream
393 .next()
394 .instrument_await("hash_join_barrier_align")
395 .await
396 {
397 join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
398 match msg? {
399 AlignedMessage::WatermarkLeft(watermark) => {
400 for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
401 yield Message::Watermark(watermark_to_emit);
402 }
403 }
404 AlignedMessage::WatermarkRight(watermark) => {
405 for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
406 yield Message::Watermark(watermark_to_emit);
407 }
408 }
409 AlignedMessage::Left(chunk) => {
410 let mut left_time = Duration::from_nanos(0);
411 let mut left_start_time = Instant::now();
412 #[for_await]
413 for chunk in Self::eq_join_left(EqJoinArgs {
414 ctx: &self.ctx,
415 side_l: &mut self.side_l,
416 side_r: &mut self.side_r,
417 null_safe: &self.null_safe,
418 asof_desc: &self.asof_desc,
419 actual_output_data_types: &self.actual_output_data_types,
420 chunk,
421 chunk_size: self.chunk_size,
422 cnt_rows_received: &mut self.cnt_rows_received,
423 join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
424 high_join_amplification_threshold: self.high_join_amplification_threshold,
425 join_matched_join_keys: &join_matched_join_keys,
426 }) {
427 left_time += left_start_time.elapsed();
428 yield Message::Chunk(chunk?);
429 left_start_time = Instant::now();
430 }
431 left_time += left_start_time.elapsed();
432 left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
433 self.try_flush_data().await?;
434 }
435 AlignedMessage::Right(chunk) => {
436 let mut right_time = Duration::from_nanos(0);
437 let mut right_start_time = Instant::now();
438 #[for_await]
439 for chunk in Self::eq_join_right(EqJoinArgs {
440 ctx: &self.ctx,
441 side_l: &mut self.side_l,
442 side_r: &mut self.side_r,
443 null_safe: &self.null_safe,
444 asof_desc: &self.asof_desc,
445 actual_output_data_types: &self.actual_output_data_types,
446 chunk,
447 chunk_size: self.chunk_size,
448 cnt_rows_received: &mut self.cnt_rows_received,
449 join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
450 high_join_amplification_threshold: self.high_join_amplification_threshold,
451 join_matched_join_keys: &join_matched_join_keys,
452 }) {
453 right_time += right_start_time.elapsed();
454 yield Message::Chunk(chunk?);
455 right_start_time = Instant::now();
456 }
457 right_time += right_start_time.elapsed();
458 right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
459 self.try_flush_data().await?;
460 }
461 AlignedMessage::Barrier(barrier) => {
462 let barrier_start_time = Instant::now();
463 let (left_post_commit, right_post_commit) =
464 self.flush_data(barrier.epoch).await?;
465
466 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
467 yield Message::Barrier(barrier);
468
469 right_post_commit
471 .post_yield_barrier(update_vnode_bitmap.clone())
472 .await?;
473 if left_post_commit
474 .post_yield_barrier(update_vnode_bitmap)
475 .await?
476 .unwrap_or(false)
477 {
478 self.watermark_buffers
479 .values_mut()
480 .for_each(|buffers| buffers.clear());
481 }
482
483 for (join_cached_entry_count, ht) in [
485 (&left_join_cached_entry_count, &self.side_l.ht),
486 (&right_join_cached_entry_count, &self.side_r.ht),
487 ] {
488 join_cached_entry_count.set(ht.entry_count() as i64);
489 }
490
491 barrier_join_match_duration_ns
492 .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
493 }
494 }
495 start_time = Instant::now();
496 }
497 }
498
499 async fn flush_data(
500 &mut self,
501 epoch: EpochPair,
502 ) -> StreamExecutorResult<(
503 AsOfJoinHashMapPostCommit<'_, S, E>,
504 AsOfJoinHashMapPostCommit<'_, S, E>,
505 )> {
506 let left = self.side_l.ht.flush(epoch).await?;
507 let right = self.side_r.ht.flush(epoch).await?;
508 Ok((left, right))
509 }
510
511 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
512 self.side_l.ht.try_flush().await?;
513 self.side_r.ht.try_flush().await?;
514 Ok(())
515 }
516
517 fn handle_watermark(
518 &mut self,
519 side: SideTypePrimitive,
520 watermark: Watermark,
521 ) -> StreamExecutorResult<Vec<Watermark>> {
522 let (side_update, side_match) = if side == SideType::Left {
523 (&mut self.side_l, &mut self.side_r)
524 } else {
525 (&mut self.side_r, &mut self.side_l)
526 };
527
528 if side_update.join_key_indices[0] == watermark.col_idx {
530 side_match.ht.update_watermark(watermark.val.clone());
531 }
532
533 let wm_in_jk = side_update
535 .join_key_indices
536 .iter()
537 .positions(|idx| *idx == watermark.col_idx);
538 let mut watermarks_to_emit = vec![];
539 for idx in wm_in_jk {
540 let buffers = self
541 .watermark_buffers
542 .entry(idx)
543 .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
544 if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
545 let empty_indices = vec![];
546 let output_indices = side_update
547 .i2o_mapping_indexed
548 .get_vec(&side_update.join_key_indices[idx])
549 .unwrap_or(&empty_indices)
550 .iter()
551 .chain(
552 side_match
553 .i2o_mapping_indexed
554 .get_vec(&side_match.join_key_indices[idx])
555 .unwrap_or(&empty_indices),
556 );
557 for output_idx in output_indices {
558 watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
559 }
560 };
561 }
562 Ok(watermarks_to_emit)
563 }
564
565 #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
566 async fn eq_join_left(args: EqJoinArgs<'_, S, E>) {
567 let EqJoinArgs {
568 ctx: _,
569 side_l,
570 side_r,
571 null_safe,
572 asof_desc,
573 actual_output_data_types,
574 chunk,
575 chunk_size,
576 cnt_rows_received,
577 join_cache_evict_interval_rows,
578 high_join_amplification_threshold: _,
579 join_matched_join_keys: _,
580 } = args;
581
582 let (side_update, side_match) = (side_l, side_r);
583
584 let mut join_chunk_builder =
585 JoinChunkBuilder::<T, { SideType::Left }>::new(JoinStreamChunkBuilder::new(
586 chunk_size,
587 actual_output_data_types.to_vec(),
588 side_update.i2o_mapping.clone(),
589 side_match.i2o_mapping.clone(),
590 ));
591
592 let inequal_key_idx_update = [side_update.inequal_key_idx];
594
595 for r in chunk.rows_with_holes() {
596 let Some((op, row)) = r else {
597 continue;
598 };
599 Self::evict_cache(
600 side_update,
601 side_match,
602 cnt_rows_received,
603 join_cache_evict_interval_rows,
604 );
605
606 let join_key_null_not_safe = side_update
609 .join_key_indices
610 .iter()
611 .zip_eq(null_safe.iter())
612 .any(|(idx, ns)| !ns && row.datum_at(*idx).is_none());
613 if join_key_null_not_safe {
614 match op {
615 Op::Insert | Op::UpdateInsert => {
616 if let Some(chunk) =
617 join_chunk_builder.forward_if_not_matched(Op::Insert, row)
618 {
619 yield chunk;
620 }
621 }
622 Op::Delete | Op::UpdateDelete => {
623 if let Some(chunk) =
624 join_chunk_builder.forward_if_not_matched(Op::Delete, row)
625 {
626 yield chunk;
627 }
628 }
629 }
630 continue;
631 }
632
633 let join_key = row.project(&side_update.join_key_indices);
634
635 let inequal_key_is_null = side_update.ht.check_inequal_key_null(&row);
636 let inequal_key = row.project(&inequal_key_idx_update);
637
638 if !inequal_key_is_null {
639 let matched_row_by_inequality = match asof_desc.inequality_type {
640 AsOfInequalityType::Lt => {
641 side_match
642 .ht
643 .lower_bound_by_inequality_with_jk_prefix(
644 &join_key,
645 Bound::Excluded(&inequal_key),
646 )
647 .await
648 }
649 AsOfInequalityType::Le => {
650 side_match
651 .ht
652 .lower_bound_by_inequality_with_jk_prefix(
653 &join_key,
654 Bound::Included(&inequal_key),
655 )
656 .await
657 }
658 AsOfInequalityType::Gt => {
659 side_match
660 .ht
661 .upper_bound_by_inequality_with_jk_prefix(
662 &join_key,
663 Bound::Excluded(&inequal_key),
664 )
665 .await
666 }
667 AsOfInequalityType::Ge => {
668 side_match
669 .ht
670 .upper_bound_by_inequality_with_jk_prefix(
671 &join_key,
672 Bound::Included(&inequal_key),
673 )
674 .await
675 }
676 }?
677 .map(|row| JoinRow::new(row, 0));
678 match op {
679 Op::Insert | Op::UpdateInsert => {
680 if let Some(matched_row) = matched_row_by_inequality {
681 if let Some(chunk) =
682 join_chunk_builder.with_match_on_insert(&row, &matched_row)
683 {
684 yield chunk;
685 }
686 } else if let Some(chunk) =
687 join_chunk_builder.forward_if_not_matched(Op::Insert, row)
688 {
689 yield chunk;
690 }
691 side_update.ht.insert(row)?;
692 }
693 Op::Delete | Op::UpdateDelete => {
694 if let Some(matched_row) = matched_row_by_inequality {
695 if let Some(chunk) =
696 join_chunk_builder.with_match_on_delete(&row, &matched_row)
697 {
698 yield chunk;
699 }
700 } else if let Some(chunk) =
701 join_chunk_builder.forward_if_not_matched(Op::Delete, row)
702 {
703 yield chunk;
704 }
705 side_update.ht.delete(row)?;
706 }
707 }
708 } else {
709 match op {
712 Op::Insert | Op::UpdateInsert => {
713 if let Some(chunk) =
714 join_chunk_builder.forward_if_not_matched(Op::Insert, row)
715 {
716 yield chunk;
717 }
718 }
719 Op::Delete | Op::UpdateDelete => {
720 if let Some(chunk) =
721 join_chunk_builder.forward_if_not_matched(Op::Delete, row)
722 {
723 yield chunk;
724 }
725 }
726 }
727 }
728 }
729 if let Some(chunk) = join_chunk_builder.take() {
730 yield chunk;
731 }
732 }
733
734 fn cmp_pk_rows(pk1: &impl Row, pk2: &impl Row) -> Ordering {
735 cmp_rows_ascending(pk1, pk2)
736 }
737
738 #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
739 async fn eq_join_right(args: EqJoinArgs<'_, S, E>) {
740 let EqJoinArgs {
741 ctx,
742 side_l,
743 side_r,
744 null_safe,
745 asof_desc,
746 actual_output_data_types,
747 chunk,
748 chunk_size,
749 cnt_rows_received,
750 join_cache_evict_interval_rows,
751 high_join_amplification_threshold,
752 join_matched_join_keys,
753 } = args;
754
755 let (side_update, side_match) = (side_r, side_l);
756
757 let mut join_chunk_builder = JoinStreamChunkBuilder::new(
758 chunk_size,
759 actual_output_data_types.to_vec(),
760 side_update.i2o_mapping.clone(),
761 side_match.i2o_mapping.clone(),
762 );
763
764 let inequal_key_idx_update = [side_update.inequal_key_idx];
766
767 for r in chunk.rows_with_holes() {
768 let Some((op, row)) = r else {
769 continue;
770 };
771 Self::evict_cache(
772 side_update,
773 side_match,
774 cnt_rows_received,
775 join_cache_evict_interval_rows,
776 );
777
778 let join_key_null_not_safe = side_update
781 .join_key_indices
782 .iter()
783 .zip_eq(null_safe.iter())
784 .any(|(idx, ns)| !ns && row.datum_at(*idx).is_none());
785 if join_key_null_not_safe {
786 continue;
787 }
788
789 let join_key = row.project(&side_update.join_key_indices);
790
791 let inequal_key_is_null = side_update.ht.check_inequal_key_null(&row);
792 let inequal_key = row.project(&inequal_key_idx_update);
793
794 let mut join_matched_rows_cnt = 0;
795
796 if !inequal_key_is_null {
797 let (row_to_delete_r, row_to_insert_r) = {
798 let (first_row, second_row) = side_update
799 .ht
800 .first_two_by_inequality_with_jk_prefix(&join_key, &inequal_key)
801 .await?;
802 if let Some(first_row) = first_row {
803 let row_pk = side_update.ht.get_pk_from_row(row);
804 let first_pk = side_update.ht.get_pk_from_row(&first_row).to_owned_row();
805 match op {
806 Op::Insert | Op::UpdateInsert => {
807 if Self::cmp_pk_rows(&first_pk, &row_pk) == Ordering::Greater {
809 (Some(Either::Left(first_row)), Some(Either::Right(row)))
810 } else {
811 (None, None)
813 }
814 }
815 Op::Delete | Op::UpdateDelete => {
816 if Self::cmp_pk_rows(&first_pk, &row_pk) == Ordering::Equal {
817 if let Some(second_row) = second_row {
818 (Some(Either::Right(row)), Some(Either::Left(second_row)))
819 } else {
820 (Some(Either::Right(row)), None)
821 }
822 } else {
823 (None, None)
825 }
826 }
827 }
828 } else {
829 match op {
830 Op::Insert | Op::UpdateInsert => (None, Some(Either::Right(row))),
832 Op::Delete | Op::UpdateDelete => (Some(Either::Right(row)), None),
834 }
835 }
836 };
837 if row_to_delete_r.is_none() && row_to_insert_r.is_none() {
843 } else {
845 let prev_inequality_key = side_update
846 .ht
847 .upper_bound_by_inequality_with_jk_prefix(
848 &join_key,
849 Bound::Excluded(&inequal_key),
850 )
851 .await?
852 .map(|r| r.project(&inequal_key_idx_update));
853 let next_inequality_key = side_update
854 .ht
855 .lower_bound_by_inequality_with_jk_prefix(
856 &join_key,
857 Bound::Excluded(&inequal_key),
858 )
859 .await?
860 .map(|r| r.project(&inequal_key_idx_update));
861
862 let affected_inequality_key_r = match asof_desc.inequality_type {
863 AsOfInequalityType::Lt | AsOfInequalityType::Le => &next_inequality_key,
864 AsOfInequalityType::Gt | AsOfInequalityType::Ge => &prev_inequality_key,
865 };
866 let affected_row_r =
867 if let Some(affected_inequality_key_r) = affected_inequality_key_r {
868 side_update
869 .ht
870 .first_by_inequality_with_jk_prefix(
871 &join_key,
872 &affected_inequality_key_r,
873 )
874 .await?
875 } else {
876 None
877 }
878 .map(Either::Left);
879
880 let (row_to_delete_r, row_to_insert_r) =
881 match (&row_to_delete_r, &row_to_insert_r) {
882 (Some(_), Some(_)) => (row_to_delete_r, row_to_insert_r),
883 (None, Some(_)) => (affected_row_r, row_to_insert_r),
884 (Some(_), None) => (row_to_delete_r, affected_row_r),
885 (None, None) => unreachable!(),
886 };
887 let range = match asof_desc.inequality_type {
888 AsOfInequalityType::Lt => (
889 prev_inequality_key
890 .map(Either::Left)
891 .map_or_else(|| Bound::Unbounded, Bound::Included),
892 Bound::Excluded(Either::Right(&inequal_key)),
893 ),
894 AsOfInequalityType::Le => (
895 prev_inequality_key
896 .map(Either::Left)
897 .map_or_else(|| Bound::Unbounded, Bound::Excluded),
898 Bound::Included(Either::Right(&inequal_key)),
899 ),
900 AsOfInequalityType::Gt => (
901 Bound::Excluded(Either::Right(&inequal_key)),
902 next_inequality_key
903 .map(Either::Left)
904 .map_or_else(|| Bound::Unbounded, Bound::Included),
905 ),
906 AsOfInequalityType::Ge => (
907 Bound::Included(Either::Right(&inequal_key)),
908 next_inequality_key
909 .map(Either::Left)
910 .map_or_else(|| Bound::Unbounded, Bound::Excluded),
911 ),
912 };
913
914 let rows_l_stream = side_match
915 .ht
916 .range_by_inequality_with_jk_prefix(&join_key, &range)
917 .await?;
918 #[for_await]
919 for row_l in rows_l_stream {
920 let row_l = row_l?;
921 join_matched_rows_cnt += 1;
922 if let Some(row_to_delete_r) = &row_to_delete_r {
923 if let Some(chunk) =
924 join_chunk_builder.append_row(Op::Delete, row_to_delete_r, &row_l)
925 {
926 yield chunk;
927 }
928 } else if is_as_of_left_outer(T)
929 && let Some(chunk) =
930 join_chunk_builder.append_row_matched(Op::Delete, &row_l)
931 {
932 yield chunk;
933 }
934 if let Some(row_to_insert_r) = &row_to_insert_r {
935 if let Some(chunk) =
936 join_chunk_builder.append_row(Op::Insert, row_to_insert_r, &row_l)
937 {
938 yield chunk;
939 }
940 } else if is_as_of_left_outer(T)
941 && let Some(chunk) =
942 join_chunk_builder.append_row_matched(Op::Insert, &row_l)
943 {
944 yield chunk;
945 }
946 }
947 }
948
949 match op {
950 Op::Insert | Op::UpdateInsert => {
951 side_update.ht.insert(row)?;
952 }
953 Op::Delete | Op::UpdateDelete => {
954 side_update.ht.delete(row)?;
955 }
956 }
957 } else {
958 }
961 join_matched_join_keys.observe(join_matched_rows_cnt as _);
962 if join_matched_rows_cnt > high_join_amplification_threshold {
963 let join_key = row.project(&side_update.join_key_indices);
964 tracing::warn!(target: "high_join_amplification",
965 matched_rows_len = join_matched_rows_cnt,
966 update_table_id = %side_update.ht.table_id(),
967 match_table_id = %side_match.ht.table_id(),
968 join_key = ?join_key,
969 actor_id = %ctx.id,
970 fragment_id = %ctx.fragment_id,
971 "large rows matched for join key when AsOf join updating right side",
972 );
973 }
974 }
975 if let Some(chunk) = join_chunk_builder.take() {
976 yield chunk;
977 }
978 }
979}
980
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;

    use risingwave_common::array::*;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};

    /// Builds a `StateTable` over `mem_state` with unnamed columns of `data_types`
    /// (column ids 0..n), ordered by `order_types` on `pk_indices`, under `table_id`.
    async fn create_in_memory_state_table(
        mem_state: MemoryStateStore,
        data_types: &[DataType],
        order_types: &[OrderType],
        pk_indices: &[usize],
        table_id: u32,
    ) -> StateTable<MemoryStateStore> {
        let column_descs = data_types
            .iter()
            .enumerate()
            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
            .collect_vec();
        StateTable::from_table_catalog(
            &gen_pbtable(
                TableId::new(table_id),
                column_descs,
                order_types.to_vec(),
                pk_indices.to_vec(),
                0,
            ),
            mem_state.clone(),
            None,
        )
        .await
    }

    /// Creates an AS OF join executor of type `T` over two mock sources with the same
    /// three-`Int64` schema.
    ///
    /// Column 0 is the join key and column 1 is the input pk on both sides. The state tables
    /// use pk `[join key, inequality column, input pk]`.
    ///
    /// Returns the left sender, the right sender, and the executor's output stream.
    async fn create_executor<const T: AsOfJoinTypePrimitive>(
        asof_desc: AsOfDesc,
        use_cache: bool,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                // column 1: pk of each input (see `into_executor(.., vec![1])` below)
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let (tx_l, source_l) = MockSource::channel();
        let source_l = source_l.into_executor(schema.clone(), vec![1]);
        let (tx_r, source_r) = MockSource::channel();
        let source_r = source_r.into_executor(schema, vec![1]);
        // join key = [0], deduped pk = [1] on both sides
        let params_l = JoinParams::new(vec![0], vec![1]);
        let params_r = JoinParams::new(vec![0], vec![1]);

        let mem_state = MemoryStateStore::new();

        let state_l = create_in_memory_state_table(
            mem_state.clone(),
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            &[0, asof_desc.left_idx, 1],
            0,
        )
        .await;

        let state_r = create_in_memory_state_table(
            mem_state,
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            &[0, asof_desc.right_idx, 1],
            1,
        )
        .await;

        let schema: Schema = [source_l.schema().fields(), source_r.schema().fields()]
            .concat()
            .into_iter()
            .collect();
        let schema_len = schema.len();
        let info = ExecutorInfo::for_test(schema, vec![1], "AsOfJoinExecutor".to_owned(), 0);

        let executor = AsOfJoinExecutor::<MemoryStateStore, T, AsOfCpuEncoding>::new(
            ActorContext::for_test(123),
            info,
            source_l,
            source_r,
            params_l,
            params_r,
            // null_safe for the single join key column
            vec![false],
            // output every column of both sides
            (0..schema_len).collect_vec(),
            state_l,
            state_r,
            Arc::new(AtomicU64::new(0)),
            Arc::new(StreamingMetrics::unused()),
            // chunk_size
            1024,
            asof_desc,
            use_cache,
            // high_join_amplification_threshold
            2048,
        );
        (tx_l, tx_r, executor.boxed().execute())
    }

    #[tokio::test]
    async fn test_asof_inner_join() -> StreamExecutorResult<()> {
        test_asof_inner_join_impl(false).await
    }

    #[tokio::test]
    async fn test_asof_inner_join_with_cache() -> StreamExecutorResult<()> {
        test_asof_inner_join_impl(true).await
    }

    /// Inner AS OF join with `Lt` on left column 0 vs right column 2.
    async fn test_asof_inner_join_impl(use_cache: bool) -> StreamExecutorResult<()> {
        let asof_desc = AsOfDesc {
            left_idx: 0,
            right_idx: 2,
            inequality_type: AsOfInequalityType::Lt,
        };

        let chunk_l1 = StreamChunk::from_pretty(
            " I I I
            + 1 4 7
            + 2 5 8
            + 3 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I I
            + 3 8 1
            - 3 8 1",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I I
            + 2 1 7
            + 2 2 1
            + 2 3 4
            + 2 4 2
            + 6 1 9
            + 6 2 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I I
            - 2 3 4",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            " I I I
            + 2 3 3",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            " I I I
            - 2 5 8",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            " I I I
            + 6 3 1
            + 6 4 1",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            " I I I
            - 6 1 9",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_executor::<{ AsOfJoinType::Inner }>(asof_desc, use_cache).await;

        // Initial barrier on both inputs.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // The right side is empty, so the inner join produces nothing.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // Insert followed by delete of the same left row: still nothing to emit.
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Successive right inserts change which right row (2, 5, 8) is paired with.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 2 5 8 2 1 7
                - 2 5 8 2 1 7
                + 2 5 8 2 3 4"
            )
        );

        // Deleting the paired right row falls back to the previous candidate.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 3 4
                + 2 5 8 2 1 7"
            )
        );

        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 1 7
                + 2 5 8 2 3 3"
            )
        );

        // Deleting the left row retracts its current pairing.
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 3 3"
            )
        );

        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 6 3 1 6 1 9
                + 6 4 1 6 1 9"
            )
        );

        // Deleting a right row that two left rows pair with re-pairs both of them.
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 6 3 1 6 1 9
                + 6 3 1 6 2 9
                - 6 4 1 6 1 9
                + 6 4 1 6 2 9"
            )
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_asof_left_outer_join() -> StreamExecutorResult<()> {
        test_asof_left_outer_join_impl(false).await
    }

    #[tokio::test]
    async fn test_asof_left_outer_join_with_cache() -> StreamExecutorResult<()> {
        test_asof_left_outer_join_impl(true).await
    }

    /// Left outer AS OF join with `Ge` on left column 1 vs right column 2.
    async fn test_asof_left_outer_join_impl(use_cache: bool) -> StreamExecutorResult<()> {
        let asof_desc = AsOfDesc {
            left_idx: 1,
            right_idx: 2,
            inequality_type: AsOfInequalityType::Ge,
        };

        let chunk_l1 = StreamChunk::from_pretty(
            " I I I
            + 1 4 7
            + 2 5 8
            + 3 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I I
            + 3 8 1
            - 3 8 1",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I I
            + 2 3 4
            + 2 2 5
            + 2 1 5
            + 6 1 8
            + 6 2 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I I
            - 2 3 4
            - 2 1 5
            - 2 2 5",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            " I I I
            + 6 8 9",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            " I I I
            - 6 1 8",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_executor::<{ AsOfJoinType::LeftOuter }>(asof_desc, use_cache).await;

        // Initial barrier on both inputs.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // With an empty right side, every left row is emitted padded with NULLs.
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 7 . . .
                + 2 5 8 . . .
                + 3 6 9 . . ."
            )
        );

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 3 8 1 . . . D
                - 3 8 1 . . . D"
            )
        );

        // The NULL-padded row for (2, 5, 8) is replaced, then re-paired as right rows arrive.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 . . .
                + 2 5 8 2 3 4
                - 2 5 8 2 3 4
                + 2 5 8 2 2 5
                - 2 5 8 2 2 5
                + 2 5 8 2 1 5"
            )
        );

        // Removing all right rows for join key 2 brings back the NULL-padded row.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 1 5
                + 2 5 8 2 2 5
                - 2 5 8 2 2 5
                + 2 5 8 . . ."
            )
        );

        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 6 8 9 6 1 8"
            )
        );

        // Deleting the only pairable right row leaves the left row NULL-padded.
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 6 8 9 6 1 8
                + 6 8 9 . . ."
            )
        );
        Ok(())
    }
}