risingwave_stream/executor/temporal_join.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::hash_map::Entry;

use either::Either;
use futures::TryStreamExt;
use futures::stream::{self, PollNext};
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::hash::{HashKey, NullBitmap};
use risingwave_common::row::RowExt;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common_estimate_size::{EstimateSize, KvSize};
use risingwave_expr::expr::NonStrictExpression;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::TableIter;
use risingwave_storage::table::batch_table::BatchTable;

use super::join::{JoinType, JoinTypePrimitive};
use super::monitor::TemporalJoinMetrics;
use crate::cache::{ManagedLruCache, cache_may_stale};
use crate::common::metrics::MetricsInfo;
use crate::executor::join::builder::JoinStreamChunkBuilder;
use crate::executor::prelude::*;

pub struct TemporalJoinExecutor<
    K: HashKey,
    S: StateStore,
    const T: JoinTypePrimitive,
    const APPEND_ONLY: bool,
> {
    ctx: ActorContextRef,
    #[allow(dead_code)]
    info: ExecutorInfo,
    left: Executor,
    right: Executor,
    right_table: TemporalSide<K, S>,
    left_join_keys: Vec<usize>,
    right_join_keys: Vec<usize>,
    null_safe: Vec<bool>,
    condition: Option<NonStrictExpression>,
    output_indices: Vec<usize>,
    chunk_size: usize,
    memo_table: Option<StateTable<S>>,
    metrics: TemporalJoinMetrics,
}

#[derive(Default)]
pub struct JoinEntry {
    /// pk -> row
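    ///
    /// A hypothetical illustration (values made up): for a right table with stream key
    /// `[id]` and output columns `[id, name]`, an entry may map
    /// `OwnedRow([1])` -> `OwnedRow([1, "foo"])`.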
    cached: HashMap<OwnedRow, OwnedRow>,
    kv_heap_size: KvSize,
}

impl EstimateSize for JoinEntry {
    fn estimated_heap_size(&self) -> usize {
        // TODO: Add internal size.
        // https://github.com/risingwavelabs/risingwave/issues/9713
        self.kv_heap_size.size()
    }
}

impl JoinEntry {
    /// Insert into the cache.
    pub fn insert(&mut self, key: OwnedRow, value: OwnedRow) {
        // A lookup might have refilled the cache before the `insert` messages from the
        // temporal-side upstream arrive.
        if let Entry::Vacant(e) = self.cached.entry(key) {
            self.kv_heap_size.add(e.key(), &value);
            e.insert(value);
        } else {
            panic!("value {:?} double insert", value);
        }
    }

    /// Delete from the cache.
    pub fn remove(&mut self, key: &OwnedRow) {
        if let Some(value) = self.cached.remove(key) {
            self.kv_heap_size.sub(key, &value);
        } else {
            panic!("key {:?} should be in the cache", key);
        }
    }

    pub fn is_empty(&self) -> bool {
        self.cached.is_empty()
    }
}

struct TemporalSide<K: HashKey, S: StateStore> {
    source: BatchTable<S>,
    table_stream_key_indices: Vec<usize>,
    table_output_indices: Vec<usize>,
    cache: ManagedLruCache<K, JoinEntry>,
    join_key_data_types: Vec<DataType>,
}

impl<K: HashKey, S: StateStore> TemporalSide<K, S> {
    /// Fetch records from the temporal-side table and ensure their entries are in the cache.
    /// If an entry already exists, it is promoted in the LRU cache.
    async fn fetch_or_promote_keys(
        &mut self,
        keys: impl Iterator<Item = &K>,
        epoch: HummockEpoch,
        metrics: &TemporalJoinMetrics,
    ) -> StreamExecutorResult<()> {
        let mut futs = Vec::with_capacity(keys.size_hint().1.unwrap_or(0));
        for key in keys {
            metrics.temporal_join_total_query_cache_count.inc();

            if self.cache.get(key).is_none() {
                metrics.temporal_join_cache_miss_count.inc();

                futs.push(async {
                    let pk_prefix = key.deserialize(&self.join_key_data_types)?;

                    let iter = self
                        .source
                        .batch_iter_with_pk_bounds(
                            HummockReadEpoch::NoWait(epoch),
                            &pk_prefix,
                            ..,
                            false,
                            PrefetchOptions::default(),
                        )
                        .await?;

                    let mut entry = JoinEntry::default();

                    pin_mut!(iter);
                    while let Some(row) = iter.next_row().await? {
                        entry.insert(
                            row.as_ref()
                                .project(&self.table_stream_key_indices)
                                .into_owned_row(),
                            row.project(&self.table_output_indices).into_owned_row(),
                        );
                    }
                    let key = key.clone();
                    Ok((key, entry)) as StreamExecutorResult<_>
                });
            }
        }

        #[for_await]
        for res in stream::iter(futs).buffered(16) {
            let (key, entry) = res?;
            self.cache.put(key, entry);
        }

        Ok(())
    }

    fn force_peek(&self, key: &K) -> &JoinEntry {
        self.cache.peek(key).expect("key should exist")
    }

    fn update(
        &mut self,
        chunks: Vec<StreamChunk>,
        join_keys: &[usize],
        right_stream_key_indices: &[usize],
    ) -> StreamExecutorResult<()> {
        for chunk in chunks {
            let keys = K::build_many(join_keys, chunk.data_chunk());
            for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
                let Some((op, row)) = r else {
                    continue;
                };
                if self.cache.contains(&key) {
                    // Only update keys that are already cached; missing keys are refilled
                    // lazily on lookup.
                    let mut entry = self.cache.get_mut(&key).unwrap();
                    let stream_key = row.project(right_stream_key_indices).into_owned_row();
                    match op {
                        Op::Insert | Op::UpdateInsert => {
                            entry.insert(stream_key, row.into_owned_row())
                        }
                        Op::Delete | Op::UpdateDelete => entry.remove(&stream_key),
                    };
                }
            }
        }
        Ok(())
    }
}

pub(super) enum InternalMessage {
    Chunk(StreamChunk),
    Barrier(Vec<StreamChunk>, Barrier),
    WaterMark(Watermark),
}

#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
async fn chunks_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
    #[for_await]
    for item in stream {
        match item? {
            Message::Watermark(_) => {
                // ignore
            }
            Message::Chunk(c) => yield c,
            Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
                return Err(StreamExecutorError::align_barrier(expected_barrier, b));
            }
            Message::Barrier(_) => return Ok(()),
        }
    }
}

#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
async fn internal_messages_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
    #[for_await]
    for item in stream {
        match item? {
            Message::Watermark(w) => {
                yield InternalMessage::WaterMark(w);
            }
            Message::Chunk(c) => yield InternalMessage::Chunk(c),
            Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
                return Err(StreamExecutorError::align_barrier(expected_barrier, b));
            }
            Message::Barrier(_) => return Ok(()),
        }
    }
}

// Align the left and right inputs according to their barriers, such that in the produced
// stream, an aligned interval starts with any number of `InternalMessage::Chunk(left_chunk)`
// and is terminated by an `InternalMessage::Barrier(right_chunks, barrier)`.
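//
// An illustrative sketch (hypothetical messages, not real traffic): one interval could look like
//   Chunk(left_chunk_1), WaterMark(w), Chunk(left_chunk_2), Barrier([right_chunk_1], barrier)
// i.e. left chunks (and left watermarks) are forwarded eagerly, while right chunks observed
// within the same epoch are buffered and delivered together with the barrier.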
#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
pub(super) async fn align_input<const YIELD_RIGHT_CHUNKS: bool>(left: Executor, right: Executor) {
    let mut left = pin!(left.execute());
    let mut right = pin!(right.execute());
    // Keep producing intervals until stream exhaustion or errors.
    loop {
        let mut right_chunks = vec![];
        // Produce an aligned interval.
        'inner: loop {
            let mut combined = stream::select_with_strategy(
                left.by_ref().map(Either::Left),
                right.by_ref().map(Either::Right),
                |_: &mut ()| PollNext::Left,
            );
            match combined.next().await {
                Some(Either::Left(Ok(Message::Chunk(c)))) => yield InternalMessage::Chunk(c),
                Some(Either::Right(Ok(Message::Chunk(c)))) => {
                    if YIELD_RIGHT_CHUNKS {
                        right_chunks.push(c);
                    }
                }
                Some(Either::Left(Ok(Message::Barrier(b)))) => {
                    let mut remain = chunks_until_barrier(right.by_ref(), b.clone())
                        .try_collect()
                        .await?;
                    if YIELD_RIGHT_CHUNKS {
                        right_chunks.append(&mut remain);
                    }
                    yield InternalMessage::Barrier(right_chunks, b);
                    break 'inner;
                }
                Some(Either::Right(Ok(Message::Barrier(b)))) => {
                    #[for_await]
                    for internal_message in
                        internal_messages_until_barrier(left.by_ref(), b.clone())
                    {
                        yield internal_message?;
                    }
                    yield InternalMessage::Barrier(right_chunks, b);
                    break 'inner;
                }
                Some(Either::Left(Err(e)) | Either::Right(Err(e))) => return Err(e),
                Some(Either::Left(Ok(Message::Watermark(w)))) => {
                    yield InternalMessage::WaterMark(w);
                }
                Some(Either::Right(Ok(Message::Watermark(_)))) => {
                    // ignore right side watermark
                }
                None => return Ok(()),
            }
        }
    }
}

pub(super) fn apply_indices_map(chunk: StreamChunk, indices: &[usize]) -> StreamChunk {
    let (data_chunk, ops) = chunk.into_parts();
    let (columns, vis) = data_chunk.into_parts();
    let output_columns = indices
        .iter()
        .cloned()
        .map(|idx| columns[idx].clone())
        .collect();
    StreamChunk::with_visibility(ops, output_columns, vis)
}

pub(super) mod phase1 {
    use std::ops::Bound;

    use futures::{StreamExt, pin_mut};
    use futures_async_stream::try_stream;
    use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
    use risingwave_common::array::{Op, StreamChunk};
    use risingwave_common::hash::{HashKey, NullBitmap};
    use risingwave_common::row::{self, OwnedRow, Row, RowExt};
    use risingwave_common::types::{DataType, DatumRef};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_hummock_sdk::HummockEpoch;
    use risingwave_storage::StateStore;

    use super::{StreamExecutorError, TemporalSide};
    use crate::common::table::state_table::StateTable;
    use crate::executor::monitor::TemporalJoinMetrics;

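    /// Strategy used by phase 1 (`handle_chunk`) to build output rows. Summarizing the impls
    /// below: `Inner` emits only matched rows; `LeftOuter` appends a NULL-padded marker row in
    /// `match_end` when nothing matched; `LeftOuterWithCond` always appends an invisible marker
    /// row so that visibility can be decided later, after the non-lookup condition is evaluated.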
    pub trait Phase1Evaluation {
        /// Called when a matched row is found.
        #[must_use = "consume chunk if produced"]
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk>;

        /// Called when all matched rows of a join key are appended.
        #[must_use = "consume chunk if produced"]
        fn match_end(
            builder: &mut StreamChunkBuilder,
            op: Op,
            left_row: impl Row,
            right_size: usize,
            matched: bool,
        ) -> Option<StreamChunk>;
    }

    pub struct Inner;
    pub struct LeftOuter;
    pub struct LeftOuterWithCond;

    impl Phase1Evaluation for Inner {
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk> {
            builder.append_row(op, left_row.chain(right_row))
        }

        fn match_end(
            _builder: &mut StreamChunkBuilder,
            _op: Op,
            _left_row: impl Row,
            _right_size: usize,
            _matched: bool,
        ) -> Option<StreamChunk> {
            None
        }
    }

    impl Phase1Evaluation for LeftOuter {
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk> {
            builder.append_row(op, left_row.chain(right_row))
        }

        fn match_end(
            builder: &mut StreamChunkBuilder,
            op: Op,
            left_row: impl Row,
            right_size: usize,
            matched: bool,
        ) -> Option<StreamChunk> {
            if !matched {
                // If no rows matched, a marker row should be inserted.
                builder.append_row(
                    op,
                    left_row.chain(row::repeat_n(DatumRef::None, right_size)),
                )
            } else {
                None
            }
        }
    }

    impl Phase1Evaluation for LeftOuterWithCond {
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk> {
            builder.append_row(op, left_row.chain(right_row))
        }

        fn match_end(
            builder: &mut StreamChunkBuilder,
            op: Op,
            left_row: impl Row,
            right_size: usize,
            _matched: bool,
        ) -> Option<StreamChunk> {
            // A marker row is always appended and marked as invisible, so that the non-lookup
            // filters can be evaluated first. It is flipped to visible in a later step if no
            // rows remain matched after all filters have been evaluated.
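            //
            // An illustrative sketch (hypothetical rows): for a left row `l` with candidate
            // matches `r1` and `r2`, phase 1 appends
            //   +(l, r1) [visible], +(l, r2) [visible], +(l, NULL, ...) [invisible marker].
            // If the non-lookup condition later rejects both matches, the marker row is made
            // visible to produce the outer-join NULL row.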
            builder.append_row_invisible(
                op,
                left_row.chain(row::repeat_n(DatumRef::None, right_size)),
            )
        }
    }

    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    #[allow(clippy::too_many_arguments)]
    pub(super) async fn handle_chunk<
        'a,
        K: HashKey,
        S: StateStore,
        E: Phase1Evaluation,
        const APPEND_ONLY: bool,
    >(
        chunk_size: usize,
        right_size: usize,
        full_schema: Vec<DataType>,
        epoch: HummockEpoch,
        left_join_keys: &'a [usize],
        right_table: &'a mut TemporalSide<K, S>,
        memo_table_lookup_prefix: &'a [usize],
        memo_table: &'a mut Option<StateTable<S>>,
        null_matched: &'a K::Bitmap,
        chunk: StreamChunk,
        metrics: &'a TemporalJoinMetrics,
    ) {
        let mut builder = StreamChunkBuilder::new(chunk_size, full_schema);
        let keys = K::build_many(left_join_keys, chunk.data_chunk());
        let to_fetch_keys = chunk
            .visibility()
            .iter()
            .zip_eq_debug(keys.iter())
            .zip_eq_debug(chunk.ops())
            .filter_map(|((vis, key), op)| {
                if vis {
                    if APPEND_ONLY {
                        assert_eq!(*op, Op::Insert);
                        Some(key)
                    } else {
                        match op {
                            Op::Insert | Op::UpdateInsert => Some(key),
                            Op::Delete | Op::UpdateDelete => None,
                        }
                    }
                } else {
                    None
                }
            });
        right_table
            .fetch_or_promote_keys(to_fetch_keys, epoch, metrics)
            .await?;

        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
            let Some((op, left_row)) = r else {
                continue;
            };

            let mut matched = false;

            if APPEND_ONLY {
                // Append-only temporal join
                if key.null_bitmap().is_subset(null_matched)
                    && let join_entry = right_table.force_peek(&key)
                    && !join_entry.is_empty()
                {
                    matched = true;
                    for right_row in join_entry.cached.values() {
                        if let Some(chunk) =
                            E::append_matched_row(op, &mut builder, left_row, right_row)
                        {
                            yield chunk;
                        }
                    }
                }
            } else {
                // Non-append-only temporal join
                // The memo table pk and columns:
                // (`join_key` + `left_pk` + `right_pk`) -> (`right_scan_schema` + `join_key` + `left_pk`)
                //
                // Write pattern:
                //   for each left input row (with insert op), construct the memo table pk and insert the row into the memo table.
                // Read pattern:
                //   for each left input row (with delete op), construct the pk prefix (`join_key` + `left_pk`) to fetch rows and delete them from the memo table.
                //
                // Temporal join supports inner join and left outer join, and may additionally carry non-lookup conditions.
                // Perhaps surprisingly, all of these can be handled in a unified way with the memo table.
                // The memo table persists the rows fetched from the right table, appending the `join_key` and `left_pk` of the left row.
                // The NULL rows generated by the outer join and the other conditions are produced statelessly, which means we can handle them without the memo table.
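                //
                // A hypothetical example (column names made up): with `join_key` = [a],
                // `left_pk` = [l_id], `right_pk` = [r_id] and a right scan schema of [r_id, b],
                // the memo table pk is (a, l_id, r_id) and each memo row stores (r_id, b, a, l_id).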
                let memo_table = memo_table.as_mut().unwrap();
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if key.null_bitmap().is_subset(null_matched)
                            && let join_entry = right_table.force_peek(&key)
                            && !join_entry.is_empty()
                        {
                            matched = true;
                            for right_row in join_entry.cached.values() {
                                let right_row: OwnedRow = right_row.clone();
                                // Insert into memo table
                                memo_table.insert(right_row.clone().chain(
                                    left_row.project(memo_table_lookup_prefix).into_owned_row(),
                                ));
                                if let Some(chunk) = E::append_matched_row(
                                    Op::Insert,
                                    &mut builder,
                                    left_row,
                                    right_row,
                                ) {
                                    yield chunk;
                                }
                            }
                        }
                    }
                    Op::Delete | Op::UpdateDelete => {
                        let mut memo_rows_to_delete = vec![];
                        if key.null_bitmap().is_subset(null_matched) {
                            let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
                                &(Bound::Unbounded, Bound::Unbounded);
                            let prefix = left_row.project(memo_table_lookup_prefix);
                            let state_table_iter = memo_table
                                .iter_with_prefix(prefix, sub_range, Default::default())
                                .await?;
                            pin_mut!(state_table_iter);

                            while let Some(memo_row) = state_table_iter.next().await {
                                matched = true;
                                let memo_row = memo_row?.into_owned_row();
                                memo_rows_to_delete.push(memo_row.clone());
                                if let Some(chunk) = E::append_matched_row(
                                    Op::Delete,
                                    &mut builder,
                                    left_row,
                                    memo_row.slice(0..right_size),
                                ) {
                                    yield chunk;
                                }
                            }
                        }
                        for memo_row in memo_rows_to_delete {
                            // Delete from memo table
                            memo_table.delete(memo_row);
                        }
                    }
                }
            }
            if let Some(chunk) = E::match_end(
                &mut builder,
                match op {
                    Op::Insert | Op::UpdateInsert => Op::Insert,
                    Op::Delete | Op::UpdateDelete => Op::Delete,
                },
                left_row,
                right_size,
                matched,
            ) {
                yield chunk;
            }
        }

        if let Some(chunk) = builder.take() {
            yield chunk;
        }
    }
}

589
590impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: bool>
591    TemporalJoinExecutor<K, S, T, APPEND_ONLY>
592{
593    #[allow(clippy::too_many_arguments)]
594    pub fn new(
595        ctx: ActorContextRef,
596        info: ExecutorInfo,
597        left: Executor,
598        right: Executor,
599        table: BatchTable<S>,
600        left_join_keys: Vec<usize>,
601        right_join_keys: Vec<usize>,
602        null_safe: Vec<bool>,
603        condition: Option<NonStrictExpression>,
604        output_indices: Vec<usize>,
605        table_output_indices: Vec<usize>,
606        table_stream_key_indices: Vec<usize>,
607        watermark_sequence: AtomicU64Ref,
608        metrics: Arc<StreamingMetrics>,
609        chunk_size: usize,
610        join_key_data_types: Vec<DataType>,
611        memo_table: Option<StateTable<S>>,
612    ) -> Self {
613        let metrics_info = MetricsInfo::new(
614            metrics.clone(),
615            table.table_id().table_id,
616            ctx.id,
617            "temporal join",
618        );
619
620        let cache = ManagedLruCache::unbounded(watermark_sequence, metrics_info);
621
622        let metrics = metrics.new_temporal_join_metrics(table.table_id(), ctx.id, ctx.fragment_id);
623
624        Self {
625            ctx,
626            info,
627            left,
628            right,
629            right_table: TemporalSide {
630                source: table,
631                table_stream_key_indices,
632                table_output_indices,
633                cache,
634                join_key_data_types,
635            },
636            left_join_keys,
637            right_join_keys,
638            null_safe,
639            condition,
640            output_indices,
641            chunk_size,
642            memo_table,
643            metrics,
644        }
645    }
646
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let right_size = self.right.schema().len();

        let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
            &self.output_indices,
            self.left.schema().len(),
            right_size,
        );

        let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());

        let left_stream_key_indices = self.left.pk_indices().to_vec();
        let right_stream_key_indices = self.right.pk_indices().to_vec();
        let memo_table_lookup_prefix = self
            .left_join_keys
            .iter()
            .cloned()
            .chain(left_stream_key_indices)
            .collect_vec();

        let null_matched = K::Bitmap::from_bool_vec(self.null_safe);

        let mut prev_epoch = None;
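        // Chunks are looked up against the right table at the previous barrier's epoch:
        // `prev_epoch` is set from `barrier_epoch.prev` in the `InternalMessage::Barrier` arm
        // below and consumed by `phase1::handle_chunk`.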

        let full_schema: Vec<_> = self
            .left
            .schema()
            .data_types()
            .into_iter()
            .chain(self.right.schema().data_types().into_iter())
            .collect();

        let mut wait_first_barrier = true;

        #[for_await]
        for msg in align_input::<true>(self.left, self.right) {
            self.right_table.cache.evict();
            self.metrics
                .temporal_join_cached_entry_count
                .set(self.right_table.cache.len() as i64);
            match msg? {
                InternalMessage::WaterMark(watermark) => {
                    let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap();
                    yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
                }
                InternalMessage::Chunk(chunk) => {
                    let epoch = prev_epoch.expect("Chunk data should come after some barrier.");

                    let full_schema = full_schema.clone();

                    if T == JoinType::Inner {
                        let st1 = phase1::handle_chunk::<K, S, phase1::Inner, APPEND_ONLY>(
                            self.chunk_size,
                            right_size,
                            full_schema,
                            epoch,
                            &self.left_join_keys,
                            &mut self.right_table,
                            &memo_table_lookup_prefix,
                            &mut self.memo_table,
                            &null_matched,
                            chunk,
                            &self.metrics,
                        );
                        #[for_await]
                        for chunk in st1 {
                            let chunk = chunk?;
                            let new_chunk = if let Some(ref cond) = self.condition {
                                let (data_chunk, ops) = chunk.into_parts();
                                let passed_bitmap = cond.eval_infallible(&data_chunk).await;
                                let passed_bitmap =
                                    Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
                                let (columns, vis) = data_chunk.into_parts();
                                let new_vis = vis & passed_bitmap;
                                StreamChunk::with_visibility(ops, columns, new_vis)
                            } else {
                                chunk
                            };
                            let new_chunk = apply_indices_map(new_chunk, &self.output_indices);
                            yield Message::Chunk(new_chunk);
                        }
                    } else if let Some(ref cond) = self.condition {
                        // The joined result before evaluating non-lookup conditions.
                        let st1 =
                            phase1::handle_chunk::<K, S, phase1::LeftOuterWithCond, APPEND_ONLY>(
                                self.chunk_size,
                                right_size,
                                full_schema,
                                epoch,
                                &self.left_join_keys,
                                &mut self.right_table,
                                &memo_table_lookup_prefix,
                                &mut self.memo_table,
                                &null_matched,
                                chunk,
                                &self.metrics,
                            );
                        let mut matched_count = 0usize;
                        #[for_await]
                        for chunk in st1 {
                            let chunk = chunk?;
                            let (data_chunk, ops) = chunk.into_parts();
                            let passed_bitmap = cond.eval_infallible(&data_chunk).await;
                            let passed_bitmap =
                                Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
                            let (columns, vis) = data_chunk.into_parts();
                            let mut new_vis = BitmapBuilder::with_capacity(vis.len());
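                            // Decide per-row visibility. An illustrative sketch (hypothetical
                            // rows): if phase 1 emitted
                            //   +(l, r1), +(l, r2), +(l, NULL) [invisible marker]
                            // and the condition passes only `r2`, the resulting visibility is
                            // [false, true, false]: a match survived, so the marker row stays
                            // invisible.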
                            for (passed, not_match_end) in
                                passed_bitmap.iter().zip_eq_debug(vis.iter())
                            {
                                let is_match_end = !not_match_end;
                                let vis = if is_match_end && matched_count == 0 {
                                    // Nothing matched, so the marker row should be visible.
                                    true
                                } else if is_match_end {
                                    // Reset the count.
                                    matched_count = 0;
                                    // Matched rows were found, so the marker row should be invisible.
                                    false
                                } else {
                                    if passed {
                                        matched_count += 1;
                                    }
                                    passed
                                };
                                new_vis.append(vis);
                            }
                            let new_chunk = apply_indices_map(
                                StreamChunk::with_visibility(ops, columns, new_vis.finish()),
                                &self.output_indices,
                            );
                            yield Message::Chunk(new_chunk);
                        }
                        // The last row should always be a marker row, so the count must be zero here.
                        assert_eq!(matched_count, 0);
                    } else {
                        let st1 = phase1::handle_chunk::<K, S, phase1::LeftOuter, APPEND_ONLY>(
                            self.chunk_size,
                            right_size,
                            full_schema,
                            epoch,
                            &self.left_join_keys,
                            &mut self.right_table,
                            &memo_table_lookup_prefix,
                            &mut self.memo_table,
                            &null_matched,
                            chunk,
                            &self.metrics,
                        );
                        #[for_await]
                        for chunk in st1 {
                            let chunk = chunk?;
                            let new_chunk = apply_indices_map(chunk, &self.output_indices);
                            yield Message::Chunk(new_chunk);
                        }
                    }
                }
                InternalMessage::Barrier(updates, barrier) => {
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
                    let barrier_epoch = barrier.epoch;
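                    // For non-append-only jobs, the memo table follows the barrier protocol:
                    // the first barrier initializes its epoch, and every later barrier commits
                    // buffered writes before the barrier is forwarded and finishes the commit
                    // afterwards via `post_yield_barrier`.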
                    if !APPEND_ONLY {
                        if wait_first_barrier {
                            wait_first_barrier = false;
                            yield Message::Barrier(barrier);
                            self.memo_table
                                .as_mut()
                                .unwrap()
                                .init_epoch(barrier_epoch)
                                .await?;
                        } else {
                            let post_commit = self
                                .memo_table
                                .as_mut()
                                .unwrap()
                                .commit(barrier.epoch)
                                .await?;
                            yield Message::Barrier(barrier);
                            post_commit
                                .post_yield_barrier(update_vnode_bitmap.clone())
                                .await?;
                        }
                    } else {
                        yield Message::Barrier(barrier);
                    }
                    if let Some(vnodes) = update_vnode_bitmap {
                        let prev_vnodes =
                            self.right_table.source.update_vnode_bitmap(vnodes.clone());
                        if cache_may_stale(&prev_vnodes, &vnodes) {
                            self.right_table.cache.clear();
                        }
                    }
                    self.right_table.update(
                        updates,
                        &self.right_join_keys,
                        &right_stream_key_indices,
                    )?;
                    prev_epoch = Some(barrier_epoch.prev);
                }
            }
        }
    }
}

impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: bool> Execute
    for TemporalJoinExecutor<K, S, T, APPEND_ONLY>
{
    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
        self.into_stream().boxed()
    }
}