use std::collections::HashMap;
use std::collections::hash_map::Entry;

use either::Either;
use futures::TryStreamExt;
use futures::stream::{self, PollNext};
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::hash::{HashKey, NullBitmap};
use risingwave_common::row::RowExt;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common_estimate_size::{EstimateSize, KvSize};
use risingwave_expr::expr::NonStrictExpression;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::TableIter;
use risingwave_storage::table::batch_table::BatchTable;

use super::join::{JoinType, JoinTypePrimitive};
use super::monitor::TemporalJoinMetrics;
use crate::cache::{ManagedLruCache, cache_may_stale};
use crate::common::metrics::MetricsInfo;
use crate::executor::join::builder::JoinStreamChunkBuilder;
use crate::executor::prelude::*;

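/// Temporal join executor: joins a stream (the left side) against a versioned
/// [`BatchTable`] (the right side), probing the table at the epoch of the last
/// committed barrier.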
pub struct TemporalJoinExecutor<
    K: HashKey,
    S: StateStore,
    const T: JoinTypePrimitive,
    const APPEND_ONLY: bool,
> {
    ctx: ActorContextRef,
    #[allow(dead_code)]
    info: ExecutorInfo,
    left: Executor,
    right: Executor,
    right_table: TemporalSide<K, S>,
    left_join_keys: Vec<usize>,
    right_join_keys: Vec<usize>,
    null_safe: Vec<bool>,
    condition: Option<NonStrictExpression>,
    output_indices: Vec<usize>,
    chunk_size: usize,
    memo_table: Option<StateTable<S>>,
    metrics: TemporalJoinMetrics,
}

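/// Cached rows of the temporal side for a single join key.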
#[derive(Default)]
pub struct JoinEntry {
    /// Join rows of this key, indexed by the right table's stream key.
    cached: HashMap<OwnedRow, OwnedRow>,
    kv_heap_size: KvSize,
}

impl EstimateSize for JoinEntry {
    fn estimated_heap_size(&self) -> usize {
        self.kv_heap_size.size()
    }
}

impl JoinEntry {
    pub fn insert(&mut self, key: OwnedRow, value: OwnedRow) {
        // A double insert of the same stream key would desynchronize
        // `kv_heap_size` from `cached`, so treat it as a bug.
        if let Entry::Vacant(e) = self.cached.entry(key) {
            // Account for the heap size before the value is moved into the map.
            self.kv_heap_size.add(e.key(), &value);
            e.insert(value);
        } else {
            panic!("value {:?} double insert", value);
        }
    }

    pub fn remove(&mut self, key: &OwnedRow) {
        if let Some(value) = self.cached.remove(key) {
            self.kv_heap_size.sub(key, &value);
        } else {
            panic!("key {:?} should be in the cache", key);
        }
    }

    pub fn is_empty(&self) -> bool {
        self.cached.is_empty()
    }
}

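/// The temporal (right) side of the join: the batch table plus an LRU cache of
/// previously fetched join-key entries.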
struct TemporalSide<K: HashKey, S: StateStore> {
    source: BatchTable<S>,
    table_stream_key_indices: Vec<usize>,
    table_output_indices: Vec<usize>,
    cache: ManagedLruCache<K, JoinEntry>,
    join_key_data_types: Vec<DataType>,
}

impl<K: HashKey, S: StateStore> TemporalSide<K, S> {
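    /// Fetch the rows for `keys` from the batch table at `epoch` if they are
    /// not cached yet, and promote keys that are already cached in the LRU.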
    async fn fetch_or_promote_keys(
        &mut self,
        keys: impl Iterator<Item = &K>,
        epoch: HummockEpoch,
        metrics: &TemporalJoinMetrics,
    ) -> StreamExecutorResult<()> {
        let mut futs = Vec::with_capacity(keys.size_hint().1.unwrap_or(0));
        for key in keys {
            metrics.temporal_join_total_query_cache_count.inc();

            if self.cache.get(key).is_none() {
                metrics.temporal_join_cache_miss_count.inc();

                futs.push(async {
                    let pk_prefix = key.deserialize(&self.join_key_data_types)?;

                    let iter = self
                        .source
                        .batch_iter_with_pk_bounds(
                            HummockReadEpoch::NoWait(epoch),
                            &pk_prefix,
                            ..,
                            false,
                            PrefetchOptions::default(),
                        )
                        .await?;

                    let mut entry = JoinEntry::default();

                    pin_mut!(iter);
                    while let Some(row) = iter.next_row().await? {
                        entry.insert(
                            row.as_ref()
                                .project(&self.table_stream_key_indices)
                                .into_owned_row(),
                            row.project(&self.table_output_indices).into_owned_row(),
                        );
                    }
                    let key = key.clone();
                    Ok((key, entry)) as StreamExecutorResult<_>
                });
            }
        }

        // Fetch the missing keys with bounded concurrency (16 lookups at a time).
        #[for_await]
        for res in stream::iter(futs).buffered(16) {
            let (key, entry) = res?;
            self.cache.put(key, entry);
        }

        Ok(())
    }

    fn force_peek(&self, key: &K) -> &JoinEntry {
        self.cache.peek(key).expect("key should exist")
    }

    fn update(
        &mut self,
        chunks: Vec<StreamChunk>,
        join_keys: &[usize],
        right_stream_key_indices: &[usize],
    ) -> StreamExecutorResult<()> {
        for chunk in chunks {
            let keys = K::build_many(join_keys, chunk.data_chunk());
            for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
                let Some((op, row)) = r else {
                    continue;
                };
                // Only keys already in the cache are maintained; missing keys
                // are fetched on demand by `fetch_or_promote_keys`.
                if self.cache.contains(&key) {
                    let mut entry = self.cache.get_mut(&key).unwrap();
                    let stream_key = row.project(right_stream_key_indices).into_owned_row();
                    match op {
                        Op::Insert | Op::UpdateInsert => {
                            entry.insert(stream_key, row.into_owned_row())
                        }
                        Op::Delete | Op::UpdateDelete => entry.remove(&stream_key),
                    };
                }
            }
        }
        Ok(())
    }
}

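/// Messages produced by [`align_input`] for the executor's main loop.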
pub(super) enum InternalMessage {
    Chunk(StreamChunk),
    Barrier(Vec<StreamChunk>, Barrier),
    WaterMark(Watermark),
}

#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
async fn chunks_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
    #[for_await]
    for item in stream {
        match item? {
            Message::Watermark(_) => {
                // Watermarks are ignored while draining chunks up to the barrier.
            }
            Message::Chunk(c) => yield c,
            Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
                return Err(StreamExecutorError::align_barrier(expected_barrier, b));
            }
            Message::Barrier(_) => return Ok(()),
        }
    }
}

#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
async fn internal_messages_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
    #[for_await]
    for item in stream {
        match item? {
            Message::Watermark(w) => {
                yield InternalMessage::WaterMark(w);
            }
            Message::Chunk(c) => yield InternalMessage::Chunk(c),
            Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
                return Err(StreamExecutorError::align_barrier(expected_barrier, b));
            }
            Message::Barrier(_) => return Ok(()),
        }
    }
}

#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
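/// Align the left and right input streams by barrier. Left chunks are yielded
/// as they arrive; right chunks are buffered until the barrier and carried on
/// the aligned `InternalMessage::Barrier` (when `YIELD_RIGHT_CHUNKS` is true).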
pub(super) async fn align_input<const YIELD_RIGHT_CHUNKS: bool>(left: Executor, right: Executor) {
    let mut left = pin!(left.execute());
    let mut right = pin!(right.execute());
    loop {
        let mut right_chunks = vec![];
        'inner: loop {
            // Prefer polling the left input so left chunks are not starved
            // while the right side is being buffered.
            let mut combined = stream::select_with_strategy(
                left.by_ref().map(Either::Left),
                right.by_ref().map(Either::Right),
                |_: &mut ()| PollNext::Left,
            );
            match combined.next().await {
                Some(Either::Left(Ok(Message::Chunk(c)))) => yield InternalMessage::Chunk(c),
                Some(Either::Right(Ok(Message::Chunk(c)))) => {
                    if YIELD_RIGHT_CHUNKS {
                        right_chunks.push(c);
                    }
                }
                Some(Either::Left(Ok(Message::Barrier(b)))) => {
                    let mut remain = chunks_until_barrier(right.by_ref(), b.clone())
                        .try_collect()
                        .await?;
                    if YIELD_RIGHT_CHUNKS {
                        right_chunks.append(&mut remain);
                    }
                    yield InternalMessage::Barrier(right_chunks, b);
                    break 'inner;
                }
                Some(Either::Right(Ok(Message::Barrier(b)))) => {
                    #[for_await]
                    for internal_message in
                        internal_messages_until_barrier(left.by_ref(), b.clone())
                    {
                        yield internal_message?;
                    }
                    yield InternalMessage::Barrier(right_chunks, b);
                    break 'inner;
                }
                Some(Either::Left(Err(e)) | Either::Right(Err(e))) => return Err(e),
                Some(Either::Left(Ok(Message::Watermark(w)))) => {
                    yield InternalMessage::WaterMark(w);
                }
                Some(Either::Right(Ok(Message::Watermark(_)))) => {
                    // Watermarks from the temporal side are ignored.
                }
                None => return Ok(()),
            }
        }
    }
}

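/// Project the chunk's columns onto `indices`, keeping ops and visibility.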
pub(super) fn apply_indices_map(chunk: StreamChunk, indices: &[usize]) -> StreamChunk {
    let (data_chunk, ops) = chunk.into_parts();
    let (columns, vis) = data_chunk.into_parts();
    let output_columns = indices
        .iter()
        .cloned()
        .map(|idx| columns[idx].clone())
        .collect();
    StreamChunk::with_visibility(ops, output_columns, vis)
}

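/// Phase 1 of the temporal join: probe the temporal side and emit candidate
/// join rows. The second phase (in `into_stream`) evaluates the join condition
/// and adjusts row visibility.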
pub(super) mod phase1 {
    use std::ops::Bound;

    use futures::{StreamExt, pin_mut};
    use futures_async_stream::try_stream;
    use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
    use risingwave_common::array::{Op, StreamChunk};
    use risingwave_common::hash::{HashKey, NullBitmap};
    use risingwave_common::row::{self, OwnedRow, Row, RowExt};
    use risingwave_common::types::{DataType, DatumRef};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_hummock_sdk::HummockEpoch;
    use risingwave_storage::StateStore;

    use super::{StreamExecutorError, TemporalSide};
    use crate::common::table::state_table::StateTable;
    use crate::executor::monitor::TemporalJoinMetrics;

    pub trait Phase1Evaluation {
        /// Called when a matched right row is found for a left row.
        #[must_use = "consume chunk if produced"]
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk>;

        /// Called after all matched rows of a left row have been appended.
        #[must_use = "consume chunk if produced"]
        fn match_end(
            builder: &mut StreamChunkBuilder,
            op: Op,
            left_row: impl Row,
            right_size: usize,
            matched: bool,
        ) -> Option<StreamChunk>;
    }

    pub struct Inner;
    pub struct LeftOuter;
    pub struct LeftOuterWithCond;

    impl Phase1Evaluation for Inner {
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk> {
            builder.append_row(op, left_row.chain(right_row))
        }

        fn match_end(
            _builder: &mut StreamChunkBuilder,
            _op: Op,
            _left_row: impl Row,
            _right_size: usize,
            _matched: bool,
        ) -> Option<StreamChunk> {
            None
        }
    }

    impl Phase1Evaluation for LeftOuter {
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk> {
            builder.append_row(op, left_row.chain(right_row))
        }

        fn match_end(
            builder: &mut StreamChunkBuilder,
            op: Op,
            left_row: impl Row,
            right_size: usize,
            matched: bool,
        ) -> Option<StreamChunk> {
            if !matched {
                // No right row matched: pad the left row with NULLs.
                builder.append_row(
                    op,
                    left_row.chain(row::repeat_n(DatumRef::None, right_size)),
                )
            } else {
                None
            }
        }
    }

    impl Phase1Evaluation for LeftOuterWithCond {
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk> {
            builder.append_row(op, left_row.chain(right_row))
        }

        fn match_end(
            builder: &mut StreamChunkBuilder,
            op: Op,
            left_row: impl Row,
            right_size: usize,
            _matched: bool,
        ) -> Option<StreamChunk> {
            // Always append an invisible NULL-padded marker row; phase 2 makes
            // it visible only if no matched row passes the join condition.
            builder.append_row_invisible(
                op,
                left_row.chain(row::repeat_n(DatumRef::None, right_size)),
            )
        }
    }

    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    #[allow(clippy::too_many_arguments)]
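    /// Probe one input chunk against the temporal side and stream the phase-1
    /// output chunks.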
    pub(super) async fn handle_chunk<
        'a,
        K: HashKey,
        S: StateStore,
        E: Phase1Evaluation,
        const APPEND_ONLY: bool,
    >(
        chunk_size: usize,
        right_size: usize,
        full_schema: Vec<DataType>,
        epoch: HummockEpoch,
        left_join_keys: &'a [usize],
        right_table: &'a mut TemporalSide<K, S>,
        memo_table_lookup_prefix: &'a [usize],
        memo_table: &'a mut Option<StateTable<S>>,
        null_matched: &'a K::Bitmap,
        chunk: StreamChunk,
        metrics: &'a TemporalJoinMetrics,
    ) {
        let mut builder = StreamChunkBuilder::new(chunk_size, full_schema);
        let keys = K::build_many(left_join_keys, chunk.data_chunk());
        // Only inserted rows need a lookup on the temporal side; deletes are
        // answered from the memo table below.
        let to_fetch_keys = chunk
            .visibility()
            .iter()
            .zip_eq_debug(keys.iter())
            .zip_eq_debug(chunk.ops())
            .filter_map(|((vis, key), op)| {
                if vis {
                    if APPEND_ONLY {
                        assert_eq!(*op, Op::Insert);
                        Some(key)
                    } else {
                        match op {
                            Op::Insert | Op::UpdateInsert => Some(key),
                            Op::Delete | Op::UpdateDelete => None,
                        }
                    }
                } else {
                    None
                }
            });
        right_table
            .fetch_or_promote_keys(to_fetch_keys, epoch, metrics)
            .await?;

        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
            let Some((op, left_row)) = r else {
                continue;
            };

            let mut matched = false;

            if APPEND_ONLY {
                if key.null_bitmap().is_subset(null_matched)
                    && let join_entry = right_table.force_peek(&key)
                    && !join_entry.is_empty()
                {
                    matched = true;
                    for right_row in join_entry.cached.values() {
                        if let Some(chunk) =
                            E::append_matched_row(op, &mut builder, left_row, right_row)
                        {
                            yield chunk;
                        }
                    }
                }
            } else {
                // Non-append-only path: memoize every emitted match as
                // `right_row ++ (join_key ++ left_stream_key)` so a later
                // retraction of the left row can replay exactly the rows that
                // were joined, without re-reading an old version of the right
                // table.
                let memo_table = memo_table.as_mut().unwrap();
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if key.null_bitmap().is_subset(null_matched)
                            && let join_entry = right_table.force_peek(&key)
                            && !join_entry.is_empty()
                        {
                            matched = true;
                            for right_row in join_entry.cached.values() {
                                let right_row: OwnedRow = right_row.clone();
                                memo_table.insert(right_row.clone().chain(
                                    left_row.project(memo_table_lookup_prefix).into_owned_row(),
                                ));
                                if let Some(chunk) = E::append_matched_row(
                                    Op::Insert,
                                    &mut builder,
                                    left_row,
                                    right_row,
                                ) {
                                    yield chunk;
                                }
                            }
                        }
                    }
                    Op::Delete | Op::UpdateDelete => {
                        let mut memo_rows_to_delete = vec![];
                        if key.null_bitmap().is_subset(null_matched) {
                            let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
                                &(Bound::Unbounded, Bound::Unbounded);
                            let prefix = left_row.project(memo_table_lookup_prefix);
                            // Replay the memoized matches of this left row and
                            // retract them.
                            let state_table_iter = memo_table
                                .iter_with_prefix(prefix, sub_range, Default::default())
                                .await?;
                            pin_mut!(state_table_iter);

                            while let Some(memo_row) = state_table_iter.next().await {
                                matched = true;
                                let memo_row = memo_row?.into_owned_row();
                                memo_rows_to_delete.push(memo_row.clone());
                                if let Some(chunk) = E::append_matched_row(
                                    Op::Delete,
                                    &mut builder,
                                    left_row,
                                    memo_row.slice(0..right_size),
                                ) {
                                    yield chunk;
                                }
                            }
                        }
                        for memo_row in memo_rows_to_delete {
                            memo_table.delete(memo_row);
                        }
                    }
                }
            }
            if let Some(chunk) = E::match_end(
                &mut builder,
                match op {
                    Op::Insert | Op::UpdateInsert => Op::Insert,
                    Op::Delete | Op::UpdateDelete => Op::Delete,
                },
                left_row,
                right_size,
                matched,
            ) {
                yield chunk;
            }
        }

        if let Some(chunk) = builder.take() {
            yield chunk;
        }
    }
}

impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: bool>
    TemporalJoinExecutor<K, S, T, APPEND_ONLY>
{
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        left: Executor,
        right: Executor,
        table: BatchTable<S>,
        left_join_keys: Vec<usize>,
        right_join_keys: Vec<usize>,
        null_safe: Vec<bool>,
        condition: Option<NonStrictExpression>,
        output_indices: Vec<usize>,
        table_output_indices: Vec<usize>,
        table_stream_key_indices: Vec<usize>,
        watermark_sequence: AtomicU64Ref,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        join_key_data_types: Vec<DataType>,
        memo_table: Option<StateTable<S>>,
    ) -> Self {
        let metrics_info = MetricsInfo::new(
            metrics.clone(),
            table.table_id().table_id,
            ctx.id,
            "temporal join",
        );

        let cache = ManagedLruCache::unbounded(watermark_sequence, metrics_info);

        let metrics = metrics.new_temporal_join_metrics(table.table_id(), ctx.id, ctx.fragment_id);

        Self {
            ctx,
            info,
            left,
            right,
            right_table: TemporalSide {
                source: table,
                table_stream_key_indices,
                table_output_indices,
                cache,
                join_key_data_types,
            },
            left_join_keys,
            right_join_keys,
            null_safe,
            condition,
            output_indices,
            chunk_size,
            memo_table,
            metrics,
        }
    }

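    /// The main loop: align inputs by barrier, probe the temporal side for
    /// each left chunk (phase 1), then apply the condition and the output
    /// mapping (phase 2).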
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let right_size = self.right.schema().len();

        let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
            &self.output_indices,
            self.left.schema().len(),
            right_size,
        );

        let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());

        let left_stream_key_indices = self.left.pk_indices().to_vec();
        let right_stream_key_indices = self.right.pk_indices().to_vec();
        let memo_table_lookup_prefix = self
            .left_join_keys
            .iter()
            .cloned()
            .chain(left_stream_key_indices)
            .collect_vec();

        let null_matched = K::Bitmap::from_bool_vec(self.null_safe);

        let mut prev_epoch = None;

        let full_schema: Vec<_> = self
            .left
            .schema()
            .data_types()
            .into_iter()
            .chain(self.right.schema().data_types())
            .collect();

        let mut wait_first_barrier = true;

        #[for_await]
        for msg in align_input::<true>(self.left, self.right) {
            self.right_table.cache.evict();
            self.metrics
                .temporal_join_cached_entry_count
                .set(self.right_table.cache.len() as i64);
            match msg? {
                InternalMessage::WaterMark(watermark) => {
                    let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap();
                    yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
                }
                InternalMessage::Chunk(chunk) => {
                    // Probe at the previous barrier's epoch, i.e. the latest
                    // committed version of the right table.
                    let epoch = prev_epoch.expect("Chunk data should come after some barrier.");

                    let full_schema = full_schema.clone();

                    if T == JoinType::Inner {
                        let st1 = phase1::handle_chunk::<K, S, phase1::Inner, APPEND_ONLY>(
                            self.chunk_size,
                            right_size,
                            full_schema,
                            epoch,
                            &self.left_join_keys,
                            &mut self.right_table,
                            &memo_table_lookup_prefix,
                            &mut self.memo_table,
                            &null_matched,
                            chunk,
                            &self.metrics,
                        );
                        #[for_await]
                        for chunk in st1 {
                            let chunk = chunk?;
                            let new_chunk = if let Some(ref cond) = self.condition {
                                let (data_chunk, ops) = chunk.into_parts();
                                let passed_bitmap = cond.eval_infallible(&data_chunk).await;
                                let passed_bitmap =
                                    Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
                                let (columns, vis) = data_chunk.into_parts();
                                let new_vis = vis & passed_bitmap;
                                StreamChunk::with_visibility(ops, columns, new_vis)
                            } else {
                                chunk
                            };
                            let new_chunk = apply_indices_map(new_chunk, &self.output_indices);
                            yield Message::Chunk(new_chunk);
                        }
                    } else if let Some(ref cond) = self.condition {
                        // Phase 1 emits invisible marker rows; here we evaluate
                        // the condition and decide whether each marker becomes
                        // visible.
                        let st1 =
                            phase1::handle_chunk::<K, S, phase1::LeftOuterWithCond, APPEND_ONLY>(
                                self.chunk_size,
                                right_size,
                                full_schema,
                                epoch,
                                &self.left_join_keys,
                                &mut self.right_table,
                                &memo_table_lookup_prefix,
                                &mut self.memo_table,
                                &null_matched,
                                chunk,
                                &self.metrics,
                            );
                        let mut matched_count = 0usize;
                        #[for_await]
                        for chunk in st1 {
                            let chunk = chunk?;
                            let (data_chunk, ops) = chunk.into_parts();
                            let passed_bitmap = cond.eval_infallible(&data_chunk).await;
                            let passed_bitmap =
                                Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
                            let (columns, vis) = data_chunk.into_parts();
                            let mut new_vis = BitmapBuilder::with_capacity(vis.len());
                            for (passed, not_match_end) in
                                passed_bitmap.iter().zip_eq_debug(vis.iter())
                            {
                                let is_match_end = !not_match_end;
                                let vis = if is_match_end && matched_count == 0 {
                                    // No matched row passed the condition: make
                                    // the NULL-padded marker row visible.
                                    true
                                } else if is_match_end {
                                    // Some rows matched: keep the marker row
                                    // invisible and reset the counter.
                                    matched_count = 0;
                                    false
                                } else {
                                    if passed {
                                        matched_count += 1;
                                    }
                                    passed
                                };
                                new_vis.append(vis);
                            }
                            let new_chunk = apply_indices_map(
                                StreamChunk::with_visibility(ops, columns, new_vis.finish()),
                                &self.output_indices,
                            );
                            yield Message::Chunk(new_chunk);
                        }
                        // Every left row ends with a match-end marker that
                        // resets the counter, so it must be zero here.
                        assert_eq!(matched_count, 0);
                    } else {
                        let st1 = phase1::handle_chunk::<K, S, phase1::LeftOuter, APPEND_ONLY>(
                            self.chunk_size,
                            right_size,
                            full_schema,
                            epoch,
                            &self.left_join_keys,
                            &mut self.right_table,
                            &memo_table_lookup_prefix,
                            &mut self.memo_table,
                            &null_matched,
                            chunk,
                            &self.metrics,
                        );
                        #[for_await]
                        for chunk in st1 {
                            let chunk = chunk?;
                            let new_chunk = apply_indices_map(chunk, &self.output_indices);
                            yield Message::Chunk(new_chunk);
                        }
                    }
                }
                InternalMessage::Barrier(updates, barrier) => {
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
                    let barrier_epoch = barrier.epoch;
                    if !APPEND_ONLY {
                        // The memo table is initialized on the first barrier
                        // and committed on every subsequent one.
                        if wait_first_barrier {
                            wait_first_barrier = false;
                            yield Message::Barrier(barrier);
                            self.memo_table
                                .as_mut()
                                .unwrap()
                                .init_epoch(barrier_epoch)
                                .await?;
                        } else {
                            let post_commit = self
                                .memo_table
                                .as_mut()
                                .unwrap()
                                .commit(barrier.epoch)
                                .await?;
                            yield Message::Barrier(barrier);
                            post_commit
                                .post_yield_barrier(update_vnode_bitmap.clone())
                                .await?;
                        }
                    } else {
                        yield Message::Barrier(barrier);
                    }
                    if let Some(vnodes) = update_vnode_bitmap {
                        let prev_vnodes =
                            self.right_table.source.update_vnode_bitmap(vnodes.clone());
                        if cache_may_stale(&prev_vnodes, &vnodes) {
                            self.right_table.cache.clear();
                        }
                    }
                    self.right_table.update(
                        updates,
                        &self.right_join_keys,
                        &right_stream_key_indices,
                    )?;
                    prev_epoch = Some(barrier_epoch.prev);
                }
            }
        }
    }
}

impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: bool> Execute
    for TemporalJoinExecutor<K, S, T, APPEND_ONLY>
{
    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
        self.into_stream().boxed()
    }
}