risingwave_stream/executor/
temporal_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::alloc::Global;
16use std::collections::HashMap;
17use std::collections::hash_map::Entry;
18
19use either::Either;
20use futures::TryStreamExt;
21use futures::stream::{self, PollNext};
22use itertools::Itertools;
23use local_stats_alloc::{SharedStatsAlloc, StatsAlloc};
24use lru::DefaultHasher;
25use risingwave_common::array::Op;
26use risingwave_common::bitmap::BitmapBuilder;
27use risingwave_common::hash::{HashKey, NullBitmap};
28use risingwave_common::row::RowExt;
29use risingwave_common::util::iter_util::ZipEqDebug;
30use risingwave_common_estimate_size::{EstimateSize, KvSize};
31use risingwave_expr::expr::NonStrictExpression;
32use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
33use risingwave_storage::store::PrefetchOptions;
34use risingwave_storage::table::TableIter;
35use risingwave_storage::table::batch_table::BatchTable;
36
37use super::join::{JoinType, JoinTypePrimitive};
38use super::monitor::TemporalJoinMetrics;
39use crate::cache::{ManagedLruCache, cache_may_stale};
40use crate::common::metrics::MetricsInfo;
41use crate::executor::join::builder::JoinStreamChunkBuilder;
42use crate::executor::prelude::*;
43
44pub struct TemporalJoinExecutor<
45    K: HashKey,
46    S: StateStore,
47    const T: JoinTypePrimitive,
48    const APPEND_ONLY: bool,
49> {
50    ctx: ActorContextRef,
51    #[allow(dead_code)]
52    info: ExecutorInfo,
53    left: Executor,
54    right: Executor,
55    right_table: TemporalSide<K, S>,
56    left_join_keys: Vec<usize>,
57    right_join_keys: Vec<usize>,
58    null_safe: Vec<bool>,
59    condition: Option<NonStrictExpression>,
60    output_indices: Vec<usize>,
61    chunk_size: usize,
62    memo_table: Option<StateTable<S>>,
63    metrics: TemporalJoinMetrics,
64}
65
/// Cached temporal-side rows that share one join key.
#[derive(Default)]
pub struct JoinEntry {
    /// pk -> row (the key is the row's stream key; the value is the row to join with).
    cached: HashMap<OwnedRow, OwnedRow>,
    /// Running heap-size estimate of the keys and values stored in `cached`.
    kv_heap_size: KvSize,
}
72
impl EstimateSize for JoinEntry {
    /// Reports only the tracked key/value heap size (`kv_heap_size`); per the TODO
    /// below, the map's own internal size is presumably not accounted for yet.
    fn estimated_heap_size(&self) -> usize {
        // TODO: Add internal size.
        // https://github.com/risingwavelabs/risingwave/issues/9713
        self.kv_heap_size.size()
    }
}
80
81impl JoinEntry {
82    /// Insert into the cache.
83    pub fn insert(&mut self, key: OwnedRow, value: OwnedRow) {
84        // Lookup might refill the cache before the `insert` messages from the temporal side
85        // upstream.
86        if let Entry::Vacant(e) = self.cached.entry(key) {
87            self.kv_heap_size.add(e.key(), &value);
88            e.insert(value);
89        }
90    }
91
92    /// Delete from the cache.
93    pub fn remove(&mut self, key: &OwnedRow) {
94        if let Some(value) = self.cached.remove(key) {
95            self.kv_heap_size.sub(key, &value);
96        } else {
97            panic!("key {:?} should be in the cache", key);
98        }
99    }
100
101    pub fn is_empty(&self) -> bool {
102        self.cached.is_empty()
103    }
104}
105
106struct TemporalSide<K: HashKey, S: StateStore> {
107    source: BatchTable<S>,
108    table_stream_key_indices: Vec<usize>,
109    table_output_indices: Vec<usize>,
110    cache: ManagedLruCache<K, JoinEntry, DefaultHasher, SharedStatsAlloc<Global>>,
111    join_key_data_types: Vec<DataType>,
112}
113
impl<K: HashKey, S: StateStore> TemporalSide<K, S> {
    /// Fetch records from temporal side table and ensure the entry in the cache.
    /// If already exists, the entry will be promoted.
    ///
    /// Every key bumps the total-query counter. On a cache miss the miss counter is
    /// bumped and a scan of all table rows whose pk starts with the deserialized key is
    /// scheduled at `HummockReadEpoch::NoWait(epoch)`. Each scanned row is cached under
    /// its `table_stream_key_indices` projection, with its `table_output_indices`
    /// projection as the value. At most 16 scans run concurrently (`buffered(16)`).
    /// Results are `put` into the cache in input order, and the first error is returned.
    ///
    /// NOTE(review): the cache is filled only after all scans have been scheduled, so a
    /// missing key that occurs several times in `keys` is scanned once per occurrence.
    async fn fetch_or_promote_keys(
        &mut self,
        keys: impl Iterator<Item = &K>,
        epoch: HummockEpoch,
        metrics: &TemporalJoinMetrics,
    ) -> StreamExecutorResult<()> {
        // Pre-size with the iterator's upper bound when it has one.
        let mut futs = Vec::with_capacity(keys.size_hint().1.unwrap_or(0));
        for key in keys {
            metrics.temporal_join_total_query_cache_count.inc();

            if self.cache.get(key).is_none() {
                metrics.temporal_join_cache_miss_count.inc();

                futs.push(async {
                    let pk_prefix = key.deserialize(&self.join_key_data_types)?;

                    let iter = self
                        .source
                        .batch_iter_with_pk_bounds(
                            HummockReadEpoch::NoWait(epoch),
                            &pk_prefix,
                            ..,
                            false,
                            PrefetchOptions::default(),
                        )
                        .await?;

                    let mut entry = JoinEntry::default();

                    pin_mut!(iter);
                    while let Some(row) = iter.next_row().await? {
                        entry.insert(
                            row.as_ref()
                                .project(&self.table_stream_key_indices)
                                .into_owned_row(),
                            row.project(&self.table_output_indices).into_owned_row(),
                        );
                    }
                    let key = key.clone();
                    Ok((key, entry)) as StreamExecutorResult<_>
                });
            }
        }

        // Drive the scans with bounded concurrency; `buffered` keeps the input order.
        #[for_await]
        for res in stream::iter(futs).buffered(16) {
            let (key, entry) = res?;
            self.cache.put(key, entry);
        }

        Ok(())
    }

    /// Returns the cached entry for `key` via `cache.peek` (presumably without LRU
    /// promotion; confirm against `ManagedLruCache::peek`).
    ///
    /// # Panics
    /// Panics if `key` is not cached, so callers must have fetched it first.
    fn force_peek(&self, key: &K) -> &JoinEntry {
        self.cache.peek(key).expect("key should exists")
    }

    /// Applies temporal-side changes to the cache.
    ///
    /// For every visible row, the join key is built from `join_keys`. If that key is
    /// cached, `Insert`/`UpdateInsert` add the row under its `right_stream_key_indices`
    /// projection, and `Delete`/`UpdateDelete` remove that stream key. Removal panics if
    /// the stream key is absent. Rows whose join key is not cached are ignored. Never
    /// returns an error.
    ///
    /// NOTE(review): the full row is cached here, while `fetch_or_promote_keys` caches
    /// the `table_output_indices` projection. This assumes right-side chunks already use
    /// that projected layout (TODO confirm).
    fn update(
        &mut self,
        chunks: Vec<StreamChunk>,
        join_keys: &[usize],
        right_stream_key_indices: &[usize],
    ) -> StreamExecutorResult<()> {
        for chunk in chunks {
            let keys = K::build_many(join_keys, chunk.data_chunk());
            for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
                // Invisible rows are holes; skip them.
                let Some((op, row)) = r else {
                    continue;
                };
                // Uncached join keys are skipped: a later lookup scans them from storage.
                if self.cache.contains(&key) {
                    // Update cache
                    let mut entry = self.cache.get_mut(&key).unwrap();
                    let stream_key = row.project(right_stream_key_indices).into_owned_row();
                    match op {
                        Op::Insert | Op::UpdateInsert => {
                            entry.insert(stream_key, row.into_owned_row())
                        }
                        Op::Delete | Op::UpdateDelete => entry.remove(&stream_key),
                    };
                }
            }
        }
        Ok(())
    }
}
202
/// Messages produced by [`align_input`] (and by the per-barrier helpers it uses).
pub(super) enum InternalMessage {
    /// In `align_input`, a chunk from the left input.
    Chunk(StreamChunk),
    /// Closes an aligned interval. It carries the right-input chunks buffered during the
    /// interval (always empty unless `align_input` runs with `YIELD_RIGHT_CHUNKS = true`)
    /// and the barrier itself.
    Barrier(Vec<StreamChunk>, Barrier),
    /// In `align_input`, a watermark from the left input (right watermarks are dropped).
    WaterMark(Watermark),
}
208
209#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
210async fn chunks_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
211    #[for_await]
212    for item in stream {
213        match item? {
214            Message::Watermark(_) => {
215                // ignore
216            }
217            Message::Chunk(c) => yield c,
218            Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
219                return Err(StreamExecutorError::align_barrier(expected_barrier, b));
220            }
221            Message::Barrier(_) => return Ok(()),
222        }
223    }
224}
225
226#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
227async fn internal_messages_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
228    #[for_await]
229    for item in stream {
230        match item? {
231            Message::Watermark(w) => {
232                yield InternalMessage::WaterMark(w);
233            }
234            Message::Chunk(c) => yield InternalMessage::Chunk(c),
235            Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
236                return Err(StreamExecutorError::align_barrier(expected_barrier, b));
237            }
238            Message::Barrier(_) => return Ok(()),
239        }
240    }
241}
242
// Align the left and right inputs according to their barriers, such that in the
// produced stream an aligned interval consists of any number of
// `InternalMessage::Chunk(left_chunk)` / `InternalMessage::WaterMark(left_watermark)`
// messages followed by exactly one `InternalMessage::Barrier(right_chunks, barrier)`.
//
// - Left chunks and watermarks are forwarded as soon as they arrive; right watermarks
//   are dropped.
// - Right chunks are collected into `right_chunks` only when `YIELD_RIGHT_CHUNKS` is
//   true; otherwise they are discarded and the barrier carries an empty vector.
// - While neither side has reached its barrier, the left input is polled first
//   (`PollNext::Left`).
// - Once one side delivers a barrier, the other side is drained up to its own barrier,
//   which must carry the same epoch; otherwise an `align_barrier` error is returned.
// - The stream stops at the first upstream error, and ends when the combined stream
//   returns `None`.
#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
pub(super) async fn align_input<const YIELD_RIGHT_CHUNKS: bool>(left: Executor, right: Executor) {
    let mut left = pin!(left.execute());
    let mut right = pin!(right.execute());
    // Keep producing intervals until stream exhaustion or errors.
    loop {
        let mut right_chunks = vec![];
        // Produce an aligned interval.
        'inner: loop {
            // Rebuilt every iteration so both inputs can also be drained individually
            // (via `by_ref`) once a barrier is seen.
            let mut combined = stream::select_with_strategy(
                left.by_ref().map(Either::Left),
                right.by_ref().map(Either::Right),
                |_: &mut ()| PollNext::Left,
            );
            match combined.next().await {
                Some(Either::Left(Ok(Message::Chunk(c)))) => yield InternalMessage::Chunk(c),
                Some(Either::Right(Ok(Message::Chunk(c)))) => {
                    if YIELD_RIGHT_CHUNKS {
                        right_chunks.push(c);
                    }
                }
                Some(Either::Left(Ok(Message::Barrier(b)))) => {
                    // Left reached the barrier first: collect the rest of the right side's
                    // chunks for this epoch.
                    let mut remain = chunks_until_barrier(right.by_ref(), b.clone())
                        .try_collect()
                        .await?;
                    if YIELD_RIGHT_CHUNKS {
                        right_chunks.append(&mut remain);
                    }
                    yield InternalMessage::Barrier(right_chunks, b);
                    break 'inner;
                }
                Some(Either::Right(Ok(Message::Barrier(b)))) => {
                    // Right reached the barrier first: forward the left side's remaining
                    // chunks and watermarks for this epoch.
                    #[for_await]
                    for internal_message in
                        internal_messages_until_barrier(left.by_ref(), b.clone())
                    {
                        yield internal_message?;
                    }
                    yield InternalMessage::Barrier(right_chunks, b);
                    break 'inner;
                }
                Some(Either::Left(Err(e)) | Either::Right(Err(e))) => return Err(e),
                Some(Either::Left(Ok(Message::Watermark(w)))) => {
                    yield InternalMessage::WaterMark(w);
                }
                Some(Either::Right(Ok(Message::Watermark(_)))) => {
                    // ignore right side watermark
                }
                None => return Ok(()),
            }
        }
    }
}
300
301pub(super) fn apply_indices_map(chunk: StreamChunk, indices: &[usize]) -> StreamChunk {
302    let (data_chunk, ops) = chunk.into_parts();
303    let (columns, vis) = data_chunk.into_parts();
304    let output_columns = indices
305        .iter()
306        .cloned()
307        .map(|idx| columns[idx].clone())
308        .collect();
309    StreamChunk::with_visibility(ops, output_columns, vis)
310}
311
312pub(super) mod phase1 {
313    use std::ops::Bound;
314
315    use futures::{StreamExt, pin_mut};
316    use futures_async_stream::try_stream;
317    use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
318    use risingwave_common::array::{Op, StreamChunk};
319    use risingwave_common::hash::{HashKey, NullBitmap};
320    use risingwave_common::row::{self, OwnedRow, Row, RowExt};
321    use risingwave_common::types::{DataType, DatumRef};
322    use risingwave_common::util::iter_util::ZipEqDebug;
323    use risingwave_hummock_sdk::HummockEpoch;
324    use risingwave_storage::StateStore;
325
326    use super::{StreamExecutorError, TemporalSide};
327    use crate::common::table::state_table::StateTable;
328    use crate::executor::monitor::TemporalJoinMetrics;
329
    /// Join-type-specific hooks that [`handle_chunk`] uses to emit joined rows.
    ///
    /// Each hook returns the chunk produced by the builder, if any; the caller must yield it.
    pub trait Phase1Evaluation {
        /// Called when a matched row is found.
        ///
        /// `left_row` matched `right_row`; `op` is the op to emit the joined row with.
        #[must_use = "consume chunk if produced"]
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk>;

        /// Called when all matched rows of a join key are appended.
        ///
        /// `matched` tells whether any right row was appended for `left_row`. `right_size`
        /// is the number of right-side columns, used by the outer-join impls to pad with
        /// NULLs.
        #[must_use = "consume chunk if produced"]
        fn match_end(
            builder: &mut StreamChunkBuilder,
            op: Op,
            left_row: impl Row,
            right_size: usize,
            matched: bool,
        ) -> Option<StreamChunk>;
    }
350
    /// Inner join: unmatched left rows produce no output.
    pub struct Inner;
    /// Left outer join without a non-lookup condition: unmatched left rows are NULL-padded.
    pub struct LeftOuter;
    /// Left outer join with a non-lookup condition: every left row also gets an invisible
    /// NULL-padded marker row, whose visibility is decided after the condition is evaluated.
    pub struct LeftOuterWithCond;
354
355    impl Phase1Evaluation for Inner {
356        fn append_matched_row(
357            op: Op,
358            builder: &mut StreamChunkBuilder,
359            left_row: impl Row,
360            right_row: impl Row,
361        ) -> Option<StreamChunk> {
362            builder.append_row(op, left_row.chain(right_row))
363        }
364
365        fn match_end(
366            _builder: &mut StreamChunkBuilder,
367            _op: Op,
368            _left_row: impl Row,
369            _right_size: usize,
370            _matched: bool,
371        ) -> Option<StreamChunk> {
372            None
373        }
374    }
375
376    impl Phase1Evaluation for LeftOuter {
377        fn append_matched_row(
378            op: Op,
379            builder: &mut StreamChunkBuilder,
380            left_row: impl Row,
381            right_row: impl Row,
382        ) -> Option<StreamChunk> {
383            builder.append_row(op, left_row.chain(right_row))
384        }
385
386        fn match_end(
387            builder: &mut StreamChunkBuilder,
388            op: Op,
389            left_row: impl Row,
390            right_size: usize,
391            matched: bool,
392        ) -> Option<StreamChunk> {
393            if !matched {
394                // If no rows matched, a marker row should be inserted.
395                builder.append_row(
396                    op,
397                    left_row.chain(row::repeat_n(DatumRef::None, right_size)),
398                )
399            } else {
400                None
401            }
402        }
403    }
404
405    impl Phase1Evaluation for LeftOuterWithCond {
406        fn append_matched_row(
407            op: Op,
408            builder: &mut StreamChunkBuilder,
409            left_row: impl Row,
410            right_row: impl Row,
411        ) -> Option<StreamChunk> {
412            builder.append_row(op, left_row.chain(right_row))
413        }
414
415        fn match_end(
416            builder: &mut StreamChunkBuilder,
417            op: Op,
418            left_row: impl Row,
419            right_size: usize,
420            _matched: bool,
421        ) -> Option<StreamChunk> {
422            // A marker row should always be inserted and mark as invisible for non-lookup filters evaluation.
423            // The row will be converted to visible in the further steps if no rows matched after all filters evaluated.
424            builder.append_row_invisible(
425                op,
426                left_row.chain(row::repeat_n(DatumRef::None, right_size)),
427            )
428        }
429    }
430
431    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
432    #[allow(clippy::too_many_arguments)]
433    pub(super) async fn handle_chunk<
434        'a,
435        K: HashKey,
436        S: StateStore,
437        E: Phase1Evaluation,
438        const APPEND_ONLY: bool,
439    >(
440        chunk_size: usize,
441        right_size: usize,
442        full_schema: Vec<DataType>,
443        epoch: HummockEpoch,
444        left_join_keys: &'a [usize],
445        right_table: &'a mut TemporalSide<K, S>,
446        memo_table_lookup_prefix: &'a [usize],
447        memo_table: &'a mut Option<StateTable<S>>,
448        null_matched: &'a K::Bitmap,
449        chunk: StreamChunk,
450        metrics: &'a TemporalJoinMetrics,
451    ) {
452        let mut builder = StreamChunkBuilder::new(chunk_size, full_schema);
453        let keys = K::build_many(left_join_keys, chunk.data_chunk());
454        let to_fetch_keys = chunk
455            .visibility()
456            .iter()
457            .zip_eq_debug(keys.iter())
458            .zip_eq_debug(chunk.ops())
459            .filter_map(|((vis, key), op)| {
460                if vis {
461                    if APPEND_ONLY {
462                        assert_eq!(*op, Op::Insert);
463                        Some(key)
464                    } else {
465                        match op {
466                            Op::Insert | Op::UpdateInsert => Some(key),
467                            Op::Delete | Op::UpdateDelete => None,
468                        }
469                    }
470                } else {
471                    None
472                }
473            });
474        right_table
475            .fetch_or_promote_keys(to_fetch_keys, epoch, metrics)
476            .await?;
477
478        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
479            let Some((op, left_row)) = r else {
480                continue;
481            };
482
483            let mut matched = false;
484
485            if APPEND_ONLY {
486                // Append-only temporal join
487                if key.null_bitmap().is_subset(null_matched)
488                    && let join_entry = right_table.force_peek(&key)
489                    && !join_entry.is_empty()
490                {
491                    matched = true;
492                    for right_row in join_entry.cached.values() {
493                        if let Some(chunk) =
494                            E::append_matched_row(op, &mut builder, left_row, right_row)
495                        {
496                            yield chunk;
497                        }
498                    }
499                }
500            } else {
501                // Non-append-only temporal join
502                // The memo-table pk and columns:
503                // (`join_key` + `left_pk` + `right_pk`) -> (`right_scan_schema` + `join_key` + `left_pk`)
504                //
505                // Write pattern:
506                //   for each left input row (with insert op), construct the memo table pk and insert the row into the memo table.
507                // Read pattern:
508                //   for each left input row (with delete op), construct pk prefix (`join_key` + `left_pk`) to fetch rows and delete them from the memo table.
509                //
510                // Temporal join supports inner join and left outer join, additionally, it could contain other conditions.
511                // Surprisingly, we could handle them in a unified way with memo table.
512                // The memo table would persist rows fetched from the right table and appending the `join_key` and `left_pk` from the left row.
513                // The null rows generated by outer join and the other condition somehow is a stateless operation which means we can handle them without the memo table.
514                let memo_table = memo_table.as_mut().unwrap();
515                match op {
516                    Op::Insert | Op::UpdateInsert => {
517                        if key.null_bitmap().is_subset(null_matched)
518                            && let join_entry = right_table.force_peek(&key)
519                            && !join_entry.is_empty()
520                        {
521                            matched = true;
522                            for right_row in join_entry.cached.values() {
523                                let right_row: OwnedRow = right_row.clone();
524                                // Insert into memo table
525                                memo_table.insert(right_row.clone().chain(
526                                    left_row.project(memo_table_lookup_prefix).into_owned_row(),
527                                ));
528                                if let Some(chunk) = E::append_matched_row(
529                                    Op::Insert,
530                                    &mut builder,
531                                    left_row,
532                                    right_row,
533                                ) {
534                                    yield chunk;
535                                }
536                            }
537                        }
538                    }
539                    Op::Delete | Op::UpdateDelete => {
540                        let mut memo_rows_to_delete = vec![];
541                        if key.null_bitmap().is_subset(null_matched) {
542                            let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
543                                &(Bound::Unbounded, Bound::Unbounded);
544                            let prefix = left_row.project(memo_table_lookup_prefix);
545                            let state_table_iter = memo_table
546                                .iter_with_prefix(prefix, sub_range, Default::default())
547                                .await?;
548                            pin_mut!(state_table_iter);
549
550                            while let Some(memo_row) = state_table_iter.next().await {
551                                matched = true;
552                                let memo_row = memo_row?.into_owned_row();
553                                memo_rows_to_delete.push(memo_row.clone());
554                                if let Some(chunk) = E::append_matched_row(
555                                    Op::Delete,
556                                    &mut builder,
557                                    left_row,
558                                    memo_row.slice(0..right_size),
559                                ) {
560                                    yield chunk;
561                                }
562                            }
563                        }
564                        for memo_row in memo_rows_to_delete {
565                            // Delete from memo table
566                            memo_table.delete(memo_row);
567                        }
568                    }
569                }
570            }
571            if let Some(chunk) = E::match_end(
572                &mut builder,
573                match op {
574                    Op::Insert | Op::UpdateInsert => Op::Insert,
575                    Op::Delete | Op::UpdateDelete => Op::Delete,
576                },
577                left_row,
578                right_size,
579                matched,
580            ) {
581                yield chunk;
582            }
583        }
584
585        if let Some(chunk) = builder.take() {
586            yield chunk;
587        }
588    }
589}
590
591impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: bool>
592    TemporalJoinExecutor<K, S, T, APPEND_ONLY>
593{
594    #[allow(clippy::too_many_arguments)]
595    pub fn new(
596        ctx: ActorContextRef,
597        info: ExecutorInfo,
598        left: Executor,
599        right: Executor,
600        table: BatchTable<S>,
601        left_join_keys: Vec<usize>,
602        right_join_keys: Vec<usize>,
603        null_safe: Vec<bool>,
604        condition: Option<NonStrictExpression>,
605        output_indices: Vec<usize>,
606        table_output_indices: Vec<usize>,
607        table_stream_key_indices: Vec<usize>,
608        watermark_sequence: AtomicU64Ref,
609        metrics: Arc<StreamingMetrics>,
610        chunk_size: usize,
611        join_key_data_types: Vec<DataType>,
612        memo_table: Option<StateTable<S>>,
613    ) -> Self {
614        let alloc = StatsAlloc::new(Global).shared();
615
616        let metrics_info = MetricsInfo::new(
617            metrics.clone(),
618            table.table_id().table_id,
619            ctx.id,
620            "temporal join",
621        );
622
623        let cache = ManagedLruCache::unbounded_with_hasher_in(
624            watermark_sequence,
625            metrics_info,
626            DefaultHasher::default(),
627            alloc,
628        );
629
630        let metrics = metrics.new_temporal_join_metrics(table.table_id(), ctx.id, ctx.fragment_id);
631
632        Self {
633            ctx: ctx.clone(),
634            info,
635            left,
636            right,
637            right_table: TemporalSide {
638                source: table,
639                table_stream_key_indices,
640                table_output_indices,
641                cache,
642                join_key_data_types,
643            },
644            left_join_keys,
645            right_join_keys,
646            null_safe,
647            condition,
648            output_indices,
649            chunk_size,
650            memo_table,
651            metrics,
652        }
653    }
654
655    #[try_stream(ok = Message, error = StreamExecutorError)]
656    async fn into_stream(mut self) {
657        let right_size = self.right.schema().len();
658
659        let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
660            &self.output_indices,
661            self.left.schema().len(),
662            right_size,
663        );
664
665        let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());
666
667        let left_stream_key_indices = self.left.pk_indices().to_vec();
668        let right_stream_key_indices = self.right.pk_indices().to_vec();
669        let memo_table_lookup_prefix = self
670            .left_join_keys
671            .iter()
672            .cloned()
673            .chain(left_stream_key_indices)
674            .collect_vec();
675
676        let null_matched = K::Bitmap::from_bool_vec(self.null_safe);
677
678        let mut prev_epoch = None;
679
680        let full_schema: Vec<_> = self
681            .left
682            .schema()
683            .data_types()
684            .into_iter()
685            .chain(self.right.schema().data_types().into_iter())
686            .collect();
687
688        let mut wait_first_barrier = true;
689
690        #[for_await]
691        for msg in align_input::<true>(self.left, self.right) {
692            self.right_table.cache.evict();
693            self.metrics
694                .temporal_join_cached_entry_count
695                .set(self.right_table.cache.len() as i64);
696            match msg? {
697                InternalMessage::WaterMark(watermark) => {
698                    let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap();
699                    yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
700                }
701                InternalMessage::Chunk(chunk) => {
702                    let epoch = prev_epoch.expect("Chunk data should come after some barrier.");
703
704                    let full_schema = full_schema.clone();
705
706                    if T == JoinType::Inner {
707                        let st1 = phase1::handle_chunk::<K, S, phase1::Inner, APPEND_ONLY>(
708                            self.chunk_size,
709                            right_size,
710                            full_schema,
711                            epoch,
712                            &self.left_join_keys,
713                            &mut self.right_table,
714                            &memo_table_lookup_prefix,
715                            &mut self.memo_table,
716                            &null_matched,
717                            chunk,
718                            &self.metrics,
719                        );
720                        #[for_await]
721                        for chunk in st1 {
722                            let chunk = chunk?;
723                            let new_chunk = if let Some(ref cond) = self.condition {
724                                let (data_chunk, ops) = chunk.into_parts();
725                                let passed_bitmap = cond.eval_infallible(&data_chunk).await;
726                                let passed_bitmap =
727                                    Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
728                                let (columns, vis) = data_chunk.into_parts();
729                                let new_vis = vis & passed_bitmap;
730                                StreamChunk::with_visibility(ops, columns, new_vis)
731                            } else {
732                                chunk
733                            };
734                            let new_chunk = apply_indices_map(new_chunk, &self.output_indices);
735                            yield Message::Chunk(new_chunk);
736                        }
737                    } else if let Some(ref cond) = self.condition {
738                        // Joined result without evaluating non-lookup conditions.
739                        let st1 =
740                            phase1::handle_chunk::<K, S, phase1::LeftOuterWithCond, APPEND_ONLY>(
741                                self.chunk_size,
742                                right_size,
743                                full_schema,
744                                epoch,
745                                &self.left_join_keys,
746                                &mut self.right_table,
747                                &memo_table_lookup_prefix,
748                                &mut self.memo_table,
749                                &null_matched,
750                                chunk,
751                                &self.metrics,
752                            );
753                        let mut matched_count = 0usize;
754                        #[for_await]
755                        for chunk in st1 {
756                            let chunk = chunk?;
757                            let (data_chunk, ops) = chunk.into_parts();
758                            let passed_bitmap = cond.eval_infallible(&data_chunk).await;
759                            let passed_bitmap =
760                                Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
761                            let (columns, vis) = data_chunk.into_parts();
762                            let mut new_vis = BitmapBuilder::with_capacity(vis.len());
763                            for (passed, not_match_end) in
764                                passed_bitmap.iter().zip_eq_debug(vis.iter())
765                            {
766                                let is_match_end = !not_match_end;
767                                let vis = if is_match_end && matched_count == 0 {
768                                    // Nothing is matched, so the marker row should be visible.
769                                    true
770                                } else if is_match_end {
771                                    // reset the count
772                                    matched_count = 0;
773                                    // rows found, so the marker row should be invisible.
774                                    false
775                                } else {
776                                    if passed {
777                                        matched_count += 1;
778                                    }
779                                    passed
780                                };
781                                new_vis.append(vis);
782                            }
783                            let new_chunk = apply_indices_map(
784                                StreamChunk::with_visibility(ops, columns, new_vis.finish()),
785                                &self.output_indices,
786                            );
787                            yield Message::Chunk(new_chunk);
788                        }
789                        // The last row should always be marker row,
790                        assert_eq!(matched_count, 0);
791                    } else {
792                        let st1 = phase1::handle_chunk::<K, S, phase1::LeftOuter, APPEND_ONLY>(
793                            self.chunk_size,
794                            right_size,
795                            full_schema,
796                            epoch,
797                            &self.left_join_keys,
798                            &mut self.right_table,
799                            &memo_table_lookup_prefix,
800                            &mut self.memo_table,
801                            &null_matched,
802                            chunk,
803                            &self.metrics,
804                        );
805                        #[for_await]
806                        for chunk in st1 {
807                            let chunk = chunk?;
808                            let new_chunk = apply_indices_map(chunk, &self.output_indices);
809                            yield Message::Chunk(new_chunk);
810                        }
811                    }
812                }
813                InternalMessage::Barrier(updates, barrier) => {
814                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
815                    let barrier_epoch = barrier.epoch;
816                    if !APPEND_ONLY {
817                        if wait_first_barrier {
818                            wait_first_barrier = false;
819                            yield Message::Barrier(barrier);
820                            self.memo_table
821                                .as_mut()
822                                .unwrap()
823                                .init_epoch(barrier_epoch)
824                                .await?;
825                        } else {
826                            let post_commit = self
827                                .memo_table
828                                .as_mut()
829                                .unwrap()
830                                .commit(barrier.epoch)
831                                .await?;
832                            yield Message::Barrier(barrier);
833                            post_commit
834                                .post_yield_barrier(update_vnode_bitmap.clone())
835                                .await?;
836                        }
837                    } else {
838                        yield Message::Barrier(barrier);
839                    }
840                    if let Some(vnodes) = update_vnode_bitmap {
841                        let prev_vnodes =
842                            self.right_table.source.update_vnode_bitmap(vnodes.clone());
843                        if cache_may_stale(&prev_vnodes, &vnodes) {
844                            self.right_table.cache.clear();
845                        }
846                    }
847                    self.right_table.update(
848                        updates,
849                        &self.right_join_keys,
850                        &right_stream_key_indices,
851                    )?;
852                    prev_epoch = Some(barrier_epoch.curr);
853                }
854            }
855        }
856    }
857}
858
859impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: bool> Execute
860    for TemporalJoinExecutor<K, S, T, APPEND_ONLY>
861{
862    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
863        self.into_stream().boxed()
864    }
865}