1use std::alloc::Global;
16use std::collections::HashMap;
17use std::collections::hash_map::Entry;
18
19use either::Either;
20use futures::TryStreamExt;
21use futures::stream::{self, PollNext};
22use itertools::Itertools;
23use local_stats_alloc::{SharedStatsAlloc, StatsAlloc};
24use lru::DefaultHasher;
25use risingwave_common::array::Op;
26use risingwave_common::bitmap::BitmapBuilder;
27use risingwave_common::hash::{HashKey, NullBitmap};
28use risingwave_common::row::RowExt;
29use risingwave_common::util::iter_util::ZipEqDebug;
30use risingwave_common_estimate_size::{EstimateSize, KvSize};
31use risingwave_expr::expr::NonStrictExpression;
32use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
33use risingwave_storage::store::PrefetchOptions;
34use risingwave_storage::table::TableIter;
35use risingwave_storage::table::batch_table::BatchTable;
36
37use super::join::{JoinType, JoinTypePrimitive};
38use super::monitor::TemporalJoinMetrics;
39use crate::cache::{ManagedLruCache, cache_may_stale};
40use crate::common::metrics::MetricsInfo;
41use crate::executor::join::builder::JoinStreamChunkBuilder;
42use crate::executor::prelude::*;
43
44pub struct TemporalJoinExecutor<
45 K: HashKey,
46 S: StateStore,
47 const T: JoinTypePrimitive,
48 const APPEND_ONLY: bool,
49> {
50 ctx: ActorContextRef,
51 #[allow(dead_code)]
52 info: ExecutorInfo,
53 left: Executor,
54 right: Executor,
55 right_table: TemporalSide<K, S>,
56 left_join_keys: Vec<usize>,
57 right_join_keys: Vec<usize>,
58 null_safe: Vec<bool>,
59 condition: Option<NonStrictExpression>,
60 output_indices: Vec<usize>,
61 chunk_size: usize,
62 memo_table: Option<StateTable<S>>,
63 metrics: TemporalJoinMetrics,
64}
65
/// Cached right-side rows that share one join key.
#[derive(Default)]
pub struct JoinEntry {
    /// Maps the key row passed to `insert` to the cached value row.
    cached: HashMap<OwnedRow, OwnedRow>,
    /// Running size accounting for the keys and values held in `cached`.
    kv_heap_size: KvSize,
}
72
73impl EstimateSize for JoinEntry {
74 fn estimated_heap_size(&self) -> usize {
75 self.kv_heap_size.size()
78 }
79}
80
81impl JoinEntry {
82 pub fn insert(&mut self, key: OwnedRow, value: OwnedRow) {
84 if let Entry::Vacant(e) = self.cached.entry(key) {
87 self.kv_heap_size.add(e.key(), &value);
88 e.insert(value);
89 } else {
90 panic!("value {:?} double insert", value);
91 }
92 }
93
94 pub fn remove(&mut self, key: &OwnedRow) {
96 if let Some(value) = self.cached.remove(key) {
97 self.kv_heap_size.sub(key, &value);
98 } else {
99 panic!("key {:?} should be in the cache", key);
100 }
101 }
102
103 pub fn is_empty(&self) -> bool {
104 self.cached.is_empty()
105 }
106}
107
108struct TemporalSide<K: HashKey, S: StateStore> {
109 source: BatchTable<S>,
110 table_stream_key_indices: Vec<usize>,
111 table_output_indices: Vec<usize>,
112 cache: ManagedLruCache<K, JoinEntry, DefaultHasher, SharedStatsAlloc<Global>>,
113 join_key_data_types: Vec<DataType>,
114}
115
116impl<K: HashKey, S: StateStore> TemporalSide<K, S> {
117 async fn fetch_or_promote_keys(
120 &mut self,
121 keys: impl Iterator<Item = &K>,
122 epoch: HummockEpoch,
123 metrics: &TemporalJoinMetrics,
124 ) -> StreamExecutorResult<()> {
125 let mut futs = Vec::with_capacity(keys.size_hint().1.unwrap_or(0));
126 for key in keys {
127 metrics.temporal_join_total_query_cache_count.inc();
128
129 if self.cache.get(key).is_none() {
130 metrics.temporal_join_cache_miss_count.inc();
131
132 futs.push(async {
133 let pk_prefix = key.deserialize(&self.join_key_data_types)?;
134
135 let iter = self
136 .source
137 .batch_iter_with_pk_bounds(
138 HummockReadEpoch::NoWait(epoch),
139 &pk_prefix,
140 ..,
141 false,
142 PrefetchOptions::default(),
143 )
144 .await?;
145
146 let mut entry = JoinEntry::default();
147
148 pin_mut!(iter);
149 while let Some(row) = iter.next_row().await? {
150 entry.insert(
151 row.as_ref()
152 .project(&self.table_stream_key_indices)
153 .into_owned_row(),
154 row.project(&self.table_output_indices).into_owned_row(),
155 );
156 }
157 let key = key.clone();
158 Ok((key, entry)) as StreamExecutorResult<_>
159 });
160 }
161 }
162
163 #[for_await]
164 for res in stream::iter(futs).buffered(16) {
165 let (key, entry) = res?;
166 self.cache.put(key, entry);
167 }
168
169 Ok(())
170 }
171
172 fn force_peek(&self, key: &K) -> &JoinEntry {
173 self.cache.peek(key).expect("key should exists")
174 }
175
176 fn update(
177 &mut self,
178 chunks: Vec<StreamChunk>,
179 join_keys: &[usize],
180 right_stream_key_indices: &[usize],
181 ) -> StreamExecutorResult<()> {
182 for chunk in chunks {
183 let keys = K::build_many(join_keys, chunk.data_chunk());
184 for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
185 let Some((op, row)) = r else {
186 continue;
187 };
188 if self.cache.contains(&key) {
189 let mut entry = self.cache.get_mut(&key).unwrap();
191 let stream_key = row.project(right_stream_key_indices).into_owned_row();
192 match op {
193 Op::Insert | Op::UpdateInsert => {
194 entry.insert(stream_key, row.into_owned_row())
195 }
196 Op::Delete | Op::UpdateDelete => entry.remove(&stream_key),
197 };
198 }
199 }
200 }
201 Ok(())
202 }
203}
204
/// Messages produced by `align_input` for the join loop.
pub(super) enum InternalMessage {
    /// A data chunk taken from the left input.
    Chunk(StreamChunk),
    /// An aligned barrier together with the right-side chunks collected since the previous
    /// barrier (empty when right chunks are not requested).
    Barrier(Vec<StreamChunk>, Barrier),
    /// A watermark taken from the left input.
    WaterMark(Watermark),
}
210
211#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
212async fn chunks_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
213 #[for_await]
214 for item in stream {
215 match item? {
216 Message::Watermark(_) => {
217 }
219 Message::Chunk(c) => yield c,
220 Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
221 return Err(StreamExecutorError::align_barrier(expected_barrier, b));
222 }
223 Message::Barrier(_) => return Ok(()),
224 }
225 }
226}
227
228#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
229async fn internal_messages_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
230 #[for_await]
231 for item in stream {
232 match item? {
233 Message::Watermark(w) => {
234 yield InternalMessage::WaterMark(w);
235 }
236 Message::Chunk(c) => yield InternalMessage::Chunk(c),
237 Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
238 return Err(StreamExecutorError::align_barrier(expected_barrier, b));
239 }
240 Message::Barrier(_) => return Ok(()),
241 }
242 }
243}
244
245#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
250pub(super) async fn align_input<const YIELD_RIGHT_CHUNKS: bool>(left: Executor, right: Executor) {
251 let mut left = pin!(left.execute());
252 let mut right = pin!(right.execute());
253 loop {
255 let mut right_chunks = vec![];
256 'inner: loop {
258 let mut combined = stream::select_with_strategy(
259 left.by_ref().map(Either::Left),
260 right.by_ref().map(Either::Right),
261 |_: &mut ()| PollNext::Left,
262 );
263 match combined.next().await {
264 Some(Either::Left(Ok(Message::Chunk(c)))) => yield InternalMessage::Chunk(c),
265 Some(Either::Right(Ok(Message::Chunk(c)))) => {
266 if YIELD_RIGHT_CHUNKS {
267 right_chunks.push(c);
268 }
269 }
270 Some(Either::Left(Ok(Message::Barrier(b)))) => {
271 let mut remain = chunks_until_barrier(right.by_ref(), b.clone())
272 .try_collect()
273 .await?;
274 if YIELD_RIGHT_CHUNKS {
275 right_chunks.append(&mut remain);
276 }
277 yield InternalMessage::Barrier(right_chunks, b);
278 break 'inner;
279 }
280 Some(Either::Right(Ok(Message::Barrier(b)))) => {
281 #[for_await]
282 for internal_message in
283 internal_messages_until_barrier(left.by_ref(), b.clone())
284 {
285 yield internal_message?;
286 }
287 yield InternalMessage::Barrier(right_chunks, b);
288 break 'inner;
289 }
290 Some(Either::Left(Err(e)) | Either::Right(Err(e))) => return Err(e),
291 Some(Either::Left(Ok(Message::Watermark(w)))) => {
292 yield InternalMessage::WaterMark(w);
293 }
294 Some(Either::Right(Ok(Message::Watermark(_)))) => {
295 }
297 None => return Ok(()),
298 }
299 }
300 }
301}
302
303pub(super) fn apply_indices_map(chunk: StreamChunk, indices: &[usize]) -> StreamChunk {
304 let (data_chunk, ops) = chunk.into_parts();
305 let (columns, vis) = data_chunk.into_parts();
306 let output_columns = indices
307 .iter()
308 .cloned()
309 .map(|idx| columns[idx].clone())
310 .collect();
311 StreamChunk::with_visibility(ops, output_columns, vis)
312}
313
314pub(super) mod phase1 {
315 use std::ops::Bound;
316
317 use futures::{StreamExt, pin_mut};
318 use futures_async_stream::try_stream;
319 use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
320 use risingwave_common::array::{Op, StreamChunk};
321 use risingwave_common::hash::{HashKey, NullBitmap};
322 use risingwave_common::row::{self, OwnedRow, Row, RowExt};
323 use risingwave_common::types::{DataType, DatumRef};
324 use risingwave_common::util::iter_util::ZipEqDebug;
325 use risingwave_hummock_sdk::HummockEpoch;
326 use risingwave_storage::StateStore;
327
328 use super::{StreamExecutorError, TemporalSide};
329 use crate::common::table::state_table::StateTable;
330 use crate::executor::monitor::TemporalJoinMetrics;
331
332 pub trait Phase1Evaluation {
333 #[must_use = "consume chunk if produced"]
335 fn append_matched_row(
336 op: Op,
337 builder: &mut StreamChunkBuilder,
338 left_row: impl Row,
339 right_row: impl Row,
340 ) -> Option<StreamChunk>;
341
342 #[must_use = "consume chunk if produced"]
344 fn match_end(
345 builder: &mut StreamChunkBuilder,
346 op: Op,
347 left_row: impl Row,
348 right_size: usize,
349 matched: bool,
350 ) -> Option<StreamChunk>;
351 }
352
353 pub struct Inner;
354 pub struct LeftOuter;
355 pub struct LeftOuterWithCond;
356
357 impl Phase1Evaluation for Inner {
358 fn append_matched_row(
359 op: Op,
360 builder: &mut StreamChunkBuilder,
361 left_row: impl Row,
362 right_row: impl Row,
363 ) -> Option<StreamChunk> {
364 builder.append_row(op, left_row.chain(right_row))
365 }
366
367 fn match_end(
368 _builder: &mut StreamChunkBuilder,
369 _op: Op,
370 _left_row: impl Row,
371 _right_size: usize,
372 _matched: bool,
373 ) -> Option<StreamChunk> {
374 None
375 }
376 }
377
378 impl Phase1Evaluation for LeftOuter {
379 fn append_matched_row(
380 op: Op,
381 builder: &mut StreamChunkBuilder,
382 left_row: impl Row,
383 right_row: impl Row,
384 ) -> Option<StreamChunk> {
385 builder.append_row(op, left_row.chain(right_row))
386 }
387
388 fn match_end(
389 builder: &mut StreamChunkBuilder,
390 op: Op,
391 left_row: impl Row,
392 right_size: usize,
393 matched: bool,
394 ) -> Option<StreamChunk> {
395 if !matched {
396 builder.append_row(
398 op,
399 left_row.chain(row::repeat_n(DatumRef::None, right_size)),
400 )
401 } else {
402 None
403 }
404 }
405 }
406
407 impl Phase1Evaluation for LeftOuterWithCond {
408 fn append_matched_row(
409 op: Op,
410 builder: &mut StreamChunkBuilder,
411 left_row: impl Row,
412 right_row: impl Row,
413 ) -> Option<StreamChunk> {
414 builder.append_row(op, left_row.chain(right_row))
415 }
416
417 fn match_end(
418 builder: &mut StreamChunkBuilder,
419 op: Op,
420 left_row: impl Row,
421 right_size: usize,
422 _matched: bool,
423 ) -> Option<StreamChunk> {
424 builder.append_row_invisible(
427 op,
428 left_row.chain(row::repeat_n(DatumRef::None, right_size)),
429 )
430 }
431 }
432
433 #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
434 #[allow(clippy::too_many_arguments)]
435 pub(super) async fn handle_chunk<
436 'a,
437 K: HashKey,
438 S: StateStore,
439 E: Phase1Evaluation,
440 const APPEND_ONLY: bool,
441 >(
442 chunk_size: usize,
443 right_size: usize,
444 full_schema: Vec<DataType>,
445 epoch: HummockEpoch,
446 left_join_keys: &'a [usize],
447 right_table: &'a mut TemporalSide<K, S>,
448 memo_table_lookup_prefix: &'a [usize],
449 memo_table: &'a mut Option<StateTable<S>>,
450 null_matched: &'a K::Bitmap,
451 chunk: StreamChunk,
452 metrics: &'a TemporalJoinMetrics,
453 ) {
454 let mut builder = StreamChunkBuilder::new(chunk_size, full_schema);
455 let keys = K::build_many(left_join_keys, chunk.data_chunk());
456 let to_fetch_keys = chunk
457 .visibility()
458 .iter()
459 .zip_eq_debug(keys.iter())
460 .zip_eq_debug(chunk.ops())
461 .filter_map(|((vis, key), op)| {
462 if vis {
463 if APPEND_ONLY {
464 assert_eq!(*op, Op::Insert);
465 Some(key)
466 } else {
467 match op {
468 Op::Insert | Op::UpdateInsert => Some(key),
469 Op::Delete | Op::UpdateDelete => None,
470 }
471 }
472 } else {
473 None
474 }
475 });
476 right_table
477 .fetch_or_promote_keys(to_fetch_keys, epoch, metrics)
478 .await?;
479
480 for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
481 let Some((op, left_row)) = r else {
482 continue;
483 };
484
485 let mut matched = false;
486
487 if APPEND_ONLY {
488 if key.null_bitmap().is_subset(null_matched)
490 && let join_entry = right_table.force_peek(&key)
491 && !join_entry.is_empty()
492 {
493 matched = true;
494 for right_row in join_entry.cached.values() {
495 if let Some(chunk) =
496 E::append_matched_row(op, &mut builder, left_row, right_row)
497 {
498 yield chunk;
499 }
500 }
501 }
502 } else {
503 let memo_table = memo_table.as_mut().unwrap();
517 match op {
518 Op::Insert | Op::UpdateInsert => {
519 if key.null_bitmap().is_subset(null_matched)
520 && let join_entry = right_table.force_peek(&key)
521 && !join_entry.is_empty()
522 {
523 matched = true;
524 for right_row in join_entry.cached.values() {
525 let right_row: OwnedRow = right_row.clone();
526 memo_table.insert(right_row.clone().chain(
528 left_row.project(memo_table_lookup_prefix).into_owned_row(),
529 ));
530 if let Some(chunk) = E::append_matched_row(
531 Op::Insert,
532 &mut builder,
533 left_row,
534 right_row,
535 ) {
536 yield chunk;
537 }
538 }
539 }
540 }
541 Op::Delete | Op::UpdateDelete => {
542 let mut memo_rows_to_delete = vec![];
543 if key.null_bitmap().is_subset(null_matched) {
544 let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
545 &(Bound::Unbounded, Bound::Unbounded);
546 let prefix = left_row.project(memo_table_lookup_prefix);
547 let state_table_iter = memo_table
548 .iter_with_prefix(prefix, sub_range, Default::default())
549 .await?;
550 pin_mut!(state_table_iter);
551
552 while let Some(memo_row) = state_table_iter.next().await {
553 matched = true;
554 let memo_row = memo_row?.into_owned_row();
555 memo_rows_to_delete.push(memo_row.clone());
556 if let Some(chunk) = E::append_matched_row(
557 Op::Delete,
558 &mut builder,
559 left_row,
560 memo_row.slice(0..right_size),
561 ) {
562 yield chunk;
563 }
564 }
565 }
566 for memo_row in memo_rows_to_delete {
567 memo_table.delete(memo_row);
569 }
570 }
571 }
572 }
573 if let Some(chunk) = E::match_end(
574 &mut builder,
575 match op {
576 Op::Insert | Op::UpdateInsert => Op::Insert,
577 Op::Delete | Op::UpdateDelete => Op::Delete,
578 },
579 left_row,
580 right_size,
581 matched,
582 ) {
583 yield chunk;
584 }
585 }
586
587 if let Some(chunk) = builder.take() {
588 yield chunk;
589 }
590 }
591}
592
593impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: bool>
594 TemporalJoinExecutor<K, S, T, APPEND_ONLY>
595{
596 #[allow(clippy::too_many_arguments)]
597 pub fn new(
598 ctx: ActorContextRef,
599 info: ExecutorInfo,
600 left: Executor,
601 right: Executor,
602 table: BatchTable<S>,
603 left_join_keys: Vec<usize>,
604 right_join_keys: Vec<usize>,
605 null_safe: Vec<bool>,
606 condition: Option<NonStrictExpression>,
607 output_indices: Vec<usize>,
608 table_output_indices: Vec<usize>,
609 table_stream_key_indices: Vec<usize>,
610 watermark_sequence: AtomicU64Ref,
611 metrics: Arc<StreamingMetrics>,
612 chunk_size: usize,
613 join_key_data_types: Vec<DataType>,
614 memo_table: Option<StateTable<S>>,
615 ) -> Self {
616 let alloc = StatsAlloc::new(Global).shared();
617
618 let metrics_info = MetricsInfo::new(
619 metrics.clone(),
620 table.table_id().table_id,
621 ctx.id,
622 "temporal join",
623 );
624
625 let cache = ManagedLruCache::unbounded_with_hasher_in(
626 watermark_sequence,
627 metrics_info,
628 DefaultHasher::default(),
629 alloc,
630 );
631
632 let metrics = metrics.new_temporal_join_metrics(table.table_id(), ctx.id, ctx.fragment_id);
633
634 Self {
635 ctx: ctx.clone(),
636 info,
637 left,
638 right,
639 right_table: TemporalSide {
640 source: table,
641 table_stream_key_indices,
642 table_output_indices,
643 cache,
644 join_key_data_types,
645 },
646 left_join_keys,
647 right_join_keys,
648 null_safe,
649 condition,
650 output_indices,
651 chunk_size,
652 memo_table,
653 metrics,
654 }
655 }
656
657 #[try_stream(ok = Message, error = StreamExecutorError)]
658 async fn into_stream(mut self) {
659 let right_size = self.right.schema().len();
660
661 let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
662 &self.output_indices,
663 self.left.schema().len(),
664 right_size,
665 );
666
667 let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());
668
669 let left_stream_key_indices = self.left.pk_indices().to_vec();
670 let right_stream_key_indices = self.right.pk_indices().to_vec();
671 let memo_table_lookup_prefix = self
672 .left_join_keys
673 .iter()
674 .cloned()
675 .chain(left_stream_key_indices)
676 .collect_vec();
677
678 let null_matched = K::Bitmap::from_bool_vec(self.null_safe);
679
680 let mut prev_epoch = None;
681
682 let full_schema: Vec<_> = self
683 .left
684 .schema()
685 .data_types()
686 .into_iter()
687 .chain(self.right.schema().data_types().into_iter())
688 .collect();
689
690 let mut wait_first_barrier = true;
691
692 #[for_await]
693 for msg in align_input::<true>(self.left, self.right) {
694 self.right_table.cache.evict();
695 self.metrics
696 .temporal_join_cached_entry_count
697 .set(self.right_table.cache.len() as i64);
698 match msg? {
699 InternalMessage::WaterMark(watermark) => {
700 let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap();
701 yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
702 }
703 InternalMessage::Chunk(chunk) => {
704 let epoch = prev_epoch.expect("Chunk data should come after some barrier.");
705
706 let full_schema = full_schema.clone();
707
708 if T == JoinType::Inner {
709 let st1 = phase1::handle_chunk::<K, S, phase1::Inner, APPEND_ONLY>(
710 self.chunk_size,
711 right_size,
712 full_schema,
713 epoch,
714 &self.left_join_keys,
715 &mut self.right_table,
716 &memo_table_lookup_prefix,
717 &mut self.memo_table,
718 &null_matched,
719 chunk,
720 &self.metrics,
721 );
722 #[for_await]
723 for chunk in st1 {
724 let chunk = chunk?;
725 let new_chunk = if let Some(ref cond) = self.condition {
726 let (data_chunk, ops) = chunk.into_parts();
727 let passed_bitmap = cond.eval_infallible(&data_chunk).await;
728 let passed_bitmap =
729 Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
730 let (columns, vis) = data_chunk.into_parts();
731 let new_vis = vis & passed_bitmap;
732 StreamChunk::with_visibility(ops, columns, new_vis)
733 } else {
734 chunk
735 };
736 let new_chunk = apply_indices_map(new_chunk, &self.output_indices);
737 yield Message::Chunk(new_chunk);
738 }
739 } else if let Some(ref cond) = self.condition {
740 let st1 =
742 phase1::handle_chunk::<K, S, phase1::LeftOuterWithCond, APPEND_ONLY>(
743 self.chunk_size,
744 right_size,
745 full_schema,
746 epoch,
747 &self.left_join_keys,
748 &mut self.right_table,
749 &memo_table_lookup_prefix,
750 &mut self.memo_table,
751 &null_matched,
752 chunk,
753 &self.metrics,
754 );
755 let mut matched_count = 0usize;
756 #[for_await]
757 for chunk in st1 {
758 let chunk = chunk?;
759 let (data_chunk, ops) = chunk.into_parts();
760 let passed_bitmap = cond.eval_infallible(&data_chunk).await;
761 let passed_bitmap =
762 Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
763 let (columns, vis) = data_chunk.into_parts();
764 let mut new_vis = BitmapBuilder::with_capacity(vis.len());
765 for (passed, not_match_end) in
766 passed_bitmap.iter().zip_eq_debug(vis.iter())
767 {
768 let is_match_end = !not_match_end;
769 let vis = if is_match_end && matched_count == 0 {
770 true
772 } else if is_match_end {
773 matched_count = 0;
775 false
777 } else {
778 if passed {
779 matched_count += 1;
780 }
781 passed
782 };
783 new_vis.append(vis);
784 }
785 let new_chunk = apply_indices_map(
786 StreamChunk::with_visibility(ops, columns, new_vis.finish()),
787 &self.output_indices,
788 );
789 yield Message::Chunk(new_chunk);
790 }
791 assert_eq!(matched_count, 0);
793 } else {
794 let st1 = phase1::handle_chunk::<K, S, phase1::LeftOuter, APPEND_ONLY>(
795 self.chunk_size,
796 right_size,
797 full_schema,
798 epoch,
799 &self.left_join_keys,
800 &mut self.right_table,
801 &memo_table_lookup_prefix,
802 &mut self.memo_table,
803 &null_matched,
804 chunk,
805 &self.metrics,
806 );
807 #[for_await]
808 for chunk in st1 {
809 let chunk = chunk?;
810 let new_chunk = apply_indices_map(chunk, &self.output_indices);
811 yield Message::Chunk(new_chunk);
812 }
813 }
814 }
815 InternalMessage::Barrier(updates, barrier) => {
816 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
817 let barrier_epoch = barrier.epoch;
818 if !APPEND_ONLY {
819 if wait_first_barrier {
820 wait_first_barrier = false;
821 yield Message::Barrier(barrier);
822 self.memo_table
823 .as_mut()
824 .unwrap()
825 .init_epoch(barrier_epoch)
826 .await?;
827 } else {
828 let post_commit = self
829 .memo_table
830 .as_mut()
831 .unwrap()
832 .commit(barrier.epoch)
833 .await?;
834 yield Message::Barrier(barrier);
835 post_commit
836 .post_yield_barrier(update_vnode_bitmap.clone())
837 .await?;
838 }
839 } else {
840 yield Message::Barrier(barrier);
841 }
842 if let Some(vnodes) = update_vnode_bitmap {
843 let prev_vnodes =
844 self.right_table.source.update_vnode_bitmap(vnodes.clone());
845 if cache_may_stale(&prev_vnodes, &vnodes) {
846 self.right_table.cache.clear();
847 }
848 }
849 self.right_table.update(
850 updates,
851 &self.right_join_keys,
852 &right_stream_key_indices,
853 )?;
854 prev_epoch = Some(barrier_epoch.prev);
855 }
856 }
857 }
858 }
859}
860
861impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: bool> Execute
862 for TemporalJoinExecutor<K, S, T, APPEND_ONLY>
863{
864 fn execute(self: Box<Self>) -> super::BoxedMessageStream {
865 self.into_stream().boxed()
866 }
867}