1use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::ops::Bound;
18
19use anyhow::Context;
20use either::Either;
21use futures::TryStreamExt;
22use futures::stream::{self, PollNext};
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::BitmapBuilder;
26use risingwave_common::hash::{HashKey, NullBitmap};
27use risingwave_common::row::RowExt;
28use risingwave_common::util::iter_util::ZipEqDebug;
29use risingwave_common_estimate_size::{EstimateSize, KvSize};
30use risingwave_expr::expr::NonStrictExpression;
31use risingwave_storage::row_serde::value_serde::ValueRowSerde;
32use risingwave_storage::store::PrefetchOptions;
33
34use super::join::{JoinType, JoinTypePrimitive};
35use super::monitor::TemporalJoinMetrics;
36use crate::cache::ManagedLruCache;
37use crate::common::metrics::MetricsInfo;
38use crate::common::table::state_table::ReplicatedStateTable;
39use crate::executor::join::builder::JoinStreamChunkBuilder;
40use crate::executor::prelude::*;
41
/// Streaming temporal join executor.
///
/// Each visible left-side row is joined against the right-side table held in `right_table`.
/// Right-side chunks are only applied to that table (and its cache) when a barrier is
/// processed, so left rows between two barriers all see the same right-side state.
pub struct TemporalJoinExecutor<
    K: HashKey,
    S: StateStore,
    SD: ValueRowSerde,
    const T: JoinTypePrimitive,
    const APPEND_ONLY: bool,
> {
    ctx: ActorContextRef,
    #[expect(dead_code)]
    info: ExecutorInfo,
    /// Left input; each visible row looks up matching rows in `right_table`.
    left: Executor,
    /// Right input; its chunks are buffered and applied to `right_table` at barriers.
    right: Executor,
    /// Right-side state table plus the LRU cache of join-key entries read from it.
    right_table: TemporalSide<K, S, SD>,
    /// Column indices of the join key in left rows.
    left_join_keys: Vec<usize>,
    /// Column indices of the join key in right rows.
    right_join_keys: Vec<usize>,
    /// Per join-key column: whether NULL is allowed to match NULL.
    null_safe: Vec<bool>,
    /// Optional non-equi join condition evaluated over the concatenated (left ++ right) row.
    condition: Option<NonStrictExpression>,
    /// Projection from the concatenated (left ++ right) schema to the output schema.
    output_indices: Vec<usize>,
    /// Maximum number of rows per chunk produced by the internal chunk builder.
    chunk_size: usize,
    /// Remembers emitted (right row ++ left lookup prefix) pairs so that deletes of left rows
    /// can be retracted; only used (and unwrapped) when `APPEND_ONLY` is false.
    memo_table: Option<StateTable<S>>,
    metrics: TemporalJoinMetrics,
}
64
/// The cached right-side rows that share one join key.
#[derive(Default)]
pub struct JoinEntry {
    /// Right-side rows keyed by their stream key (the row projected onto the right stream-key
    /// columns).
    cached: HashMap<OwnedRow, OwnedRow>,
    /// Running estimate of the heap size of the keys and values in `cached`, maintained by
    /// `insert` and `remove`.
    kv_heap_size: KvSize,
}
71
/// Reports the tracked key/value heap size so the LRU cache can account for this entry.
impl EstimateSize for JoinEntry {
    fn estimated_heap_size(&self) -> usize {
        // Only the incrementally tracked key/value sizes are reported; the map's own
        // allocation overhead is not included.
        self.kv_heap_size.size()
    }
}
79
80impl JoinEntry {
81 pub fn insert(&mut self, key: OwnedRow, value: OwnedRow) {
83 if let Entry::Vacant(e) = self.cached.entry(key) {
86 self.kv_heap_size.add(e.key(), &value);
87 e.insert(value);
88 } else {
89 panic!("value {:?} double insert", value);
90 }
91 }
92
93 pub fn remove(&mut self, key: &OwnedRow) {
95 if let Some(value) = self.cached.remove(key) {
96 self.kv_heap_size.sub(key, &value);
97 } else {
98 panic!("key {:?} should be in the cache", key);
99 }
100 }
101
102 pub fn is_empty(&self) -> bool {
103 self.cached.is_empty()
104 }
105}
106
/// Right side of the temporal join: the replicated state table and a cache of its rows
/// grouped by join key.
struct TemporalSide<K: HashKey, S: StateStore, SD: ValueRowSerde> {
    /// Right-side table; scanned by join-key prefix on cache misses and written with the
    /// right-side chunks at each barrier.
    source: ReplicatedStateTable<S, SD>,
    /// Columns of a table row that form the key of rows inside a `JoinEntry`.
    table_stream_key_indices: Vec<usize>,
    /// Join key -> all right-side rows with that key.
    cache: ManagedLruCache<K, JoinEntry>,
    /// Data types used to deserialize a join key `K` into the scan prefix.
    join_key_data_types: Vec<DataType>,
}
113
114impl<K: HashKey, S: StateStore, SD: ValueRowSerde> TemporalSide<K, S, SD> {
115 async fn fetch_or_promote_keys(
118 &mut self,
119 keys: impl Iterator<Item = &K>,
120 metrics: &TemporalJoinMetrics,
121 ) -> StreamExecutorResult<()> {
122 let mut futs = Vec::with_capacity(keys.size_hint().1.unwrap_or(0));
123 for key in keys {
124 metrics.temporal_join_total_query_cache_count.inc();
125
126 if self.cache.get(key).is_none() {
127 metrics.temporal_join_cache_miss_count.inc();
128
129 futs.push(async {
130 let pk_prefix = key.deserialize(&self.join_key_data_types)?;
131
132 let iter = self
133 .source
134 .iter_with_prefix(
135 &pk_prefix,
136 &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
137 PrefetchOptions::default(),
138 )
139 .await?;
140
141 let mut entry = JoinEntry::default();
142
143 pin_mut!(iter);
144 while let Some(row) = iter.next().await {
145 let row: OwnedRow = row?;
146 entry.insert(
147 row.as_ref()
148 .project(&self.table_stream_key_indices)
149 .into_owned_row(),
150 row,
151 );
152 }
153 let key = key.clone();
154 Ok((key, entry)) as StreamExecutorResult<_>
155 });
156 }
157 }
158
159 #[for_await]
160 for res in stream::iter(futs).buffered(16) {
161 let (key, entry) = res?;
162 self.cache.put(key, entry);
163 }
164
165 Ok(())
166 }
167
168 fn force_peek(&self, key: &K) -> &JoinEntry {
169 self.cache.peek(key).expect("key should exists")
170 }
171
172 fn update(
173 &mut self,
174 chunks: Vec<StreamChunk>,
175 join_keys: &[usize],
176 right_stream_key_indices: &[usize],
177 ) -> StreamExecutorResult<()> {
178 for chunk in chunks {
179 let keys = K::build_many(join_keys, chunk.data_chunk());
180 for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
181 let Some((op, row)) = r else {
182 continue;
183 };
184 if self.cache.contains(&key) {
185 let mut entry = self.cache.get_mut(&key).unwrap();
187 let stream_key = row.project(right_stream_key_indices).into_owned_row();
188 match op {
189 Op::Insert | Op::UpdateInsert => {
190 entry.insert(stream_key, row.into_owned_row())
191 }
192 Op::Delete | Op::UpdateDelete => entry.remove(&stream_key),
193 };
194 }
195 }
196 self.source.write_chunk(chunk);
197 }
198 Ok(())
199 }
200}
201
/// Messages produced by `align_input`, which merges the left and right inputs.
pub(super) enum InternalMessage {
    /// A chunk from the left input.
    Chunk(StreamChunk),
    /// A barrier reached by both inputs, together with the right-side chunks received since
    /// the previous barrier (always empty when `align_input` runs with
    /// `YIELD_RIGHT_CHUNKS = false`).
    Barrier(Vec<StreamChunk>, Barrier),
    /// A watermark from the left input; right-side watermarks are dropped.
    WaterMark(Watermark),
}
207
208#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
209async fn chunks_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
210 #[for_await]
211 for item in stream {
212 match item? {
213 Message::Watermark(_) => {
214 }
216 Message::Chunk(c) => yield c,
217 Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
218 return Err(StreamExecutorError::align_barrier(expected_barrier, b));
219 }
220 Message::Barrier(_) => return Ok(()),
221 }
222 }
223}
224
225#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
226async fn internal_messages_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
227 #[for_await]
228 for item in stream {
229 match item? {
230 Message::Watermark(w) => {
231 yield InternalMessage::WaterMark(w);
232 }
233 Message::Chunk(c) => yield InternalMessage::Chunk(c),
234 Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
235 return Err(StreamExecutorError::align_barrier(expected_barrier, b));
236 }
237 Message::Barrier(_) => return Ok(()),
238 }
239 }
240}
241
242pub(super) async fn expect_first_barrier(
243 stream: &mut (impl Stream<Item = StreamExecutorResult<InternalMessage>> + Unpin),
244) -> StreamExecutorResult<Barrier> {
245 let InternalMessage::Barrier(updates, barrier) = stream
246 .try_next()
247 .instrument_await("expect_first_barrier")
248 .await?
249 .context("failed to extract the first message: stream closed unexpectedly")?
250 else {
251 unreachable!("unexpected internal message");
252 };
253 assert!(updates.is_empty());
254 Ok(barrier)
255}
256
/// Merges the left and right inputs into one stream of [`InternalMessage`]s aligned on
/// barriers.
///
/// - Left chunks and watermarks are forwarded as they arrive. The left input is polled first
///   whenever the merged stream is polled.
/// - Right chunks are buffered when `YIELD_RIGHT_CHUNKS` is true and dropped otherwise.
///   Buffered chunks are handed over inside [`InternalMessage::Barrier`] once both sides
///   have reached the same barrier.
/// - Right watermarks are discarded.
///
/// # Errors
/// Forwards any upstream error, and fails when the two sides deliver barriers with different
/// epochs.
#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
pub(super) async fn align_input<const YIELD_RIGHT_CHUNKS: bool>(left: Executor, right: Executor) {
    let mut left = pin!(left.execute());
    let mut right = pin!(right.execute());
    loop {
        // Right-side chunks received since the previous barrier.
        let mut right_chunks = vec![];
        'inner: loop {
            // Biased merge: the strategy always asks for the left input first.
            let mut combined = stream::select_with_strategy(
                left.by_ref().map(Either::Left),
                right.by_ref().map(Either::Right),
                |_: &mut ()| PollNext::Left,
            );
            match combined.next().await {
                Some(Either::Left(Ok(Message::Chunk(c)))) => yield InternalMessage::Chunk(c),
                Some(Either::Right(Ok(Message::Chunk(c)))) => {
                    if YIELD_RIGHT_CHUNKS {
                        right_chunks.push(c);
                    }
                }
                Some(Either::Left(Ok(Message::Barrier(b)))) => {
                    // Left reached the barrier first: collect the rest of the right side up to
                    // the same barrier before emitting it.
                    let mut remain = chunks_until_barrier(right.by_ref(), b.clone())
                        .try_collect()
                        .await?;
                    if YIELD_RIGHT_CHUNKS {
                        right_chunks.append(&mut remain);
                    }
                    yield InternalMessage::Barrier(right_chunks, b);
                    break 'inner;
                }
                Some(Either::Right(Ok(Message::Barrier(b)))) => {
                    // Right reached the barrier first: keep forwarding left messages until the
                    // left side reaches the same barrier.
                    #[for_await]
                    for internal_message in
                        internal_messages_until_barrier(left.by_ref(), b.clone())
                    {
                        yield internal_message?;
                    }
                    yield InternalMessage::Barrier(right_chunks, b);
                    break 'inner;
                }
                Some(Either::Left(Err(e)) | Either::Right(Err(e))) => return Err(e),
                Some(Either::Left(Ok(Message::Watermark(w)))) => {
                    yield InternalMessage::WaterMark(w);
                }
                Some(Either::Right(Ok(Message::Watermark(_)))) => {
                    // Right-side watermarks are not propagated.
                }
                // The merged stream is exhausted.
                None => return Ok(()),
            }
        }
    }
}
314
315pub(super) fn apply_indices_map(chunk: StreamChunk, indices: &[usize]) -> StreamChunk {
316 let (data_chunk, ops) = chunk.into_parts();
317 let (columns, vis) = data_chunk.into_parts();
318 let output_columns = indices
319 .iter()
320 .cloned()
321 .map(|idx| columns[idx].clone())
322 .collect();
323 StreamChunk::with_visibility(ops, output_columns, vis)
324}
325
326pub(super) mod phase1 {
327 use std::ops::Bound;
328
329 use futures::{StreamExt, pin_mut};
330 use futures_async_stream::try_stream;
331 use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
332 use risingwave_common::array::{Op, StreamChunk};
333 use risingwave_common::hash::{HashKey, NullBitmap};
334 use risingwave_common::row::{self, OwnedRow, Row, RowExt};
335 use risingwave_common::types::{DataType, DatumRef};
336 use risingwave_common::util::iter_util::ZipEqDebug;
337 use risingwave_storage::StateStore;
338 use risingwave_storage::row_serde::value_serde::ValueRowSerde;
339
340 use super::{StreamExecutorError, TemporalSide};
341 use crate::common::table::state_table::StateTable;
342 use crate::executor::monitor::TemporalJoinMetrics;
343
344 pub trait Phase1Evaluation {
345 #[must_use = "consume chunk if produced"]
347 fn append_matched_row(
348 op: Op,
349 builder: &mut StreamChunkBuilder,
350 left_row: impl Row,
351 right_row: impl Row,
352 ) -> Option<StreamChunk>;
353
354 #[must_use = "consume chunk if produced"]
356 fn match_end(
357 builder: &mut StreamChunkBuilder,
358 op: Op,
359 left_row: impl Row,
360 right_size: usize,
361 matched: bool,
362 ) -> Option<StreamChunk>;
363 }
364
365 pub struct Inner;
366 pub struct LeftOuter;
367 pub struct LeftOuterWithCond;
368
369 impl Phase1Evaluation for Inner {
370 fn append_matched_row(
371 op: Op,
372 builder: &mut StreamChunkBuilder,
373 left_row: impl Row,
374 right_row: impl Row,
375 ) -> Option<StreamChunk> {
376 builder.append_row(op, left_row.chain(right_row))
377 }
378
379 fn match_end(
380 _builder: &mut StreamChunkBuilder,
381 _op: Op,
382 _left_row: impl Row,
383 _right_size: usize,
384 _matched: bool,
385 ) -> Option<StreamChunk> {
386 None
387 }
388 }
389
390 impl Phase1Evaluation for LeftOuter {
391 fn append_matched_row(
392 op: Op,
393 builder: &mut StreamChunkBuilder,
394 left_row: impl Row,
395 right_row: impl Row,
396 ) -> Option<StreamChunk> {
397 builder.append_row(op, left_row.chain(right_row))
398 }
399
400 fn match_end(
401 builder: &mut StreamChunkBuilder,
402 op: Op,
403 left_row: impl Row,
404 right_size: usize,
405 matched: bool,
406 ) -> Option<StreamChunk> {
407 if !matched {
408 builder.append_row(
410 op,
411 left_row.chain(row::repeat_n(DatumRef::None, right_size)),
412 )
413 } else {
414 None
415 }
416 }
417 }
418
419 impl Phase1Evaluation for LeftOuterWithCond {
420 fn append_matched_row(
421 op: Op,
422 builder: &mut StreamChunkBuilder,
423 left_row: impl Row,
424 right_row: impl Row,
425 ) -> Option<StreamChunk> {
426 builder.append_row(op, left_row.chain(right_row))
427 }
428
429 fn match_end(
430 builder: &mut StreamChunkBuilder,
431 op: Op,
432 left_row: impl Row,
433 right_size: usize,
434 _matched: bool,
435 ) -> Option<StreamChunk> {
436 builder.append_row_invisible(
439 op,
440 left_row.chain(row::repeat_n(DatumRef::None, right_size)),
441 )
442 }
443 }
444
445 #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
446 #[expect(clippy::too_many_arguments)]
447 pub(super) async fn handle_chunk<
448 'a,
449 K: HashKey,
450 S: StateStore,
451 SD: ValueRowSerde,
452 E: Phase1Evaluation,
453 const APPEND_ONLY: bool,
454 >(
455 chunk_size: usize,
456 right_size: usize,
457 full_schema: Vec<DataType>,
458 left_join_keys: &'a [usize],
459 right_table: &'a mut TemporalSide<K, S, SD>,
460 memo_table_lookup_prefix: &'a [usize],
461 memo_table: &'a mut Option<StateTable<S>>,
462 null_matched: &'a K::Bitmap,
463 chunk: StreamChunk,
464 metrics: &'a TemporalJoinMetrics,
465 ) {
466 let mut builder = StreamChunkBuilder::new(chunk_size, full_schema);
467 let keys = K::build_many(left_join_keys, chunk.data_chunk());
468 let to_fetch_keys = chunk
469 .visibility()
470 .iter()
471 .zip_eq_debug(keys.iter())
472 .zip_eq_debug(chunk.ops())
473 .filter_map(|((vis, key), op)| {
474 if vis {
475 if APPEND_ONLY {
476 assert_eq!(*op, Op::Insert);
477 Some(key)
478 } else {
479 match op {
480 Op::Insert | Op::UpdateInsert => Some(key),
481 Op::Delete | Op::UpdateDelete => None,
482 }
483 }
484 } else {
485 None
486 }
487 });
488 right_table
489 .fetch_or_promote_keys(to_fetch_keys, metrics)
490 .await?;
491
492 for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
493 let Some((op, left_row)) = r else {
494 continue;
495 };
496
497 let mut matched = false;
498
499 if APPEND_ONLY {
500 if key.null_bitmap().is_subset(null_matched)
502 && let join_entry = right_table.force_peek(&key)
503 && !join_entry.is_empty()
504 {
505 matched = true;
506 for right_row in join_entry.cached.values() {
507 if let Some(chunk) =
508 E::append_matched_row(op, &mut builder, left_row, right_row)
509 {
510 yield chunk;
511 }
512 }
513 }
514 } else {
515 let memo_table = memo_table.as_mut().unwrap();
529 match op {
530 Op::Insert | Op::UpdateInsert => {
531 if key.null_bitmap().is_subset(null_matched)
532 && let join_entry = right_table.force_peek(&key)
533 && !join_entry.is_empty()
534 {
535 matched = true;
536 for right_row in join_entry.cached.values() {
537 let right_row: OwnedRow = right_row.clone();
538 memo_table.insert(right_row.clone().chain(
540 left_row.project(memo_table_lookup_prefix).into_owned_row(),
541 ));
542 if let Some(chunk) = E::append_matched_row(
543 Op::Insert,
544 &mut builder,
545 left_row,
546 right_row,
547 ) {
548 yield chunk;
549 }
550 }
551 }
552 }
553 Op::Delete | Op::UpdateDelete => {
554 let mut memo_rows_to_delete = vec![];
555 if key.null_bitmap().is_subset(null_matched) {
556 let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
557 &(Bound::Unbounded, Bound::Unbounded);
558 let prefix = left_row.project(memo_table_lookup_prefix);
559 let state_table_iter = memo_table
560 .iter_with_prefix(prefix, sub_range, Default::default())
561 .await?;
562 pin_mut!(state_table_iter);
563
564 while let Some(memo_row) = state_table_iter.next().await {
565 matched = true;
566 let memo_row = memo_row?.into_owned_row();
567 memo_rows_to_delete.push(memo_row.clone());
568 if let Some(chunk) = E::append_matched_row(
569 Op::Delete,
570 &mut builder,
571 left_row,
572 memo_row.slice(0..right_size),
573 ) {
574 yield chunk;
575 }
576 }
577 }
578 for memo_row in memo_rows_to_delete {
579 memo_table.delete(memo_row);
581 }
582 }
583 }
584 }
585 if let Some(chunk) = E::match_end(
586 &mut builder,
587 match op {
588 Op::Insert | Op::UpdateInsert => Op::Insert,
589 Op::Delete | Op::UpdateDelete => Op::Delete,
590 },
591 left_row,
592 right_size,
593 matched,
594 ) {
595 yield chunk;
596 }
597 }
598
599 if let Some(chunk) = builder.take() {
600 yield chunk;
601 }
602 }
603}
604
605impl<
606 K: HashKey,
607 S: StateStore,
608 SD: ValueRowSerde,
609 const T: JoinTypePrimitive,
610 const APPEND_ONLY: bool,
611> TemporalJoinExecutor<K, S, SD, T, APPEND_ONLY>
612{
613 #[expect(clippy::too_many_arguments)]
614 pub fn new(
615 ctx: ActorContextRef,
616 info: ExecutorInfo,
617 left: Executor,
618 right: Executor,
619 table: ReplicatedStateTable<S, SD>,
620 left_join_keys: Vec<usize>,
621 right_join_keys: Vec<usize>,
622 null_safe: Vec<bool>,
623 condition: Option<NonStrictExpression>,
624 output_indices: Vec<usize>,
625 table_stream_key_indices: Vec<usize>,
626 watermark_sequence: AtomicU64Ref,
627 metrics: Arc<StreamingMetrics>,
628 chunk_size: usize,
629 join_key_data_types: Vec<DataType>,
630 memo_table: Option<StateTable<S>>,
631 ) -> Self {
632 let metrics_info =
633 MetricsInfo::new(metrics.clone(), table.table_id(), ctx.id, "temporal join");
634
635 let cache = ManagedLruCache::unbounded(watermark_sequence, metrics_info);
636
637 let metrics = metrics.new_temporal_join_metrics(table.table_id(), ctx.id, ctx.fragment_id);
638
639 Self {
640 ctx,
641 info,
642 left,
643 right,
644 right_table: TemporalSide {
645 source: table,
646 table_stream_key_indices,
647 cache,
648 join_key_data_types,
649 },
650 left_join_keys,
651 right_join_keys,
652 null_safe,
653 condition,
654 output_indices,
655 chunk_size,
656 memo_table,
657 metrics,
658 }
659 }
660
661 #[try_stream(ok = Message, error = StreamExecutorError)]
662 async fn into_stream(mut self) {
663 let right_size = self.right.schema().len();
664
665 let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
666 &self.output_indices,
667 self.left.schema().len(),
668 right_size,
669 );
670
671 let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());
672
673 let left_stream_key_indices = self.left.stream_key().to_vec();
674 let right_stream_key_indices = self.right.stream_key().to_vec();
675 let memo_table_lookup_prefix = self
676 .left_join_keys
677 .iter()
678 .cloned()
679 .chain(left_stream_key_indices)
680 .collect_vec();
681
682 let null_matched = K::Bitmap::from_bool_vec(self.null_safe);
683
684 let full_schema: Vec<_> = self
685 .left
686 .schema()
687 .data_types()
688 .into_iter()
689 .chain(self.right.schema().data_types().into_iter())
690 .collect();
691
692 let input = align_input::<true>(self.left, self.right);
693 pin_mut!(input);
694 let barrier = expect_first_barrier(&mut input).await?;
695 let barrier_epoch = barrier.epoch;
696 yield Message::Barrier(barrier);
697 self.right_table.source.init_epoch(barrier_epoch).await?;
698 if !APPEND_ONLY {
699 self.memo_table
700 .as_mut()
701 .unwrap()
702 .init_epoch(barrier_epoch)
703 .await?;
704 }
705
706 #[for_await]
707 for msg in input {
708 self.right_table.cache.evict();
709 self.metrics
710 .temporal_join_cached_entry_count
711 .set(self.right_table.cache.len() as i64);
712 match msg? {
713 InternalMessage::WaterMark(watermark) => {
714 let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap();
715 yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
716 }
717 InternalMessage::Chunk(chunk) => {
718 let full_schema = full_schema.clone();
719
720 if T == JoinType::Inner {
721 let st1 = phase1::handle_chunk::<K, S, SD, phase1::Inner, APPEND_ONLY>(
722 self.chunk_size,
723 right_size,
724 full_schema,
725 &self.left_join_keys,
726 &mut self.right_table,
727 &memo_table_lookup_prefix,
728 &mut self.memo_table,
729 &null_matched,
730 chunk,
731 &self.metrics,
732 );
733 #[for_await]
734 for chunk in st1 {
735 let chunk = chunk?;
736 let new_chunk = if let Some(ref cond) = self.condition {
737 let (data_chunk, ops) = chunk.into_parts();
738 let passed_bitmap = cond.eval_infallible(&data_chunk).await;
739 let passed_bitmap =
740 Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
741 let (columns, vis) = data_chunk.into_parts();
742 let new_vis = vis & passed_bitmap;
743 StreamChunk::with_visibility(ops, columns, new_vis)
744 } else {
745 chunk
746 };
747 let new_chunk = apply_indices_map(new_chunk, &self.output_indices);
748 yield Message::Chunk(new_chunk);
749 }
750 } else if let Some(ref cond) = self.condition {
751 let st1 = phase1::handle_chunk::<
753 K,
754 S,
755 SD,
756 phase1::LeftOuterWithCond,
757 APPEND_ONLY,
758 >(
759 self.chunk_size,
760 right_size,
761 full_schema,
762 &self.left_join_keys,
763 &mut self.right_table,
764 &memo_table_lookup_prefix,
765 &mut self.memo_table,
766 &null_matched,
767 chunk,
768 &self.metrics,
769 );
770 let mut matched_count = 0usize;
771 #[for_await]
772 for chunk in st1 {
773 let chunk = chunk?;
774 let (data_chunk, ops) = chunk.into_parts();
775 let passed_bitmap = cond.eval_infallible(&data_chunk).await;
776 let passed_bitmap =
777 Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
778 let (columns, vis) = data_chunk.into_parts();
779 let mut new_vis = BitmapBuilder::with_capacity(vis.len());
780 for (passed, not_match_end) in
781 passed_bitmap.iter().zip_eq_debug(vis.iter())
782 {
783 let is_match_end = !not_match_end;
784 let vis = if is_match_end && matched_count == 0 {
785 true
787 } else if is_match_end {
788 matched_count = 0;
790 false
792 } else {
793 if passed {
794 matched_count += 1;
795 }
796 passed
797 };
798 new_vis.append(vis);
799 }
800 let new_chunk = apply_indices_map(
801 StreamChunk::with_visibility(ops, columns, new_vis.finish()),
802 &self.output_indices,
803 );
804 yield Message::Chunk(new_chunk);
805 }
806 assert_eq!(matched_count, 0);
808 } else {
809 let st1 = phase1::handle_chunk::<K, S, SD, phase1::LeftOuter, APPEND_ONLY>(
810 self.chunk_size,
811 right_size,
812 full_schema,
813 &self.left_join_keys,
814 &mut self.right_table,
815 &memo_table_lookup_prefix,
816 &mut self.memo_table,
817 &null_matched,
818 chunk,
819 &self.metrics,
820 );
821 #[for_await]
822 for chunk in st1 {
823 let chunk = chunk?;
824 let new_chunk = apply_indices_map(chunk, &self.output_indices);
825 yield Message::Chunk(new_chunk);
826 }
827 }
828 }
829 InternalMessage::Barrier(updates, barrier) => {
830 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
831
832 self.right_table.update(
835 updates,
836 &self.right_join_keys,
837 &right_stream_key_indices,
838 )?;
839 let right_post_commit = self.right_table.source.commit(barrier.epoch).await?;
840 let memo_post_commit = if !APPEND_ONLY {
841 Some(
842 self.memo_table
843 .as_mut()
844 .unwrap()
845 .commit(barrier.epoch)
846 .await?,
847 )
848 } else {
849 None
850 };
851
852 yield Message::Barrier(barrier);
853
854 if let Some((_, true)) = right_post_commit
855 .post_yield_barrier(update_vnode_bitmap.clone())
856 .await?
857 {
858 self.right_table.cache.clear();
859 }
860 if let Some(memo_post_commit) = memo_post_commit {
861 memo_post_commit
862 .post_yield_barrier(update_vnode_bitmap.clone())
863 .await?;
864 }
865 }
866 }
867 }
868 }
869}
870
/// Exposes the executor's main loop as a boxed message stream.
impl<
    K: HashKey,
    S: StateStore,
    SD: ValueRowSerde,
    const T: JoinTypePrimitive,
    const APPEND_ONLY: bool,
> Execute for TemporalJoinExecutor<K, S, SD, T, APPEND_ONLY>
{
    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
        self.into_stream().boxed()
    }
}
883
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;

    use risingwave_common::array::*;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
    use risingwave_common::hash::Key32;
    use risingwave_common::types::{DataType, ScalarRefImpl};
    use risingwave_common::util::epoch::{EpochPair, test_epoch};
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_common::util::value_encoding::BasicSerde;
    use risingwave_hummock_test::test_utils::prepare_hummock_test_env;
    use risingwave_storage::hummock::HummockStorage;

    use super::*;
    use crate::common::table::state_table::{
        StateTable, StateTableBuilder, StateTableOpConsistencyLevel,
    };
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::monitor::StreamingMetrics;
    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};
    use crate::executor::{ActorContext, ExecutorInfo, JoinType};

    /// An append-only inner temporal join, probed by join-key prefix, sees both kinds of
    /// right-side rows:
    /// - rows committed to storage before the executor started (epoch 1), and
    /// - a row that only reached the right table through the executor's right input
    ///   (epoch 3).
    #[tokio::test]
    async fn test_temporal_join_pk_prefix_staging_merge() {
        let test_env = prepare_hummock_test_env().await;
        let table_id = TableId::new(1);

        // Right table: three Int32 columns with primary key (col 0, col 1).
        let right_col_descs = vec![
            ColumnDesc::unnamed(ColumnId::new(1), DataType::Int32),
            ColumnDesc::unnamed(ColumnId::new(2), DataType::Int32),
            ColumnDesc::unnamed(ColumnId::new(3), DataType::Int32),
        ];
        let order_types = vec![OrderType::ascending(), OrderType::ascending()];
        let pk_indices = vec![0usize, 1];
        let pbtable = gen_pbtable(table_id, right_col_descs, order_types, pk_indices, 2);

        test_env.register_table(pbtable.clone()).await;

        {
            // Seed the right table with three rows in epoch 1 and commit that epoch.
            let mut setup_table = StateTable::<HummockStorage>::from_table_catalog_inconsistent_op(
                &pbtable,
                test_env.storage.clone(),
                None,
            )
            .await;
            test_env
                .storage
                .start_epoch(test_epoch(1), HashSet::from_iter([table_id]));
            setup_table
                .init_epoch(EpochPair::new_test_epoch(test_epoch(1)))
                .await
                .unwrap();
            setup_table.insert(OwnedRow::new(vec![
                Some(1i32.into()),
                Some(1i32.into()),
                Some(100i32.into()),
            ]));
            setup_table.insert(OwnedRow::new(vec![
                Some(1i32.into()),
                Some(2i32.into()),
                Some(200i32.into()),
            ]));
            setup_table.insert(OwnedRow::new(vec![
                Some(2i32.into()),
                Some(1i32.into()),
                Some(300i32.into()),
            ]));
            test_env
                .storage
                .start_epoch(test_epoch(2), HashSet::from_iter([table_id]));
            setup_table
                .commit_for_test(EpochPair::new_test_epoch(test_epoch(2)))
                .await
                .unwrap();
            test_env.commit_epoch(test_epoch(1)).await;
        }

        // The executor's own view of the right table. NOTE(review): the `true` const generic
        // presumably selects a replicated table, matching `ReplicatedStateTable` in `new`.
        let output_column_ids = vec![ColumnId::new(1), ColumnId::new(2), ColumnId::new(3)];
        let right_table = StateTableBuilder::<_, BasicSerde, true, _>::new(
            &pbtable,
            test_env.storage.clone(),
            None,
        )
        .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
        .with_output_column_ids(output_column_ids)
        .forbid_preload_all_rows()
        .build()
        .await;

        // Left input: two Int32 columns, stream key = column 0.
        let left_schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let (mut left_tx, left_source) = MockSource::channel();
        let left_executor = left_source.into_executor(left_schema.clone(), vec![0]);

        // Right input: same shape as the right table, stream key = (column 0, column 1).
        let right_schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let (mut right_tx, right_source) = MockSource::channel();
        let right_executor = right_source.into_executor(right_schema.clone(), vec![0, 1]);

        let table_stream_key_indices = vec![0usize, 1];

        // Join on left column 0 = right column 0, NULLs never match.
        let left_join_keys = vec![0usize];
        let right_join_keys = vec![0usize];
        let null_safe = vec![false];
        let join_key_data_types = vec![DataType::Int32];

        // Output all five columns of the concatenated (left ++ right) row.
        let output_indices = vec![0usize, 1, 2, 3, 4];
        let output_schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let info = ExecutorInfo::for_test(output_schema, vec![], "TemporalJoinTest".to_owned(), 0);

        // Inner join, append-only.
        let executor = TemporalJoinExecutor::<
            Key32,
            HummockStorage,
            BasicSerde,
            { JoinType::Inner },
            true,
        >::new(
            ActorContext::for_test(0),
            info.clone(),
            left_executor,
            right_executor,
            right_table,
            left_join_keys,
            right_join_keys,
            null_safe,
            None, // condition
            output_indices,
            table_stream_key_indices,
            Arc::new(AtomicU64::new(0)),
            Arc::new(StreamingMetrics::unused()),
            1024,
            join_key_data_types,
            None, // memo_table: unused because the join is append-only
        );

        let mut stream = Box::new(executor).execute();

        // First barrier (epoch 2) on both sides.
        left_tx.push_barrier_with_prev_epoch_for_test(test_epoch(2), test_epoch(1), false);
        right_tx.push_barrier_with_prev_epoch_for_test(test_epoch(2), test_epoch(1), false);
        stream.expect_barrier().await;

        // A right-side row that exists only through the right input, applied at barrier 3.
        right_tx.push_chunk(StreamChunk::from_pretty(
            " i i i
            + 3 1 400",
        ));
        test_env
            .storage
            .start_epoch(test_epoch(3), HashSet::from_iter([table_id]));
        left_tx.push_barrier_with_prev_epoch_for_test(test_epoch(3), test_epoch(2), false);
        right_tx.push_barrier_with_prev_epoch_for_test(test_epoch(3), test_epoch(2), false);
        stream.expect_barrier().await;

        test_env
            .storage
            .start_epoch(test_epoch(4), HashSet::from_iter([table_id]));

        // Probe with key 1 (seeded rows) and key 3 (row written through the right input).
        left_tx.push_chunk(StreamChunk::from_pretty(
            " i i
            + 1 111
            + 3 333",
        ));
        left_tx.push_barrier_with_prev_epoch_for_test(test_epoch(4), test_epoch(3), true);
        right_tx.push_barrier_with_prev_epoch_for_test(test_epoch(4), test_epoch(3), true);

        // Collect every output row up to the next barrier.
        let mut output_rows: Vec<[i32; 5]> = vec![];
        loop {
            match stream.next().await.unwrap().unwrap() {
                Message::Chunk(chunk) => {
                    for (op, row) in chunk.rows() {
                        assert_eq!(op, Op::Insert);
                        let row: [i32; 5] =
                            std::array::from_fn(|i| match row.datum_at(i).unwrap() {
                                ScalarRefImpl::Int32(v) => v,
                                _ => panic!("expected Int32"),
                            });
                        output_rows.push(row);
                    }
                }
                Message::Barrier(_) => break,
                _ => {}
            }
        }

        // Match order inside a cache entry is not specified, so compare sorted rows.
        output_rows.sort();
        assert_eq!(
            output_rows,
            vec![
                [1, 111, 1, 1, 100], // seeded row (1, 1, 100)
                [1, 111, 1, 2, 200], // seeded row (1, 2, 200)
                [3, 333, 3, 1, 400], // row from the right input
            ]
        );
    }
}