1use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::ops::Bound;
18
19use anyhow::Context;
20use either::Either;
21use futures::TryStreamExt;
22use futures::stream::{self, PollNext};
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::BitmapBuilder;
26use risingwave_common::hash::{HashKey, NullBitmap};
27use risingwave_common::row::RowExt;
28use risingwave_common::util::iter_util::ZipEqDebug;
29use risingwave_common_estimate_size::{EstimateSize, KvSize};
30use risingwave_expr::expr::NonStrictExpression;
31use risingwave_storage::row_serde::value_serde::ValueRowSerde;
32use risingwave_storage::store::PrefetchOptions;
33
34use super::join::{JoinType, JoinTypePrimitive};
35use super::monitor::TemporalJoinMetrics;
36use crate::cache::ManagedLruCache;
37use crate::common::metrics::MetricsInfo;
38use crate::common::table::state_table::ReplicatedStateTable;
39use crate::executor::join::builder::JoinStreamChunkBuilder;
40use crate::executor::prelude::*;
41
42pub struct TemporalJoinExecutor<
43 K: HashKey,
44 S: StateStore,
45 SD: ValueRowSerde,
46 const T: JoinTypePrimitive,
47 const APPEND_ONLY: bool,
48> {
49 ctx: ActorContextRef,
50 #[expect(dead_code)]
51 info: ExecutorInfo,
52 left: Executor,
53 right: Executor,
54 right_table: TemporalSide<K, S, SD>,
55 left_join_keys: Vec<usize>,
56 right_join_keys: Vec<usize>,
57 null_safe: Vec<bool>,
58 condition: Option<NonStrictExpression>,
59 output_indices: Vec<usize>,
60 chunk_size: usize,
61 memo_table: Option<StateTable<S>>,
62 metrics: TemporalJoinMetrics,
63}
64
65#[derive(Default)]
66pub struct JoinEntry {
67 cached: HashMap<OwnedRow, OwnedRow>,
69 kv_heap_size: KvSize,
70}
71
72impl EstimateSize for JoinEntry {
73 fn estimated_heap_size(&self) -> usize {
74 self.kv_heap_size.size()
77 }
78}
79
80impl JoinEntry {
81 pub fn insert(&mut self, key: OwnedRow, value: OwnedRow) {
83 if let Entry::Vacant(e) = self.cached.entry(key) {
86 self.kv_heap_size.add(e.key(), &value);
87 e.insert(value);
88 } else {
89 panic!("value {:?} double insert", value);
90 }
91 }
92
93 pub fn remove(&mut self, key: &OwnedRow) {
95 if let Some(value) = self.cached.remove(key) {
96 self.kv_heap_size.sub(key, &value);
97 } else {
98 panic!("key {:?} should be in the cache", key);
99 }
100 }
101
102 pub fn is_empty(&self) -> bool {
103 self.cached.is_empty()
104 }
105}
106
107struct TemporalSide<K: HashKey, S: StateStore, SD: ValueRowSerde> {
108 source: ReplicatedStateTable<S, SD>,
109 table_stream_key_indices: Vec<usize>,
110 table_output_indices: Vec<usize>,
111 cache: ManagedLruCache<K, JoinEntry>,
112 join_key_data_types: Vec<DataType>,
113}
114
115impl<K: HashKey, S: StateStore, SD: ValueRowSerde> TemporalSide<K, S, SD> {
116 async fn fetch_or_promote_keys(
119 &mut self,
120 keys: impl Iterator<Item = &K>,
121 metrics: &TemporalJoinMetrics,
122 ) -> StreamExecutorResult<()> {
123 let mut futs = Vec::with_capacity(keys.size_hint().1.unwrap_or(0));
124 for key in keys {
125 metrics.temporal_join_total_query_cache_count.inc();
126
127 if self.cache.get(key).is_none() {
128 metrics.temporal_join_cache_miss_count.inc();
129
130 futs.push(async {
131 let pk_prefix = key.deserialize(&self.join_key_data_types)?;
132
133 let iter = self
134 .source
135 .iter_with_prefix(
136 &pk_prefix,
137 &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
138 PrefetchOptions::default(),
139 )
140 .await?;
141
142 let mut entry = JoinEntry::default();
143
144 pin_mut!(iter);
145 while let Some(row) = iter.next().await {
146 let row: OwnedRow = row?;
147 entry.insert(
148 row.as_ref()
149 .project(&self.table_stream_key_indices)
150 .into_owned_row(),
151 row.project(&self.table_output_indices).into_owned_row(),
152 );
153 }
154 let key = key.clone();
155 Ok((key, entry)) as StreamExecutorResult<_>
156 });
157 }
158 }
159
160 #[for_await]
161 for res in stream::iter(futs).buffered(16) {
162 let (key, entry) = res?;
163 self.cache.put(key, entry);
164 }
165
166 Ok(())
167 }
168
169 fn force_peek(&self, key: &K) -> &JoinEntry {
170 self.cache.peek(key).expect("key should exists")
171 }
172
173 fn update(
174 &mut self,
175 chunks: Vec<StreamChunk>,
176 join_keys: &[usize],
177 right_stream_key_indices: &[usize],
178 ) -> StreamExecutorResult<()> {
179 for chunk in chunks {
180 let keys = K::build_many(join_keys, chunk.data_chunk());
181 for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
182 let Some((op, row)) = r else {
183 continue;
184 };
185 if self.cache.contains(&key) {
186 let mut entry = self.cache.get_mut(&key).unwrap();
188 let stream_key = row.project(right_stream_key_indices).into_owned_row();
189 match op {
190 Op::Insert | Op::UpdateInsert => {
191 entry.insert(stream_key, row.into_owned_row())
192 }
193 Op::Delete | Op::UpdateDelete => entry.remove(&stream_key),
194 };
195 }
196 }
197 self.source.write_chunk(chunk);
198 }
199 Ok(())
200 }
201}
202
203pub(super) enum InternalMessage {
204 Chunk(StreamChunk),
205 Barrier(Vec<StreamChunk>, Barrier),
206 WaterMark(Watermark),
207}
208
209#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
210async fn chunks_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
211 #[for_await]
212 for item in stream {
213 match item? {
214 Message::Watermark(_) => {
215 }
217 Message::Chunk(c) => yield c,
218 Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
219 return Err(StreamExecutorError::align_barrier(expected_barrier, b));
220 }
221 Message::Barrier(_) => return Ok(()),
222 }
223 }
224}
225
226#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
227async fn internal_messages_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
228 #[for_await]
229 for item in stream {
230 match item? {
231 Message::Watermark(w) => {
232 yield InternalMessage::WaterMark(w);
233 }
234 Message::Chunk(c) => yield InternalMessage::Chunk(c),
235 Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
236 return Err(StreamExecutorError::align_barrier(expected_barrier, b));
237 }
238 Message::Barrier(_) => return Ok(()),
239 }
240 }
241}
242
243pub(super) async fn expect_first_barrier(
244 stream: &mut (impl Stream<Item = StreamExecutorResult<InternalMessage>> + Unpin),
245) -> StreamExecutorResult<Barrier> {
246 let InternalMessage::Barrier(updates, barrier) = stream
247 .try_next()
248 .instrument_await("expect_first_barrier")
249 .await?
250 .context("failed to extract the first message: stream closed unexpectedly")?
251 else {
252 unreachable!("unexpected internal message");
253 };
254 assert!(updates.is_empty());
255 Ok(barrier)
256}
257
/// Merges the left (probe) and right (temporal table) inputs into one stream of
/// [`InternalMessage`]s that is aligned on barriers.
///
/// - Left chunks and left watermarks are forwarded as soon as they arrive. The left input is
///   polled first whenever both sides are ready (`PollNext::Left`).
/// - Right chunks are buffered when `YIELD_RIGHT_CHUNKS` is `true` and discarded otherwise.
///   The buffer is handed over in [`InternalMessage::Barrier`] once both sides reached the
///   barrier.
/// - Right-side watermarks are dropped.
///
/// A barrier-epoch mismatch between the sides surfaces as an error from the draining helpers.
/// Upstream errors are propagated. The stream ends when both inputs are exhausted.
#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
pub(super) async fn align_input<const YIELD_RIGHT_CHUNKS: bool>(left: Executor, right: Executor) {
    let mut left = pin!(left.execute());
    let mut right = pin!(right.execute());
    // Keep producing barrier intervals until the inputs are exhausted or an error occurs.
    loop {
        // Right chunks received in the current interval.
        let mut right_chunks = vec![];
        // Produce one interval, i.e. everything up to and including the next aligned barrier.
        'inner: loop {
            // Re-created per message so that its borrows of `left` / `right` end before the
            // arms below drain one side directly through `by_ref()`.
            let mut combined = stream::select_with_strategy(
                left.by_ref().map(Either::Left),
                right.by_ref().map(Either::Right),
                |_: &mut ()| PollNext::Left,
            );
            match combined.next().await {
                Some(Either::Left(Ok(Message::Chunk(c)))) => yield InternalMessage::Chunk(c),
                Some(Either::Right(Ok(Message::Chunk(c)))) => {
                    if YIELD_RIGHT_CHUNKS {
                        right_chunks.push(c);
                    }
                }
                // Left reached the barrier first: collect the rest of the right side's chunks
                // up to the same barrier.
                Some(Either::Left(Ok(Message::Barrier(b)))) => {
                    let mut remain = chunks_until_barrier(right.by_ref(), b.clone())
                        .try_collect()
                        .await?;
                    if YIELD_RIGHT_CHUNKS {
                        right_chunks.append(&mut remain);
                    }
                    yield InternalMessage::Barrier(right_chunks, b);
                    break 'inner;
                }
                // Right reached the barrier first: forward the left side's messages up to the
                // same barrier before emitting it.
                Some(Either::Right(Ok(Message::Barrier(b)))) => {
                    #[for_await]
                    for internal_message in
                        internal_messages_until_barrier(left.by_ref(), b.clone())
                    {
                        yield internal_message?;
                    }
                    yield InternalMessage::Barrier(right_chunks, b);
                    break 'inner;
                }
                Some(Either::Left(Err(e)) | Either::Right(Err(e))) => return Err(e),
                Some(Either::Left(Ok(Message::Watermark(w)))) => {
                    yield InternalMessage::WaterMark(w);
                }
                Some(Either::Right(Ok(Message::Watermark(_)))) => {
                    // Right-side watermarks are ignored.
                }
                None => return Ok(()),
            }
        }
    }
}
315
316pub(super) fn apply_indices_map(chunk: StreamChunk, indices: &[usize]) -> StreamChunk {
317 let (data_chunk, ops) = chunk.into_parts();
318 let (columns, vis) = data_chunk.into_parts();
319 let output_columns = indices
320 .iter()
321 .cloned()
322 .map(|idx| columns[idx].clone())
323 .collect();
324 StreamChunk::with_visibility(ops, output_columns, vis)
325}
326
327pub(super) mod phase1 {
328 use std::ops::Bound;
329
330 use futures::{StreamExt, pin_mut};
331 use futures_async_stream::try_stream;
332 use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
333 use risingwave_common::array::{Op, StreamChunk};
334 use risingwave_common::hash::{HashKey, NullBitmap};
335 use risingwave_common::row::{self, OwnedRow, Row, RowExt};
336 use risingwave_common::types::{DataType, DatumRef};
337 use risingwave_common::util::iter_util::ZipEqDebug;
338 use risingwave_storage::StateStore;
339 use risingwave_storage::row_serde::value_serde::ValueRowSerde;
340
341 use super::{StreamExecutorError, TemporalSide};
342 use crate::common::table::state_table::StateTable;
343 use crate::executor::monitor::TemporalJoinMetrics;
344
345 pub trait Phase1Evaluation {
346 #[must_use = "consume chunk if produced"]
348 fn append_matched_row(
349 op: Op,
350 builder: &mut StreamChunkBuilder,
351 left_row: impl Row,
352 right_row: impl Row,
353 ) -> Option<StreamChunk>;
354
355 #[must_use = "consume chunk if produced"]
357 fn match_end(
358 builder: &mut StreamChunkBuilder,
359 op: Op,
360 left_row: impl Row,
361 right_size: usize,
362 matched: bool,
363 ) -> Option<StreamChunk>;
364 }
365
366 pub struct Inner;
367 pub struct LeftOuter;
368 pub struct LeftOuterWithCond;
369
370 impl Phase1Evaluation for Inner {
371 fn append_matched_row(
372 op: Op,
373 builder: &mut StreamChunkBuilder,
374 left_row: impl Row,
375 right_row: impl Row,
376 ) -> Option<StreamChunk> {
377 builder.append_row(op, left_row.chain(right_row))
378 }
379
380 fn match_end(
381 _builder: &mut StreamChunkBuilder,
382 _op: Op,
383 _left_row: impl Row,
384 _right_size: usize,
385 _matched: bool,
386 ) -> Option<StreamChunk> {
387 None
388 }
389 }
390
391 impl Phase1Evaluation for LeftOuter {
392 fn append_matched_row(
393 op: Op,
394 builder: &mut StreamChunkBuilder,
395 left_row: impl Row,
396 right_row: impl Row,
397 ) -> Option<StreamChunk> {
398 builder.append_row(op, left_row.chain(right_row))
399 }
400
401 fn match_end(
402 builder: &mut StreamChunkBuilder,
403 op: Op,
404 left_row: impl Row,
405 right_size: usize,
406 matched: bool,
407 ) -> Option<StreamChunk> {
408 if !matched {
409 builder.append_row(
411 op,
412 left_row.chain(row::repeat_n(DatumRef::None, right_size)),
413 )
414 } else {
415 None
416 }
417 }
418 }
419
420 impl Phase1Evaluation for LeftOuterWithCond {
421 fn append_matched_row(
422 op: Op,
423 builder: &mut StreamChunkBuilder,
424 left_row: impl Row,
425 right_row: impl Row,
426 ) -> Option<StreamChunk> {
427 builder.append_row(op, left_row.chain(right_row))
428 }
429
430 fn match_end(
431 builder: &mut StreamChunkBuilder,
432 op: Op,
433 left_row: impl Row,
434 right_size: usize,
435 _matched: bool,
436 ) -> Option<StreamChunk> {
437 builder.append_row_invisible(
440 op,
441 left_row.chain(row::repeat_n(DatumRef::None, right_size)),
442 )
443 }
444 }
445
446 #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
447 #[expect(clippy::too_many_arguments)]
448 pub(super) async fn handle_chunk<
449 'a,
450 K: HashKey,
451 S: StateStore,
452 SD: ValueRowSerde,
453 E: Phase1Evaluation,
454 const APPEND_ONLY: bool,
455 >(
456 chunk_size: usize,
457 right_size: usize,
458 full_schema: Vec<DataType>,
459 left_join_keys: &'a [usize],
460 right_table: &'a mut TemporalSide<K, S, SD>,
461 memo_table_lookup_prefix: &'a [usize],
462 memo_table: &'a mut Option<StateTable<S>>,
463 null_matched: &'a K::Bitmap,
464 chunk: StreamChunk,
465 metrics: &'a TemporalJoinMetrics,
466 ) {
467 let mut builder = StreamChunkBuilder::new(chunk_size, full_schema);
468 let keys = K::build_many(left_join_keys, chunk.data_chunk());
469 let to_fetch_keys = chunk
470 .visibility()
471 .iter()
472 .zip_eq_debug(keys.iter())
473 .zip_eq_debug(chunk.ops())
474 .filter_map(|((vis, key), op)| {
475 if vis {
476 if APPEND_ONLY {
477 assert_eq!(*op, Op::Insert);
478 Some(key)
479 } else {
480 match op {
481 Op::Insert | Op::UpdateInsert => Some(key),
482 Op::Delete | Op::UpdateDelete => None,
483 }
484 }
485 } else {
486 None
487 }
488 });
489 right_table
490 .fetch_or_promote_keys(to_fetch_keys, metrics)
491 .await?;
492
493 for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
494 let Some((op, left_row)) = r else {
495 continue;
496 };
497
498 let mut matched = false;
499
500 if APPEND_ONLY {
501 if key.null_bitmap().is_subset(null_matched)
503 && let join_entry = right_table.force_peek(&key)
504 && !join_entry.is_empty()
505 {
506 matched = true;
507 for right_row in join_entry.cached.values() {
508 if let Some(chunk) =
509 E::append_matched_row(op, &mut builder, left_row, right_row)
510 {
511 yield chunk;
512 }
513 }
514 }
515 } else {
516 let memo_table = memo_table.as_mut().unwrap();
530 match op {
531 Op::Insert | Op::UpdateInsert => {
532 if key.null_bitmap().is_subset(null_matched)
533 && let join_entry = right_table.force_peek(&key)
534 && !join_entry.is_empty()
535 {
536 matched = true;
537 for right_row in join_entry.cached.values() {
538 let right_row: OwnedRow = right_row.clone();
539 memo_table.insert(right_row.clone().chain(
541 left_row.project(memo_table_lookup_prefix).into_owned_row(),
542 ));
543 if let Some(chunk) = E::append_matched_row(
544 Op::Insert,
545 &mut builder,
546 left_row,
547 right_row,
548 ) {
549 yield chunk;
550 }
551 }
552 }
553 }
554 Op::Delete | Op::UpdateDelete => {
555 let mut memo_rows_to_delete = vec![];
556 if key.null_bitmap().is_subset(null_matched) {
557 let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
558 &(Bound::Unbounded, Bound::Unbounded);
559 let prefix = left_row.project(memo_table_lookup_prefix);
560 let state_table_iter = memo_table
561 .iter_with_prefix(prefix, sub_range, Default::default())
562 .await?;
563 pin_mut!(state_table_iter);
564
565 while let Some(memo_row) = state_table_iter.next().await {
566 matched = true;
567 let memo_row = memo_row?.into_owned_row();
568 memo_rows_to_delete.push(memo_row.clone());
569 if let Some(chunk) = E::append_matched_row(
570 Op::Delete,
571 &mut builder,
572 left_row,
573 memo_row.slice(0..right_size),
574 ) {
575 yield chunk;
576 }
577 }
578 }
579 for memo_row in memo_rows_to_delete {
580 memo_table.delete(memo_row);
582 }
583 }
584 }
585 }
586 if let Some(chunk) = E::match_end(
587 &mut builder,
588 match op {
589 Op::Insert | Op::UpdateInsert => Op::Insert,
590 Op::Delete | Op::UpdateDelete => Op::Delete,
591 },
592 left_row,
593 right_size,
594 matched,
595 ) {
596 yield chunk;
597 }
598 }
599
600 if let Some(chunk) = builder.take() {
601 yield chunk;
602 }
603 }
604}
605
606impl<
607 K: HashKey,
608 S: StateStore,
609 SD: ValueRowSerde,
610 const T: JoinTypePrimitive,
611 const APPEND_ONLY: bool,
612> TemporalJoinExecutor<K, S, SD, T, APPEND_ONLY>
613{
614 #[expect(clippy::too_many_arguments)]
615 pub fn new(
616 ctx: ActorContextRef,
617 info: ExecutorInfo,
618 left: Executor,
619 right: Executor,
620 table: ReplicatedStateTable<S, SD>,
621 left_join_keys: Vec<usize>,
622 right_join_keys: Vec<usize>,
623 null_safe: Vec<bool>,
624 condition: Option<NonStrictExpression>,
625 output_indices: Vec<usize>,
626 table_output_indices: Vec<usize>,
627 table_stream_key_indices: Vec<usize>,
628 watermark_sequence: AtomicU64Ref,
629 metrics: Arc<StreamingMetrics>,
630 chunk_size: usize,
631 join_key_data_types: Vec<DataType>,
632 memo_table: Option<StateTable<S>>,
633 ) -> Self {
634 let metrics_info =
635 MetricsInfo::new(metrics.clone(), table.table_id(), ctx.id, "temporal join");
636
637 let cache = ManagedLruCache::unbounded(watermark_sequence, metrics_info);
638
639 let metrics = metrics.new_temporal_join_metrics(table.table_id(), ctx.id, ctx.fragment_id);
640
641 Self {
642 ctx,
643 info,
644 left,
645 right,
646 right_table: TemporalSide {
647 source: table,
648 table_stream_key_indices,
649 table_output_indices,
650 cache,
651 join_key_data_types,
652 },
653 left_join_keys,
654 right_join_keys,
655 null_safe,
656 condition,
657 output_indices,
658 chunk_size,
659 memo_table,
660 metrics,
661 }
662 }
663
664 #[try_stream(ok = Message, error = StreamExecutorError)]
665 async fn into_stream(mut self) {
666 let right_size = self.right.schema().len();
667
668 let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
669 &self.output_indices,
670 self.left.schema().len(),
671 right_size,
672 );
673
674 let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());
675
676 let left_stream_key_indices = self.left.stream_key().to_vec();
677 let right_stream_key_indices = self.right.stream_key().to_vec();
678 let memo_table_lookup_prefix = self
679 .left_join_keys
680 .iter()
681 .cloned()
682 .chain(left_stream_key_indices)
683 .collect_vec();
684
685 let null_matched = K::Bitmap::from_bool_vec(self.null_safe);
686
687 let full_schema: Vec<_> = self
688 .left
689 .schema()
690 .data_types()
691 .into_iter()
692 .chain(self.right.schema().data_types().into_iter())
693 .collect();
694
695 let input = align_input::<true>(self.left, self.right);
696 pin_mut!(input);
697 let barrier = expect_first_barrier(&mut input).await?;
698 let barrier_epoch = barrier.epoch;
699 yield Message::Barrier(barrier);
700 self.right_table.source.init_epoch(barrier_epoch).await?;
701 if !APPEND_ONLY {
702 self.memo_table
703 .as_mut()
704 .unwrap()
705 .init_epoch(barrier_epoch)
706 .await?;
707 }
708
709 #[for_await]
710 for msg in input {
711 self.right_table.cache.evict();
712 self.metrics
713 .temporal_join_cached_entry_count
714 .set(self.right_table.cache.len() as i64);
715 match msg? {
716 InternalMessage::WaterMark(watermark) => {
717 let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap();
718 yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
719 }
720 InternalMessage::Chunk(chunk) => {
721 let full_schema = full_schema.clone();
722
723 if T == JoinType::Inner {
724 let st1 = phase1::handle_chunk::<K, S, SD, phase1::Inner, APPEND_ONLY>(
725 self.chunk_size,
726 right_size,
727 full_schema,
728 &self.left_join_keys,
729 &mut self.right_table,
730 &memo_table_lookup_prefix,
731 &mut self.memo_table,
732 &null_matched,
733 chunk,
734 &self.metrics,
735 );
736 #[for_await]
737 for chunk in st1 {
738 let chunk = chunk?;
739 let new_chunk = if let Some(ref cond) = self.condition {
740 let (data_chunk, ops) = chunk.into_parts();
741 let passed_bitmap = cond.eval_infallible(&data_chunk).await;
742 let passed_bitmap =
743 Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
744 let (columns, vis) = data_chunk.into_parts();
745 let new_vis = vis & passed_bitmap;
746 StreamChunk::with_visibility(ops, columns, new_vis)
747 } else {
748 chunk
749 };
750 let new_chunk = apply_indices_map(new_chunk, &self.output_indices);
751 yield Message::Chunk(new_chunk);
752 }
753 } else if let Some(ref cond) = self.condition {
754 let st1 = phase1::handle_chunk::<
756 K,
757 S,
758 SD,
759 phase1::LeftOuterWithCond,
760 APPEND_ONLY,
761 >(
762 self.chunk_size,
763 right_size,
764 full_schema,
765 &self.left_join_keys,
766 &mut self.right_table,
767 &memo_table_lookup_prefix,
768 &mut self.memo_table,
769 &null_matched,
770 chunk,
771 &self.metrics,
772 );
773 let mut matched_count = 0usize;
774 #[for_await]
775 for chunk in st1 {
776 let chunk = chunk?;
777 let (data_chunk, ops) = chunk.into_parts();
778 let passed_bitmap = cond.eval_infallible(&data_chunk).await;
779 let passed_bitmap =
780 Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
781 let (columns, vis) = data_chunk.into_parts();
782 let mut new_vis = BitmapBuilder::with_capacity(vis.len());
783 for (passed, not_match_end) in
784 passed_bitmap.iter().zip_eq_debug(vis.iter())
785 {
786 let is_match_end = !not_match_end;
787 let vis = if is_match_end && matched_count == 0 {
788 true
790 } else if is_match_end {
791 matched_count = 0;
793 false
795 } else {
796 if passed {
797 matched_count += 1;
798 }
799 passed
800 };
801 new_vis.append(vis);
802 }
803 let new_chunk = apply_indices_map(
804 StreamChunk::with_visibility(ops, columns, new_vis.finish()),
805 &self.output_indices,
806 );
807 yield Message::Chunk(new_chunk);
808 }
809 assert_eq!(matched_count, 0);
811 } else {
812 let st1 = phase1::handle_chunk::<K, S, SD, phase1::LeftOuter, APPEND_ONLY>(
813 self.chunk_size,
814 right_size,
815 full_schema,
816 &self.left_join_keys,
817 &mut self.right_table,
818 &memo_table_lookup_prefix,
819 &mut self.memo_table,
820 &null_matched,
821 chunk,
822 &self.metrics,
823 );
824 #[for_await]
825 for chunk in st1 {
826 let chunk = chunk?;
827 let new_chunk = apply_indices_map(chunk, &self.output_indices);
828 yield Message::Chunk(new_chunk);
829 }
830 }
831 }
832 InternalMessage::Barrier(updates, barrier) => {
833 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
834
835 self.right_table.update(
838 updates,
839 &self.right_join_keys,
840 &right_stream_key_indices,
841 )?;
842 let right_post_commit = self.right_table.source.commit(barrier.epoch).await?;
843 let memo_post_commit = if !APPEND_ONLY {
844 Some(
845 self.memo_table
846 .as_mut()
847 .unwrap()
848 .commit(barrier.epoch)
849 .await?,
850 )
851 } else {
852 None
853 };
854
855 yield Message::Barrier(barrier);
856
857 if let Some((_, true)) = right_post_commit
858 .post_yield_barrier(update_vnode_bitmap.clone())
859 .await?
860 {
861 self.right_table.cache.clear();
862 }
863 if let Some(memo_post_commit) = memo_post_commit {
864 memo_post_commit
865 .post_yield_barrier(update_vnode_bitmap.clone())
866 .await?;
867 }
868 }
869 }
870 }
871 }
872}
873
874impl<
875 K: HashKey,
876 S: StateStore,
877 SD: ValueRowSerde,
878 const T: JoinTypePrimitive,
879 const APPEND_ONLY: bool,
880> Execute for TemporalJoinExecutor<K, S, SD, T, APPEND_ONLY>
881{
882 fn execute(self: Box<Self>) -> super::BoxedMessageStream {
883 self.into_stream().boxed()
884 }
885}
886
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;

    use risingwave_common::array::*;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
    use risingwave_common::hash::Key32;
    use risingwave_common::types::{DataType, ScalarRefImpl};
    use risingwave_common::util::epoch::{EpochPair, test_epoch};
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_common::util::value_encoding::BasicSerde;
    use risingwave_hummock_test::test_utils::prepare_hummock_test_env;
    use risingwave_storage::hummock::HummockStorage;

    use super::*;
    use crate::common::table::state_table::{
        StateTable, StateTableBuilder, StateTableOpConsistencyLevel,
    };
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::monitor::StreamingMetrics;
    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};
    use crate::executor::{ActorContext, ExecutorInfo, JoinType};

    /// A join-key prefix scan on a cache miss must return both of these:
    /// - rows committed to storage in an earlier epoch;
    /// - a row the temporal join itself wrote through its right input in a later epoch, which
    ///   the test never commits via `test_env.commit_epoch`.
    ///
    /// Timeline:
    /// - epoch 1: `(1, 1, 100)`, `(1, 2, 200)`, `(2, 1, 300)` are written directly and committed.
    /// - epoch 2 -> 3: the right input inserts `(3, 1, 400)`, applied at the barrier.
    /// - epoch 3 -> 4: left rows with join keys 1 and 3 must match rows from both sources.
    #[tokio::test]
    async fn test_temporal_join_pk_prefix_staging_merge() {
        let test_env = prepare_hummock_test_env().await;
        let table_id = TableId::new(1);

        // Right table: three INT columns with pk = (col 0, col 1).
        let right_col_descs = vec![
            ColumnDesc::unnamed(ColumnId::new(1), DataType::Int32),
            ColumnDesc::unnamed(ColumnId::new(2), DataType::Int32),
            ColumnDesc::unnamed(ColumnId::new(3), DataType::Int32),
        ];
        let order_types = vec![OrderType::ascending(), OrderType::ascending()];
        let pk_indices = vec![0usize, 1];
        let pbtable = gen_pbtable(table_id, right_col_descs, order_types, pk_indices, 2);

        test_env.register_table(pbtable.clone()).await;

        {
            // Seed the table with committed data in epoch 1 using a separate state table.
            let mut setup_table = StateTable::<HummockStorage>::from_table_catalog_inconsistent_op(
                &pbtable,
                test_env.storage.clone(),
                None,
            )
            .await;
            test_env
                .storage
                .start_epoch(test_epoch(1), HashSet::from_iter([table_id]));
            setup_table
                .init_epoch(EpochPair::new_test_epoch(test_epoch(1)))
                .await
                .unwrap();
            setup_table.insert(OwnedRow::new(vec![
                Some(1i32.into()),
                Some(1i32.into()),
                Some(100i32.into()),
            ]));
            setup_table.insert(OwnedRow::new(vec![
                Some(1i32.into()),
                Some(2i32.into()),
                Some(200i32.into()),
            ]));
            setup_table.insert(OwnedRow::new(vec![
                Some(2i32.into()),
                Some(1i32.into()),
                Some(300i32.into()),
            ]));
            test_env
                .storage
                .start_epoch(test_epoch(2), HashSet::from_iter([table_id]));
            setup_table
                .commit_for_test(EpochPair::new_test_epoch(test_epoch(2)))
                .await
                .unwrap();
            test_env.commit_epoch(test_epoch(1)).await;
        }

        let output_column_ids = vec![ColumnId::new(1), ColumnId::new(2), ColumnId::new(3)];
        // The temporal side's table; the `true` const parameter presumably selects the
        // replicated variant expected by `TemporalJoinExecutor` (confirm in
        // `StateTableBuilder`).
        let right_table = StateTableBuilder::<_, BasicSerde, true, _>::new(
            &pbtable,
            test_env.storage.clone(),
            None,
        )
        .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
        .with_output_column_ids(output_column_ids)
        .forbid_preload_all_rows()
        .build()
        .await;

        // Left input: (join key, payload), stream key = col 0.
        let left_schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let (mut left_tx, left_source) = MockSource::channel();
        let left_executor = left_source.into_executor(left_schema.clone(), vec![0]);

        // Right input: same layout as the table, stream key = (col 0, col 1).
        let right_schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let (mut right_tx, right_source) = MockSource::channel();
        let right_executor = right_source.into_executor(right_schema.clone(), vec![0, 1]);

        // Cached right rows keep all table columns, keyed by the table pk.
        let table_output_indices = vec![0usize, 1, 2];
        let table_stream_key_indices = vec![0usize, 1];

        // Join on left col 0 = right col 0.
        let left_join_keys = vec![0usize];
        let right_join_keys = vec![0usize];
        let null_safe = vec![false];
        let join_key_data_types = vec![DataType::Int32];

        // Output every column of `left ++ right`.
        let output_indices = vec![0usize, 1, 2, 3, 4];
        let output_schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let info = ExecutorInfo::for_test(output_schema, vec![], "TemporalJoinTest".to_owned(), 0);

        // Append-only inner join, so no memo table is needed.
        let executor = TemporalJoinExecutor::<
            Key32,
            HummockStorage,
            BasicSerde,
            { JoinType::Inner },
            true,
        >::new(
            ActorContext::for_test(0),
            info.clone(),
            left_executor,
            right_executor,
            right_table,
            left_join_keys,
            right_join_keys,
            null_safe,
            None, // condition
            output_indices,
            table_output_indices,
            table_stream_key_indices,
            Arc::new(AtomicU64::new(0)),
            Arc::new(StreamingMetrics::unused()),
            1024,
            join_key_data_types,
            None, // memo_table
        );

        let mut stream = Box::new(executor).execute();

        // First barrier (epoch 2).
        left_tx.push_barrier_with_prev_epoch_for_test(test_epoch(2), test_epoch(1), false);
        right_tx.push_barrier_with_prev_epoch_for_test(test_epoch(2), test_epoch(1), false);
        stream.expect_barrier().await;

        // The right input inserts a new temporal row, applied at the next barrier.
        right_tx.push_chunk(StreamChunk::from_pretty(
            " i i i
            + 3 1 400",
        ));
        test_env
            .storage
            .start_epoch(test_epoch(3), HashSet::from_iter([table_id]));
        left_tx.push_barrier_with_prev_epoch_for_test(test_epoch(3), test_epoch(2), false);
        right_tx.push_barrier_with_prev_epoch_for_test(test_epoch(3), test_epoch(2), false);
        stream.expect_barrier().await;

        test_env
            .storage
            .start_epoch(test_epoch(4), HashSet::from_iter([table_id]));

        // Probe with join key 1 (committed rows) and join key 3 (row written at epoch 3).
        left_tx.push_chunk(StreamChunk::from_pretty(
            " i i
            + 1 111
            + 3 333",
        ));
        left_tx.push_barrier_with_prev_epoch_for_test(test_epoch(4), test_epoch(3), true);
        right_tx.push_barrier_with_prev_epoch_for_test(test_epoch(4), test_epoch(3), true);

        // Gather all output rows up to the next barrier.
        let mut output_rows: Vec<[i32; 5]> = vec![];
        loop {
            match stream.next().await.unwrap().unwrap() {
                Message::Chunk(chunk) => {
                    for (op, row) in chunk.rows() {
                        assert_eq!(op, Op::Insert);
                        let row: [i32; 5] =
                            std::array::from_fn(|i| match row.datum_at(i).unwrap() {
                                ScalarRefImpl::Int32(v) => v,
                                _ => panic!("expected Int32"),
                            });
                        output_rows.push(row);
                    }
                }
                Message::Barrier(_) => break,
                _ => {}
            }
        }

        // Sorted because the order of matches within a cache entry is not specified.
        output_rows.sort();
        assert_eq!(
            output_rows,
            vec![
                [1, 111, 1, 1, 100], // committed in epoch 1
                [1, 111, 1, 2, 200], // committed in epoch 1
                [3, 333, 3, 1, 400], // written via the right input at epoch 3
            ]
        );
    }
}