1use std::alloc::Global;
16use std::collections::HashMap;
17use std::collections::hash_map::Entry;
18
19use either::Either;
20use futures::TryStreamExt;
21use futures::stream::{self, PollNext};
22use itertools::Itertools;
23use local_stats_alloc::{SharedStatsAlloc, StatsAlloc};
24use lru::DefaultHasher;
25use risingwave_common::array::Op;
26use risingwave_common::bitmap::BitmapBuilder;
27use risingwave_common::hash::{HashKey, NullBitmap};
28use risingwave_common::row::RowExt;
29use risingwave_common::util::iter_util::ZipEqDebug;
30use risingwave_common_estimate_size::{EstimateSize, KvSize};
31use risingwave_expr::expr::NonStrictExpression;
32use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
33use risingwave_storage::store::PrefetchOptions;
34use risingwave_storage::table::TableIter;
35use risingwave_storage::table::batch_table::BatchTable;
36
37use super::join::{JoinType, JoinTypePrimitive};
38use super::monitor::TemporalJoinMetrics;
39use crate::cache::{ManagedLruCache, cache_may_stale};
40use crate::common::metrics::MetricsInfo;
41use crate::executor::join::builder::JoinStreamChunkBuilder;
42use crate::executor::prelude::*;
43
/// Streaming executor that joins rows of a left input against a right-side
/// table which is looked up by join key (see `TemporalSide`).
///
/// Type parameters:
/// - `K`: hash key type built from the join-key columns.
/// - `S`: state store behind the right-side table and the memo table.
/// - `T`: join type; `into_stream` takes the inner-join path when
///   `T == JoinType::Inner` and a left-outer path otherwise.
/// - `APPEND_ONLY`: when `true`, every visible left row is asserted to be an
///   `Op::Insert` and the memo table is not touched.
pub struct TemporalJoinExecutor<
    K: HashKey,
    S: StateStore,
    const T: JoinTypePrimitive,
    const APPEND_ONLY: bool,
> {
    /// Actor context; `ctx.id` is used for metrics and vnode-bitmap updates.
    ctx: ActorContextRef,
    // Stored but not read by any code visible in this file, hence the allow.
    #[allow(dead_code)]
    info: ExecutorInfo,
    /// Left input; its chunks are joined and produce the output.
    left: Executor,
    /// Right input; its chunks are only used to keep the cache up to date.
    right: Executor,
    /// Right-side table access together with the cache of fetched entries.
    right_table: TemporalSide<K, S>,
    /// Column indices of the join key in the left input.
    left_join_keys: Vec<usize>,
    /// Column indices of the join key in the right input.
    right_join_keys: Vec<usize>,
    /// Per join-key column flags, turned into the `null_matched` bitmap that a
    /// key's null bitmap must be a subset of for the row to be looked up.
    null_safe: Vec<bool>,
    /// Optional extra join condition evaluated on the joined rows.
    condition: Option<NonStrictExpression>,
    /// Columns of the concatenated (left, right) row that form the output.
    output_indices: Vec<usize>,
    /// Chunk size handed to the output `StreamChunkBuilder`.
    chunk_size: usize,
    /// State table used when `APPEND_ONLY` is `false`; it is unwrapped in that
    /// mode, so it must be `Some` there.
    memo_table: Option<StateTable<S>>,
    /// Cache hit/miss and cached-entry-count metrics.
    metrics: TemporalJoinMetrics,
}
65
/// The cached right-side rows that belong to one join key.
#[derive(Default)]
pub struct JoinEntry {
    /// Right-side rows, keyed by the row's stream-key columns.
    cached: HashMap<OwnedRow, OwnedRow>,
    /// Running size accounting for the keys and values held in `cached`.
    kv_heap_size: KvSize,
}
72
73impl EstimateSize for JoinEntry {
74 fn estimated_heap_size(&self) -> usize {
75 self.kv_heap_size.size()
78 }
79}
80
81impl JoinEntry {
82 pub fn insert(&mut self, key: OwnedRow, value: OwnedRow) {
84 if let Entry::Vacant(e) = self.cached.entry(key) {
87 self.kv_heap_size.add(e.key(), &value);
88 e.insert(value);
89 }
90 }
91
92 pub fn remove(&mut self, key: &OwnedRow) {
94 if let Some(value) = self.cached.remove(key) {
95 self.kv_heap_size.sub(key, &value);
96 } else {
97 panic!("key {:?} should be in the cache", key);
98 }
99 }
100
101 pub fn is_empty(&self) -> bool {
102 self.cached.is_empty()
103 }
104}
105
/// The right side of the join: the table that is read on cache misses plus the
/// cache of already fetched entries.
struct TemporalSide<K: HashKey, S: StateStore> {
    /// Table that is scanned by join-key prefix when a key is not cached.
    source: BatchTable<S>,
    /// Columns of a `source` row that form the key inside a `JoinEntry`.
    table_stream_key_indices: Vec<usize>,
    /// Columns of a `source` row that are kept as the cached right row.
    table_output_indices: Vec<usize>,
    /// Join key -> cached right rows.
    cache: ManagedLruCache<K, JoinEntry, DefaultHasher, SharedStatsAlloc<Global>>,
    /// Data types used to deserialize a hash key back into a row.
    join_key_data_types: Vec<DataType>,
}
113
114impl<K: HashKey, S: StateStore> TemporalSide<K, S> {
115 async fn fetch_or_promote_keys(
118 &mut self,
119 keys: impl Iterator<Item = &K>,
120 epoch: HummockEpoch,
121 metrics: &TemporalJoinMetrics,
122 ) -> StreamExecutorResult<()> {
123 let mut futs = Vec::with_capacity(keys.size_hint().1.unwrap_or(0));
124 for key in keys {
125 metrics.temporal_join_total_query_cache_count.inc();
126
127 if self.cache.get(key).is_none() {
128 metrics.temporal_join_cache_miss_count.inc();
129
130 futs.push(async {
131 let pk_prefix = key.deserialize(&self.join_key_data_types)?;
132
133 let iter = self
134 .source
135 .batch_iter_with_pk_bounds(
136 HummockReadEpoch::NoWait(epoch),
137 &pk_prefix,
138 ..,
139 false,
140 PrefetchOptions::default(),
141 )
142 .await?;
143
144 let mut entry = JoinEntry::default();
145
146 pin_mut!(iter);
147 while let Some(row) = iter.next_row().await? {
148 entry.insert(
149 row.as_ref()
150 .project(&self.table_stream_key_indices)
151 .into_owned_row(),
152 row.project(&self.table_output_indices).into_owned_row(),
153 );
154 }
155 let key = key.clone();
156 Ok((key, entry)) as StreamExecutorResult<_>
157 });
158 }
159 }
160
161 #[for_await]
162 for res in stream::iter(futs).buffered(16) {
163 let (key, entry) = res?;
164 self.cache.put(key, entry);
165 }
166
167 Ok(())
168 }
169
170 fn force_peek(&self, key: &K) -> &JoinEntry {
171 self.cache.peek(key).expect("key should exists")
172 }
173
174 fn update(
175 &mut self,
176 chunks: Vec<StreamChunk>,
177 join_keys: &[usize],
178 right_stream_key_indices: &[usize],
179 ) -> StreamExecutorResult<()> {
180 for chunk in chunks {
181 let keys = K::build_many(join_keys, chunk.data_chunk());
182 for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
183 let Some((op, row)) = r else {
184 continue;
185 };
186 if self.cache.contains(&key) {
187 let mut entry = self.cache.get_mut(&key).unwrap();
189 let stream_key = row.project(right_stream_key_indices).into_owned_row();
190 match op {
191 Op::Insert | Op::UpdateInsert => {
192 entry.insert(stream_key, row.into_owned_row())
193 }
194 Op::Delete | Op::UpdateDelete => entry.remove(&stream_key),
195 };
196 }
197 }
198 }
199 Ok(())
200 }
201}
202
/// Messages produced by `align_input` after the two inputs have been aligned
/// on barriers.
pub(super) enum InternalMessage {
    /// A data chunk from the left input.
    Chunk(StreamChunk),
    /// A barrier both inputs have reached, together with the right-side chunks
    /// gathered before it (left empty when `align_input` is instantiated with
    /// `YIELD_RIGHT_CHUNKS = false`).
    Barrier(Vec<StreamChunk>, Barrier),
    /// A watermark from the left input.
    WaterMark(Watermark),
}
208
209#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
210async fn chunks_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
211 #[for_await]
212 for item in stream {
213 match item? {
214 Message::Watermark(_) => {
215 }
217 Message::Chunk(c) => yield c,
218 Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
219 return Err(StreamExecutorError::align_barrier(expected_barrier, b));
220 }
221 Message::Barrier(_) => return Ok(()),
222 }
223 }
224}
225
226#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
227async fn internal_messages_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
228 #[for_await]
229 for item in stream {
230 match item? {
231 Message::Watermark(w) => {
232 yield InternalMessage::WaterMark(w);
233 }
234 Message::Chunk(c) => yield InternalMessage::Chunk(c),
235 Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
236 return Err(StreamExecutorError::align_barrier(expected_barrier, b));
237 }
238 Message::Barrier(_) => return Ok(()),
239 }
240 }
241}
242
243#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
248pub(super) async fn align_input<const YIELD_RIGHT_CHUNKS: bool>(left: Executor, right: Executor) {
249 let mut left = pin!(left.execute());
250 let mut right = pin!(right.execute());
251 loop {
253 let mut right_chunks = vec![];
254 'inner: loop {
256 let mut combined = stream::select_with_strategy(
257 left.by_ref().map(Either::Left),
258 right.by_ref().map(Either::Right),
259 |_: &mut ()| PollNext::Left,
260 );
261 match combined.next().await {
262 Some(Either::Left(Ok(Message::Chunk(c)))) => yield InternalMessage::Chunk(c),
263 Some(Either::Right(Ok(Message::Chunk(c)))) => {
264 if YIELD_RIGHT_CHUNKS {
265 right_chunks.push(c);
266 }
267 }
268 Some(Either::Left(Ok(Message::Barrier(b)))) => {
269 let mut remain = chunks_until_barrier(right.by_ref(), b.clone())
270 .try_collect()
271 .await?;
272 if YIELD_RIGHT_CHUNKS {
273 right_chunks.append(&mut remain);
274 }
275 yield InternalMessage::Barrier(right_chunks, b);
276 break 'inner;
277 }
278 Some(Either::Right(Ok(Message::Barrier(b)))) => {
279 #[for_await]
280 for internal_message in
281 internal_messages_until_barrier(left.by_ref(), b.clone())
282 {
283 yield internal_message?;
284 }
285 yield InternalMessage::Barrier(right_chunks, b);
286 break 'inner;
287 }
288 Some(Either::Left(Err(e)) | Either::Right(Err(e))) => return Err(e),
289 Some(Either::Left(Ok(Message::Watermark(w)))) => {
290 yield InternalMessage::WaterMark(w);
291 }
292 Some(Either::Right(Ok(Message::Watermark(_)))) => {
293 }
295 None => return Ok(()),
296 }
297 }
298 }
299}
300
301pub(super) fn apply_indices_map(chunk: StreamChunk, indices: &[usize]) -> StreamChunk {
302 let (data_chunk, ops) = chunk.into_parts();
303 let (columns, vis) = data_chunk.into_parts();
304 let output_columns = indices
305 .iter()
306 .cloned()
307 .map(|idx| columns[idx].clone())
308 .collect();
309 StreamChunk::with_visibility(ops, output_columns, vis)
310}
311
/// First phase of the join: look up the right side for each left row and emit
/// the concatenated (left, right) rows. Any extra join condition is applied by
/// the caller on the chunks produced here.
pub(super) mod phase1 {
    use std::ops::Bound;

    use futures::{StreamExt, pin_mut};
    use futures_async_stream::try_stream;
    use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
    use risingwave_common::array::{Op, StreamChunk};
    use risingwave_common::hash::{HashKey, NullBitmap};
    use risingwave_common::row::{self, OwnedRow, Row, RowExt};
    use risingwave_common::types::{DataType, DatumRef};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_hummock_sdk::HummockEpoch;
    use risingwave_storage::StateStore;

    use super::{StreamExecutorError, TemporalSide};
    use crate::common::table::state_table::StateTable;
    use crate::executor::monitor::TemporalJoinMetrics;

    /// How joined rows are written to the output builder for one join flavour.
    pub trait Phase1Evaluation {
        /// Called for every (left row, right row) pair that matched on the join
        /// key. Returns the chunk produced by the builder, if any.
        #[must_use = "consume chunk if produced"]
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk>;

        /// Called once per left row after all of its matches were appended.
        /// `matched` tells whether at least one match was found and
        /// `right_size` is the number of right-side columns. Returns the chunk
        /// produced by the builder, if any.
        #[must_use = "consume chunk if produced"]
        fn match_end(
            builder: &mut StreamChunkBuilder,
            op: Op,
            left_row: impl Row,
            right_size: usize,
            matched: bool,
        ) -> Option<StreamChunk>;
    }

    /// Inner join: only matched pairs are emitted.
    pub struct Inner;
    /// Left outer join without extra condition: an unmatched left row is
    /// emitted once, padded with NULLs.
    pub struct LeftOuter;
    /// Left outer join with extra condition: every left row is followed by an
    /// invisible NULL-padded marker row.
    pub struct LeftOuterWithCond;

    impl Phase1Evaluation for Inner {
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk> {
            builder.append_row(op, left_row.chain(right_row))
        }

        // Nothing is emitted for a left row beyond its matches.
        fn match_end(
            _builder: &mut StreamChunkBuilder,
            _op: Op,
            _left_row: impl Row,
            _right_size: usize,
            _matched: bool,
        ) -> Option<StreamChunk> {
            None
        }
    }

    impl Phase1Evaluation for LeftOuter {
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk> {
            builder.append_row(op, left_row.chain(right_row))
        }

        fn match_end(
            builder: &mut StreamChunkBuilder,
            op: Op,
            left_row: impl Row,
            right_size: usize,
            matched: bool,
        ) -> Option<StreamChunk> {
            if !matched {
                // No match: emit the left row once, with NULLs on the right.
                builder.append_row(
                    op,
                    left_row.chain(row::repeat_n(DatumRef::None, right_size)),
                )
            } else {
                None
            }
        }
    }

    impl Phase1Evaluation for LeftOuterWithCond {
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk> {
            builder.append_row(op, left_row.chain(right_row))
        }

        fn match_end(
            builder: &mut StreamChunkBuilder,
            op: Op,
            left_row: impl Row,
            right_size: usize,
            _matched: bool,
        ) -> Option<StreamChunk> {
            // A marker row is appended whether or not the row matched. It is
            // invisible, which is how the caller recognizes it after the
            // condition has been evaluated.
            builder.append_row_invisible(
                op,
                left_row.chain(row::repeat_n(DatumRef::None, right_size)),
            )
        }
    }

    /// Joins one left `chunk` against the right table and streams the result
    /// chunks, built with `chunk_size` and `full_schema`.
    ///
    /// - `right_size`: number of right-side columns in a joined row.
    /// - `epoch`: epoch at which missing keys are read from the right table.
    /// - `left_join_keys`: join-key columns of the left rows.
    /// - `memo_table_lookup_prefix`: left-row columns appended to a right row
    ///   when it is written to the memo table, and used as the prefix when
    ///   reading it back.
    /// - `memo_table`: unwrapped when `APPEND_ONLY` is `false`.
    /// - `null_matched`: a row is only looked up when the null bitmap of its
    ///   key is a subset of this bitmap.
    ///
    /// With `APPEND_ONLY`, every visible row is asserted to be `Op::Insert`.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    #[allow(clippy::too_many_arguments)]
    pub(super) async fn handle_chunk<
        'a,
        K: HashKey,
        S: StateStore,
        E: Phase1Evaluation,
        const APPEND_ONLY: bool,
    >(
        chunk_size: usize,
        right_size: usize,
        full_schema: Vec<DataType>,
        epoch: HummockEpoch,
        left_join_keys: &'a [usize],
        right_table: &'a mut TemporalSide<K, S>,
        memo_table_lookup_prefix: &'a [usize],
        memo_table: &'a mut Option<StateTable<S>>,
        null_matched: &'a K::Bitmap,
        chunk: StreamChunk,
        metrics: &'a TemporalJoinMetrics,
    ) {
        let mut builder = StreamChunkBuilder::new(chunk_size, full_schema);
        let keys = K::build_many(left_join_keys, chunk.data_chunk());
        // Only visible insert rows need the right side in the cache; deletes
        // are answered from the memo table.
        let to_fetch_keys = chunk
            .visibility()
            .iter()
            .zip_eq_debug(keys.iter())
            .zip_eq_debug(chunk.ops())
            .filter_map(|((vis, key), op)| {
                if vis {
                    if APPEND_ONLY {
                        assert_eq!(*op, Op::Insert);
                        Some(key)
                    } else {
                        match op {
                            Op::Insert | Op::UpdateInsert => Some(key),
                            Op::Delete | Op::UpdateDelete => None,
                        }
                    }
                } else {
                    None
                }
            });
        right_table
            .fetch_or_promote_keys(to_fetch_keys, epoch, metrics)
            .await?;

        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
            // Invisible rows are skipped.
            let Some((op, left_row)) = r else {
                continue;
            };

            let mut matched = false;

            if APPEND_ONLY {
                // Append-only: join directly against the cached entry.
                if key.null_bitmap().is_subset(null_matched)
                    && let join_entry = right_table.force_peek(&key)
                    && !join_entry.is_empty()
                {
                    matched = true;
                    for right_row in join_entry.cached.values() {
                        if let Some(chunk) =
                            E::append_matched_row(op, &mut builder, left_row, right_row)
                        {
                            yield chunk;
                        }
                    }
                }
            } else {
                // Not append-only: every emitted match is also recorded in the
                // memo table, so that a later delete of the same left row can
                // retract exactly the rows that were emitted for it.
                let memo_table = memo_table.as_mut().unwrap();
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if key.null_bitmap().is_subset(null_matched)
                            && let join_entry = right_table.force_peek(&key)
                            && !join_entry.is_empty()
                        {
                            matched = true;
                            for right_row in join_entry.cached.values() {
                                let right_row: OwnedRow = right_row.clone();
                                // Memo row = right row followed by the lookup
                                // prefix columns of the left row.
                                memo_table.insert(right_row.clone().chain(
                                    left_row.project(memo_table_lookup_prefix).into_owned_row(),
                                ));
                                if let Some(chunk) = E::append_matched_row(
                                    Op::Insert,
                                    &mut builder,
                                    left_row,
                                    right_row,
                                ) {
                                    yield chunk;
                                }
                            }
                        }
                    }
                    Op::Delete | Op::UpdateDelete => {
                        let mut memo_rows_to_delete = vec![];
                        if key.null_bitmap().is_subset(null_matched) {
                            let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
                                &(Bound::Unbounded, Bound::Unbounded);
                            let prefix = left_row.project(memo_table_lookup_prefix);
                            let state_table_iter = memo_table
                                .iter_with_prefix(prefix, sub_range, Default::default())
                                .await?;
                            pin_mut!(state_table_iter);

                            while let Some(memo_row) = state_table_iter.next().await {
                                matched = true;
                                let memo_row = memo_row?.into_owned_row();
                                memo_rows_to_delete.push(memo_row.clone());
                                // The first `right_size` columns of a memo row
                                // are the right row that was emitted earlier.
                                if let Some(chunk) = E::append_matched_row(
                                    Op::Delete,
                                    &mut builder,
                                    left_row,
                                    memo_row.slice(0..right_size),
                                ) {
                                    yield chunk;
                                }
                            }
                        }
                        // Deleted only after the scan is finished (presumably
                        // because the iterator borrows the memo table).
                        for memo_row in memo_rows_to_delete {
                            memo_table.delete(memo_row);
                        }
                    }
                }
            }
            // Update ops are emitted as plain insert/delete.
            if let Some(chunk) = E::match_end(
                &mut builder,
                match op {
                    Op::Insert | Op::UpdateInsert => Op::Insert,
                    Op::Delete | Op::UpdateDelete => Op::Delete,
                },
                left_row,
                right_size,
                matched,
            ) {
                yield chunk;
            }
        }

        // Flush whatever is left in the builder.
        if let Some(chunk) = builder.take() {
            yield chunk;
        }
    }
}
590
591impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: bool>
592 TemporalJoinExecutor<K, S, T, APPEND_ONLY>
593{
594 #[allow(clippy::too_many_arguments)]
595 pub fn new(
596 ctx: ActorContextRef,
597 info: ExecutorInfo,
598 left: Executor,
599 right: Executor,
600 table: BatchTable<S>,
601 left_join_keys: Vec<usize>,
602 right_join_keys: Vec<usize>,
603 null_safe: Vec<bool>,
604 condition: Option<NonStrictExpression>,
605 output_indices: Vec<usize>,
606 table_output_indices: Vec<usize>,
607 table_stream_key_indices: Vec<usize>,
608 watermark_sequence: AtomicU64Ref,
609 metrics: Arc<StreamingMetrics>,
610 chunk_size: usize,
611 join_key_data_types: Vec<DataType>,
612 memo_table: Option<StateTable<S>>,
613 ) -> Self {
614 let alloc = StatsAlloc::new(Global).shared();
615
616 let metrics_info = MetricsInfo::new(
617 metrics.clone(),
618 table.table_id().table_id,
619 ctx.id,
620 "temporal join",
621 );
622
623 let cache = ManagedLruCache::unbounded_with_hasher_in(
624 watermark_sequence,
625 metrics_info,
626 DefaultHasher::default(),
627 alloc,
628 );
629
630 let metrics = metrics.new_temporal_join_metrics(table.table_id(), ctx.id, ctx.fragment_id);
631
632 Self {
633 ctx: ctx.clone(),
634 info,
635 left,
636 right,
637 right_table: TemporalSide {
638 source: table,
639 table_stream_key_indices,
640 table_output_indices,
641 cache,
642 join_key_data_types,
643 },
644 left_join_keys,
645 right_join_keys,
646 null_safe,
647 condition,
648 output_indices,
649 chunk_size,
650 memo_table,
651 metrics,
652 }
653 }
654
655 #[try_stream(ok = Message, error = StreamExecutorError)]
656 async fn into_stream(mut self) {
657 let right_size = self.right.schema().len();
658
659 let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
660 &self.output_indices,
661 self.left.schema().len(),
662 right_size,
663 );
664
665 let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());
666
667 let left_stream_key_indices = self.left.pk_indices().to_vec();
668 let right_stream_key_indices = self.right.pk_indices().to_vec();
669 let memo_table_lookup_prefix = self
670 .left_join_keys
671 .iter()
672 .cloned()
673 .chain(left_stream_key_indices)
674 .collect_vec();
675
676 let null_matched = K::Bitmap::from_bool_vec(self.null_safe);
677
678 let mut prev_epoch = None;
679
680 let full_schema: Vec<_> = self
681 .left
682 .schema()
683 .data_types()
684 .into_iter()
685 .chain(self.right.schema().data_types().into_iter())
686 .collect();
687
688 let mut wait_first_barrier = true;
689
690 #[for_await]
691 for msg in align_input::<true>(self.left, self.right) {
692 self.right_table.cache.evict();
693 self.metrics
694 .temporal_join_cached_entry_count
695 .set(self.right_table.cache.len() as i64);
696 match msg? {
697 InternalMessage::WaterMark(watermark) => {
698 let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap();
699 yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
700 }
701 InternalMessage::Chunk(chunk) => {
702 let epoch = prev_epoch.expect("Chunk data should come after some barrier.");
703
704 let full_schema = full_schema.clone();
705
706 if T == JoinType::Inner {
707 let st1 = phase1::handle_chunk::<K, S, phase1::Inner, APPEND_ONLY>(
708 self.chunk_size,
709 right_size,
710 full_schema,
711 epoch,
712 &self.left_join_keys,
713 &mut self.right_table,
714 &memo_table_lookup_prefix,
715 &mut self.memo_table,
716 &null_matched,
717 chunk,
718 &self.metrics,
719 );
720 #[for_await]
721 for chunk in st1 {
722 let chunk = chunk?;
723 let new_chunk = if let Some(ref cond) = self.condition {
724 let (data_chunk, ops) = chunk.into_parts();
725 let passed_bitmap = cond.eval_infallible(&data_chunk).await;
726 let passed_bitmap =
727 Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
728 let (columns, vis) = data_chunk.into_parts();
729 let new_vis = vis & passed_bitmap;
730 StreamChunk::with_visibility(ops, columns, new_vis)
731 } else {
732 chunk
733 };
734 let new_chunk = apply_indices_map(new_chunk, &self.output_indices);
735 yield Message::Chunk(new_chunk);
736 }
737 } else if let Some(ref cond) = self.condition {
738 let st1 =
740 phase1::handle_chunk::<K, S, phase1::LeftOuterWithCond, APPEND_ONLY>(
741 self.chunk_size,
742 right_size,
743 full_schema,
744 epoch,
745 &self.left_join_keys,
746 &mut self.right_table,
747 &memo_table_lookup_prefix,
748 &mut self.memo_table,
749 &null_matched,
750 chunk,
751 &self.metrics,
752 );
753 let mut matched_count = 0usize;
754 #[for_await]
755 for chunk in st1 {
756 let chunk = chunk?;
757 let (data_chunk, ops) = chunk.into_parts();
758 let passed_bitmap = cond.eval_infallible(&data_chunk).await;
759 let passed_bitmap =
760 Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
761 let (columns, vis) = data_chunk.into_parts();
762 let mut new_vis = BitmapBuilder::with_capacity(vis.len());
763 for (passed, not_match_end) in
764 passed_bitmap.iter().zip_eq_debug(vis.iter())
765 {
766 let is_match_end = !not_match_end;
767 let vis = if is_match_end && matched_count == 0 {
768 true
770 } else if is_match_end {
771 matched_count = 0;
773 false
775 } else {
776 if passed {
777 matched_count += 1;
778 }
779 passed
780 };
781 new_vis.append(vis);
782 }
783 let new_chunk = apply_indices_map(
784 StreamChunk::with_visibility(ops, columns, new_vis.finish()),
785 &self.output_indices,
786 );
787 yield Message::Chunk(new_chunk);
788 }
789 assert_eq!(matched_count, 0);
791 } else {
792 let st1 = phase1::handle_chunk::<K, S, phase1::LeftOuter, APPEND_ONLY>(
793 self.chunk_size,
794 right_size,
795 full_schema,
796 epoch,
797 &self.left_join_keys,
798 &mut self.right_table,
799 &memo_table_lookup_prefix,
800 &mut self.memo_table,
801 &null_matched,
802 chunk,
803 &self.metrics,
804 );
805 #[for_await]
806 for chunk in st1 {
807 let chunk = chunk?;
808 let new_chunk = apply_indices_map(chunk, &self.output_indices);
809 yield Message::Chunk(new_chunk);
810 }
811 }
812 }
813 InternalMessage::Barrier(updates, barrier) => {
814 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
815 let barrier_epoch = barrier.epoch;
816 if !APPEND_ONLY {
817 if wait_first_barrier {
818 wait_first_barrier = false;
819 yield Message::Barrier(barrier);
820 self.memo_table
821 .as_mut()
822 .unwrap()
823 .init_epoch(barrier_epoch)
824 .await?;
825 } else {
826 let post_commit = self
827 .memo_table
828 .as_mut()
829 .unwrap()
830 .commit(barrier.epoch)
831 .await?;
832 yield Message::Barrier(barrier);
833 post_commit
834 .post_yield_barrier(update_vnode_bitmap.clone())
835 .await?;
836 }
837 } else {
838 yield Message::Barrier(barrier);
839 }
840 if let Some(vnodes) = update_vnode_bitmap {
841 let prev_vnodes =
842 self.right_table.source.update_vnode_bitmap(vnodes.clone());
843 if cache_may_stale(&prev_vnodes, &vnodes) {
844 self.right_table.cache.clear();
845 }
846 }
847 self.right_table.update(
848 updates,
849 &self.right_join_keys,
850 &right_stream_key_indices,
851 )?;
852 prev_epoch = Some(barrier_epoch.curr);
853 }
854 }
855 }
856 }
857}
858
859impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: bool> Execute
860 for TemporalJoinExecutor<K, S, T, APPEND_ONLY>
861{
862 fn execute(self: Box<Self>) -> super::BoxedMessageStream {
863 self.into_stream().boxed()
864 }
865}