risingwave_stream/executor/
temporal_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::alloc::Global;
16use std::collections::HashMap;
17use std::collections::hash_map::Entry;
18
19use either::Either;
20use futures::TryStreamExt;
21use futures::stream::{self, PollNext};
22use itertools::Itertools;
23use local_stats_alloc::{SharedStatsAlloc, StatsAlloc};
24use lru::DefaultHasher;
25use risingwave_common::array::Op;
26use risingwave_common::bitmap::BitmapBuilder;
27use risingwave_common::hash::{HashKey, NullBitmap};
28use risingwave_common::row::RowExt;
29use risingwave_common::util::iter_util::ZipEqDebug;
30use risingwave_common_estimate_size::{EstimateSize, KvSize};
31use risingwave_expr::expr::NonStrictExpression;
32use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
33use risingwave_storage::store::PrefetchOptions;
34use risingwave_storage::table::TableIter;
35use risingwave_storage::table::batch_table::BatchTable;
36
37use super::join::{JoinType, JoinTypePrimitive};
38use super::monitor::TemporalJoinMetrics;
39use crate::cache::{ManagedLruCache, cache_may_stale};
40use crate::common::metrics::MetricsInfo;
41use crate::executor::join::builder::JoinStreamChunkBuilder;
42use crate::executor::prelude::*;
43
44pub struct TemporalJoinExecutor<
45    K: HashKey,
46    S: StateStore,
47    const T: JoinTypePrimitive,
48    const APPEND_ONLY: bool,
49> {
50    ctx: ActorContextRef,
51    #[allow(dead_code)]
52    info: ExecutorInfo,
53    left: Executor,
54    right: Executor,
55    right_table: TemporalSide<K, S>,
56    left_join_keys: Vec<usize>,
57    right_join_keys: Vec<usize>,
58    null_safe: Vec<bool>,
59    condition: Option<NonStrictExpression>,
60    output_indices: Vec<usize>,
61    chunk_size: usize,
62    memo_table: Option<StateTable<S>>,
63    metrics: TemporalJoinMetrics,
64}
65
/// Cached temporal-side rows that belong to a single join key.
#[derive(Default)]
pub struct JoinEntry {
    /// pk -> row
    cached: HashMap<OwnedRow, OwnedRow>,
    /// Running size accounting for the keys and values stored in `cached`; kept in sync by
    /// `insert` and `remove`.
    kv_heap_size: KvSize,
}
72
73impl EstimateSize for JoinEntry {
74    fn estimated_heap_size(&self) -> usize {
75        // TODO: Add internal size.
76        // https://github.com/risingwavelabs/risingwave/issues/9713
77        self.kv_heap_size.size()
78    }
79}
80
81impl JoinEntry {
82    /// Insert into the cache.
83    pub fn insert(&mut self, key: OwnedRow, value: OwnedRow) {
84        // Lookup might refill the cache before the `insert` messages from the temporal side
85        // upstream.
86        if let Entry::Vacant(e) = self.cached.entry(key) {
87            self.kv_heap_size.add(e.key(), &value);
88            e.insert(value);
89        } else {
90            panic!("value {:?} double insert", value);
91        }
92    }
93
94    /// Delete from the cache.
95    pub fn remove(&mut self, key: &OwnedRow) {
96        if let Some(value) = self.cached.remove(key) {
97            self.kv_heap_size.sub(key, &value);
98        } else {
99            panic!("key {:?} should be in the cache", key);
100        }
101    }
102
103    pub fn is_empty(&self) -> bool {
104        self.cached.is_empty()
105    }
106}
107
108struct TemporalSide<K: HashKey, S: StateStore> {
109    source: BatchTable<S>,
110    table_stream_key_indices: Vec<usize>,
111    table_output_indices: Vec<usize>,
112    cache: ManagedLruCache<K, JoinEntry, DefaultHasher, SharedStatsAlloc<Global>>,
113    join_key_data_types: Vec<DataType>,
114}
115
116impl<K: HashKey, S: StateStore> TemporalSide<K, S> {
117    /// Fetch records from temporal side table and ensure the entry in the cache.
118    /// If already exists, the entry will be promoted.
119    async fn fetch_or_promote_keys(
120        &mut self,
121        keys: impl Iterator<Item = &K>,
122        epoch: HummockEpoch,
123        metrics: &TemporalJoinMetrics,
124    ) -> StreamExecutorResult<()> {
125        let mut futs = Vec::with_capacity(keys.size_hint().1.unwrap_or(0));
126        for key in keys {
127            metrics.temporal_join_total_query_cache_count.inc();
128
129            if self.cache.get(key).is_none() {
130                metrics.temporal_join_cache_miss_count.inc();
131
132                futs.push(async {
133                    let pk_prefix = key.deserialize(&self.join_key_data_types)?;
134
135                    let iter = self
136                        .source
137                        .batch_iter_with_pk_bounds(
138                            HummockReadEpoch::NoWait(epoch),
139                            &pk_prefix,
140                            ..,
141                            false,
142                            PrefetchOptions::default(),
143                        )
144                        .await?;
145
146                    let mut entry = JoinEntry::default();
147
148                    pin_mut!(iter);
149                    while let Some(row) = iter.next_row().await? {
150                        entry.insert(
151                            row.as_ref()
152                                .project(&self.table_stream_key_indices)
153                                .into_owned_row(),
154                            row.project(&self.table_output_indices).into_owned_row(),
155                        );
156                    }
157                    let key = key.clone();
158                    Ok((key, entry)) as StreamExecutorResult<_>
159                });
160            }
161        }
162
163        #[for_await]
164        for res in stream::iter(futs).buffered(16) {
165            let (key, entry) = res?;
166            self.cache.put(key, entry);
167        }
168
169        Ok(())
170    }
171
172    fn force_peek(&self, key: &K) -> &JoinEntry {
173        self.cache.peek(key).expect("key should exists")
174    }
175
176    fn update(
177        &mut self,
178        chunks: Vec<StreamChunk>,
179        join_keys: &[usize],
180        right_stream_key_indices: &[usize],
181    ) -> StreamExecutorResult<()> {
182        for chunk in chunks {
183            let keys = K::build_many(join_keys, chunk.data_chunk());
184            for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
185                let Some((op, row)) = r else {
186                    continue;
187                };
188                if self.cache.contains(&key) {
189                    // Update cache
190                    let mut entry = self.cache.get_mut(&key).unwrap();
191                    let stream_key = row.project(right_stream_key_indices).into_owned_row();
192                    match op {
193                        Op::Insert | Op::UpdateInsert => {
194                            entry.insert(stream_key, row.into_owned_row())
195                        }
196                        Op::Delete | Op::UpdateDelete => entry.remove(&stream_key),
197                    };
198                }
199            }
200        }
201        Ok(())
202    }
203}
204
/// Messages produced by `align_input` after aligning the two inputs on barriers.
pub(super) enum InternalMessage {
    /// A data chunk from the left input.
    Chunk(StreamChunk),
    /// End of an aligned interval: the right-side chunks collected during the interval
    /// (empty when `align_input` is instantiated with `YIELD_RIGHT_CHUNKS = false`) and the
    /// barrier that closes it.
    Barrier(Vec<StreamChunk>, Barrier),
    /// A watermark from the left input.
    WaterMark(Watermark),
}
210
211#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
212async fn chunks_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
213    #[for_await]
214    for item in stream {
215        match item? {
216            Message::Watermark(_) => {
217                // ignore
218            }
219            Message::Chunk(c) => yield c,
220            Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
221                return Err(StreamExecutorError::align_barrier(expected_barrier, b));
222            }
223            Message::Barrier(_) => return Ok(()),
224        }
225    }
226}
227
228#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
229async fn internal_messages_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
230    #[for_await]
231    for item in stream {
232        match item? {
233            Message::Watermark(w) => {
234                yield InternalMessage::WaterMark(w);
235            }
236            Message::Chunk(c) => yield InternalMessage::Chunk(c),
237            Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
238                return Err(StreamExecutorError::align_barrier(expected_barrier, b));
239            }
240            Message::Barrier(_) => return Ok(()),
241        }
242    }
243}
244
245// Align the left and right inputs according to their barriers,
246// such that in the produced stream, an aligned interval starts with
247// any number of `InternalMessage::Chunk(left_chunk)` and followed by
248// `InternalMessage::Barrier(right_chunks, barrier)`.
249#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
250pub(super) async fn align_input<const YIELD_RIGHT_CHUNKS: bool>(left: Executor, right: Executor) {
251    let mut left = pin!(left.execute());
252    let mut right = pin!(right.execute());
253    // Keep producing intervals until stream exhaustion or errors.
254    loop {
255        let mut right_chunks = vec![];
256        // Produce an aligned interval.
257        'inner: loop {
258            let mut combined = stream::select_with_strategy(
259                left.by_ref().map(Either::Left),
260                right.by_ref().map(Either::Right),
261                |_: &mut ()| PollNext::Left,
262            );
263            match combined.next().await {
264                Some(Either::Left(Ok(Message::Chunk(c)))) => yield InternalMessage::Chunk(c),
265                Some(Either::Right(Ok(Message::Chunk(c)))) => {
266                    if YIELD_RIGHT_CHUNKS {
267                        right_chunks.push(c);
268                    }
269                }
270                Some(Either::Left(Ok(Message::Barrier(b)))) => {
271                    let mut remain = chunks_until_barrier(right.by_ref(), b.clone())
272                        .try_collect()
273                        .await?;
274                    if YIELD_RIGHT_CHUNKS {
275                        right_chunks.append(&mut remain);
276                    }
277                    yield InternalMessage::Barrier(right_chunks, b);
278                    break 'inner;
279                }
280                Some(Either::Right(Ok(Message::Barrier(b)))) => {
281                    #[for_await]
282                    for internal_message in
283                        internal_messages_until_barrier(left.by_ref(), b.clone())
284                    {
285                        yield internal_message?;
286                    }
287                    yield InternalMessage::Barrier(right_chunks, b);
288                    break 'inner;
289                }
290                Some(Either::Left(Err(e)) | Either::Right(Err(e))) => return Err(e),
291                Some(Either::Left(Ok(Message::Watermark(w)))) => {
292                    yield InternalMessage::WaterMark(w);
293                }
294                Some(Either::Right(Ok(Message::Watermark(_)))) => {
295                    // ignore right side watermark
296                }
297                None => return Ok(()),
298            }
299        }
300    }
301}
302
303pub(super) fn apply_indices_map(chunk: StreamChunk, indices: &[usize]) -> StreamChunk {
304    let (data_chunk, ops) = chunk.into_parts();
305    let (columns, vis) = data_chunk.into_parts();
306    let output_columns = indices
307        .iter()
308        .cloned()
309        .map(|idx| columns[idx].clone())
310        .collect();
311    StreamChunk::with_visibility(ops, output_columns, vis)
312}
313
314pub(super) mod phase1 {
315    use std::ops::Bound;
316
317    use futures::{StreamExt, pin_mut};
318    use futures_async_stream::try_stream;
319    use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
320    use risingwave_common::array::{Op, StreamChunk};
321    use risingwave_common::hash::{HashKey, NullBitmap};
322    use risingwave_common::row::{self, OwnedRow, Row, RowExt};
323    use risingwave_common::types::{DataType, DatumRef};
324    use risingwave_common::util::iter_util::ZipEqDebug;
325    use risingwave_hummock_sdk::HummockEpoch;
326    use risingwave_storage::StateStore;
327
328    use super::{StreamExecutorError, TemporalSide};
329    use crate::common::table::state_table::StateTable;
330    use crate::executor::monitor::TemporalJoinMetrics;
331
    /// Join-type specific policy for how `handle_chunk` writes its output rows into the
    /// chunk builder.
    ///
    /// Both hooks return `Some(chunk)` when the builder hands back a chunk, which the caller
    /// must then emit.
    pub trait Phase1Evaluation {
        /// Called when a matched row is found.
        #[must_use = "consume chunk if produced"]
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk>;

        /// Called when all matched rows of a join key are appended.
        ///
        /// `right_size` is the number of right-side columns (used for NULL padding) and
        /// `matched` tells whether any matched row was appended for this left row.
        #[must_use = "consume chunk if produced"]
        fn match_end(
            builder: &mut StreamChunkBuilder,
            op: Op,
            left_row: impl Row,
            right_size: usize,
            matched: bool,
        ) -> Option<StreamChunk>;
    }
352
    /// Policy for inner joins: only matched rows are emitted.
    pub struct Inner;
    /// Policy for left outer joins without an extra condition: an unmatched left row is
    /// emitted once, padded with NULLs.
    pub struct LeftOuter;
    /// Policy for left outer joins with an extra condition: every left row additionally gets
    /// an invisible NULL-padded marker row whose visibility is decided later.
    pub struct LeftOuterWithCond;
356
357    impl Phase1Evaluation for Inner {
358        fn append_matched_row(
359            op: Op,
360            builder: &mut StreamChunkBuilder,
361            left_row: impl Row,
362            right_row: impl Row,
363        ) -> Option<StreamChunk> {
364            builder.append_row(op, left_row.chain(right_row))
365        }
366
367        fn match_end(
368            _builder: &mut StreamChunkBuilder,
369            _op: Op,
370            _left_row: impl Row,
371            _right_size: usize,
372            _matched: bool,
373        ) -> Option<StreamChunk> {
374            None
375        }
376    }
377
378    impl Phase1Evaluation for LeftOuter {
379        fn append_matched_row(
380            op: Op,
381            builder: &mut StreamChunkBuilder,
382            left_row: impl Row,
383            right_row: impl Row,
384        ) -> Option<StreamChunk> {
385            builder.append_row(op, left_row.chain(right_row))
386        }
387
388        fn match_end(
389            builder: &mut StreamChunkBuilder,
390            op: Op,
391            left_row: impl Row,
392            right_size: usize,
393            matched: bool,
394        ) -> Option<StreamChunk> {
395            if !matched {
396                // If no rows matched, a marker row should be inserted.
397                builder.append_row(
398                    op,
399                    left_row.chain(row::repeat_n(DatumRef::None, right_size)),
400                )
401            } else {
402                None
403            }
404        }
405    }
406
407    impl Phase1Evaluation for LeftOuterWithCond {
408        fn append_matched_row(
409            op: Op,
410            builder: &mut StreamChunkBuilder,
411            left_row: impl Row,
412            right_row: impl Row,
413        ) -> Option<StreamChunk> {
414            builder.append_row(op, left_row.chain(right_row))
415        }
416
417        fn match_end(
418            builder: &mut StreamChunkBuilder,
419            op: Op,
420            left_row: impl Row,
421            right_size: usize,
422            _matched: bool,
423        ) -> Option<StreamChunk> {
424            // A marker row should always be inserted and mark as invisible for non-lookup filters evaluation.
425            // The row will be converted to visible in the further steps if no rows matched after all filters evaluated.
426            builder.append_row_invisible(
427                op,
428                left_row.chain(row::repeat_n(DatumRef::None, right_size)),
429            )
430        }
431    }
432
    /// Joins one left `chunk` against the temporal side and yields the joined chunks
    /// (left columns followed by right columns), before any non-lookup condition or output
    /// projection is applied.
    ///
    /// * `chunk_size` / `full_schema` - capacity and column types of the output builder.
    /// * `right_size` - number of right-side columns; used for NULL padding and to cut the
    ///   right-side part out of a memo row.
    /// * `epoch` - epoch at which the temporal-side table is read on a cache miss.
    /// * `left_join_keys` - columns of `chunk` that form the join key.
    /// * `right_table` - temporal side; its cache is filled for the keys of this chunk.
    /// * `memo_table_lookup_prefix` - left columns appended to every memoized right row and
    ///   used as the lookup prefix when those rows are read back.
    /// * `memo_table` - must be `Some` when `APPEND_ONLY` is false.
    /// * `null_matched` - a key can only match if its null bitmap is a subset of this one.
    /// * `metrics` - receives the cache query/miss counts.
    ///
    /// The policy `E` decides which rows are written for matched and unmatched left rows.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    #[allow(clippy::too_many_arguments)]
    pub(super) async fn handle_chunk<
        'a,
        K: HashKey,
        S: StateStore,
        E: Phase1Evaluation,
        const APPEND_ONLY: bool,
    >(
        chunk_size: usize,
        right_size: usize,
        full_schema: Vec<DataType>,
        epoch: HummockEpoch,
        left_join_keys: &'a [usize],
        right_table: &'a mut TemporalSide<K, S>,
        memo_table_lookup_prefix: &'a [usize],
        memo_table: &'a mut Option<StateTable<S>>,
        null_matched: &'a K::Bitmap,
        chunk: StreamChunk,
        metrics: &'a TemporalJoinMetrics,
    ) {
        let mut builder = StreamChunkBuilder::new(chunk_size, full_schema);
        let keys = K::build_many(left_join_keys, chunk.data_chunk());
        // Only visible insert-like rows need the cache: deletes below are answered from the
        // memo table instead.
        let to_fetch_keys = chunk
            .visibility()
            .iter()
            .zip_eq_debug(keys.iter())
            .zip_eq_debug(chunk.ops())
            .filter_map(|((vis, key), op)| {
                if vis {
                    if APPEND_ONLY {
                        assert_eq!(*op, Op::Insert);
                        Some(key)
                    } else {
                        match op {
                            Op::Insert | Op::UpdateInsert => Some(key),
                            Op::Delete | Op::UpdateDelete => None,
                        }
                    }
                } else {
                    None
                }
            });
        // After this call every key collected above has a cache entry, which is what makes
        // the `force_peek` calls below safe.
        right_table
            .fetch_or_promote_keys(to_fetch_keys, epoch, metrics)
            .await?;

        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
            // Invisible rows show up as holes and are skipped.
            let Some((op, left_row)) = r else {
                continue;
            };

            // Whether at least one right row was appended for this left row.
            let mut matched = false;

            if APPEND_ONLY {
                // Append-only temporal join
                if key.null_bitmap().is_subset(null_matched)
                    && let join_entry = right_table.force_peek(&key)
                    && !join_entry.is_empty()
                {
                    matched = true;
                    for right_row in join_entry.cached.values() {
                        if let Some(chunk) =
                            E::append_matched_row(op, &mut builder, left_row, right_row)
                        {
                            yield chunk;
                        }
                    }
                }
            } else {
                // Non-append-only temporal join
                // The memo-table pk and columns:
                // (`join_key` + `left_pk` + `right_pk`) -> (`right_scan_schema` + `join_key` + `left_pk`)
                //
                // Write pattern:
                //   for each left input row (with insert op), construct the memo table pk and insert the row into the memo table.
                // Read pattern:
                //   for each left input row (with delete op), construct pk prefix (`join_key` + `left_pk`) to fetch rows and delete them from the memo table.
                //
                // Temporal join supports inner join and left outer join, additionally, it could contain other conditions.
                // Surprisingly, we could handle them in a unified way with memo table.
                // The memo table would persist rows fetched from the right table and appending the `join_key` and `left_pk` from the left row.
                // The null rows generated by outer join and the other condition somehow is a stateless operation which means we can handle them without the memo table.
                let memo_table = memo_table.as_mut().unwrap();
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if key.null_bitmap().is_subset(null_matched)
                            && let join_entry = right_table.force_peek(&key)
                            && !join_entry.is_empty()
                        {
                            matched = true;
                            for right_row in join_entry.cached.values() {
                                // NOTE(review): the row is cloned here and once more for the
                                // memo insert below; one of the clones may be avoidable —
                                // confirm against `StateTable::insert`'s signature.
                                let right_row: OwnedRow = right_row.clone();
                                // Insert into memo table
                                memo_table.insert(right_row.clone().chain(
                                    left_row.project(memo_table_lookup_prefix).into_owned_row(),
                                ));
                                // Updates are emitted as plain inserts.
                                if let Some(chunk) = E::append_matched_row(
                                    Op::Insert,
                                    &mut builder,
                                    left_row,
                                    right_row,
                                ) {
                                    yield chunk;
                                }
                            }
                        }
                    }
                    Op::Delete | Op::UpdateDelete => {
                        // Collected first and deleted after the scan, so the memo table is
                        // not modified while its iterator is still alive.
                        let mut memo_rows_to_delete = vec![];
                        if key.null_bitmap().is_subset(null_matched) {
                            let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
                                &(Bound::Unbounded, Bound::Unbounded);
                            let prefix = left_row.project(memo_table_lookup_prefix);
                            let state_table_iter = memo_table
                                .iter_with_prefix(prefix, sub_range, Default::default())
                                .await?;
                            pin_mut!(state_table_iter);

                            while let Some(memo_row) = state_table_iter.next().await {
                                matched = true;
                                let memo_row = memo_row?.into_owned_row();
                                memo_rows_to_delete.push(memo_row.clone());
                                // The first `right_size` columns of a memo row are the right
                                // row that was emitted for the original insert.
                                if let Some(chunk) = E::append_matched_row(
                                    Op::Delete,
                                    &mut builder,
                                    left_row,
                                    memo_row.slice(0..right_size),
                                ) {
                                    yield chunk;
                                }
                            }
                        }
                        for memo_row in memo_rows_to_delete {
                            // Delete from memo table
                            memo_table.delete(memo_row);
                        }
                    }
                }
            }
            // Let the policy add its end-of-match row; update ops are normalized to plain
            // insert/delete here.
            if let Some(chunk) = E::match_end(
                &mut builder,
                match op {
                    Op::Insert | Op::UpdateInsert => Op::Insert,
                    Op::Delete | Op::UpdateDelete => Op::Delete,
                },
                left_row,
                right_size,
                matched,
            ) {
                yield chunk;
            }
        }

        // Flush the rows still buffered in the builder.
        if let Some(chunk) = builder.take() {
            yield chunk;
        }
    }
591}
592
593impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: bool>
594    TemporalJoinExecutor<K, S, T, APPEND_ONLY>
595{
596    #[allow(clippy::too_many_arguments)]
597    pub fn new(
598        ctx: ActorContextRef,
599        info: ExecutorInfo,
600        left: Executor,
601        right: Executor,
602        table: BatchTable<S>,
603        left_join_keys: Vec<usize>,
604        right_join_keys: Vec<usize>,
605        null_safe: Vec<bool>,
606        condition: Option<NonStrictExpression>,
607        output_indices: Vec<usize>,
608        table_output_indices: Vec<usize>,
609        table_stream_key_indices: Vec<usize>,
610        watermark_sequence: AtomicU64Ref,
611        metrics: Arc<StreamingMetrics>,
612        chunk_size: usize,
613        join_key_data_types: Vec<DataType>,
614        memo_table: Option<StateTable<S>>,
615    ) -> Self {
616        let alloc = StatsAlloc::new(Global).shared();
617
618        let metrics_info = MetricsInfo::new(
619            metrics.clone(),
620            table.table_id().table_id,
621            ctx.id,
622            "temporal join",
623        );
624
625        let cache = ManagedLruCache::unbounded_with_hasher_in(
626            watermark_sequence,
627            metrics_info,
628            DefaultHasher::default(),
629            alloc,
630        );
631
632        let metrics = metrics.new_temporal_join_metrics(table.table_id(), ctx.id, ctx.fragment_id);
633
634        Self {
635            ctx: ctx.clone(),
636            info,
637            left,
638            right,
639            right_table: TemporalSide {
640                source: table,
641                table_stream_key_indices,
642                table_output_indices,
643                cache,
644                join_key_data_types,
645            },
646            left_join_keys,
647            right_join_keys,
648            null_safe,
649            condition,
650            output_indices,
651            chunk_size,
652            memo_table,
653            metrics,
654        }
655    }
656
657    #[try_stream(ok = Message, error = StreamExecutorError)]
658    async fn into_stream(mut self) {
659        let right_size = self.right.schema().len();
660
661        let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
662            &self.output_indices,
663            self.left.schema().len(),
664            right_size,
665        );
666
667        let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());
668
669        let left_stream_key_indices = self.left.pk_indices().to_vec();
670        let right_stream_key_indices = self.right.pk_indices().to_vec();
671        let memo_table_lookup_prefix = self
672            .left_join_keys
673            .iter()
674            .cloned()
675            .chain(left_stream_key_indices)
676            .collect_vec();
677
678        let null_matched = K::Bitmap::from_bool_vec(self.null_safe);
679
680        let mut prev_epoch = None;
681
682        let full_schema: Vec<_> = self
683            .left
684            .schema()
685            .data_types()
686            .into_iter()
687            .chain(self.right.schema().data_types().into_iter())
688            .collect();
689
690        let mut wait_first_barrier = true;
691
692        #[for_await]
693        for msg in align_input::<true>(self.left, self.right) {
694            self.right_table.cache.evict();
695            self.metrics
696                .temporal_join_cached_entry_count
697                .set(self.right_table.cache.len() as i64);
698            match msg? {
699                InternalMessage::WaterMark(watermark) => {
700                    let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap();
701                    yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
702                }
703                InternalMessage::Chunk(chunk) => {
704                    let epoch = prev_epoch.expect("Chunk data should come after some barrier.");
705
706                    let full_schema = full_schema.clone();
707
708                    if T == JoinType::Inner {
709                        let st1 = phase1::handle_chunk::<K, S, phase1::Inner, APPEND_ONLY>(
710                            self.chunk_size,
711                            right_size,
712                            full_schema,
713                            epoch,
714                            &self.left_join_keys,
715                            &mut self.right_table,
716                            &memo_table_lookup_prefix,
717                            &mut self.memo_table,
718                            &null_matched,
719                            chunk,
720                            &self.metrics,
721                        );
722                        #[for_await]
723                        for chunk in st1 {
724                            let chunk = chunk?;
725                            let new_chunk = if let Some(ref cond) = self.condition {
726                                let (data_chunk, ops) = chunk.into_parts();
727                                let passed_bitmap = cond.eval_infallible(&data_chunk).await;
728                                let passed_bitmap =
729                                    Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
730                                let (columns, vis) = data_chunk.into_parts();
731                                let new_vis = vis & passed_bitmap;
732                                StreamChunk::with_visibility(ops, columns, new_vis)
733                            } else {
734                                chunk
735                            };
736                            let new_chunk = apply_indices_map(new_chunk, &self.output_indices);
737                            yield Message::Chunk(new_chunk);
738                        }
739                    } else if let Some(ref cond) = self.condition {
740                        // Joined result without evaluating non-lookup conditions.
741                        let st1 =
742                            phase1::handle_chunk::<K, S, phase1::LeftOuterWithCond, APPEND_ONLY>(
743                                self.chunk_size,
744                                right_size,
745                                full_schema,
746                                epoch,
747                                &self.left_join_keys,
748                                &mut self.right_table,
749                                &memo_table_lookup_prefix,
750                                &mut self.memo_table,
751                                &null_matched,
752                                chunk,
753                                &self.metrics,
754                            );
755                        let mut matched_count = 0usize;
756                        #[for_await]
757                        for chunk in st1 {
758                            let chunk = chunk?;
759                            let (data_chunk, ops) = chunk.into_parts();
760                            let passed_bitmap = cond.eval_infallible(&data_chunk).await;
761                            let passed_bitmap =
762                                Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
763                            let (columns, vis) = data_chunk.into_parts();
764                            let mut new_vis = BitmapBuilder::with_capacity(vis.len());
765                            for (passed, not_match_end) in
766                                passed_bitmap.iter().zip_eq_debug(vis.iter())
767                            {
768                                let is_match_end = !not_match_end;
769                                let vis = if is_match_end && matched_count == 0 {
770                                    // Nothing is matched, so the marker row should be visible.
771                                    true
772                                } else if is_match_end {
773                                    // reset the count
774                                    matched_count = 0;
775                                    // rows found, so the marker row should be invisible.
776                                    false
777                                } else {
778                                    if passed {
779                                        matched_count += 1;
780                                    }
781                                    passed
782                                };
783                                new_vis.append(vis);
784                            }
785                            let new_chunk = apply_indices_map(
786                                StreamChunk::with_visibility(ops, columns, new_vis.finish()),
787                                &self.output_indices,
788                            );
789                            yield Message::Chunk(new_chunk);
790                        }
791                        // The last row should always be marker row,
792                        assert_eq!(matched_count, 0);
793                    } else {
794                        let st1 = phase1::handle_chunk::<K, S, phase1::LeftOuter, APPEND_ONLY>(
795                            self.chunk_size,
796                            right_size,
797                            full_schema,
798                            epoch,
799                            &self.left_join_keys,
800                            &mut self.right_table,
801                            &memo_table_lookup_prefix,
802                            &mut self.memo_table,
803                            &null_matched,
804                            chunk,
805                            &self.metrics,
806                        );
807                        #[for_await]
808                        for chunk in st1 {
809                            let chunk = chunk?;
810                            let new_chunk = apply_indices_map(chunk, &self.output_indices);
811                            yield Message::Chunk(new_chunk);
812                        }
813                    }
814                }
815                InternalMessage::Barrier(updates, barrier) => {
816                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
817                    let barrier_epoch = barrier.epoch;
818                    if !APPEND_ONLY {
819                        if wait_first_barrier {
820                            wait_first_barrier = false;
821                            yield Message::Barrier(barrier);
822                            self.memo_table
823                                .as_mut()
824                                .unwrap()
825                                .init_epoch(barrier_epoch)
826                                .await?;
827                        } else {
828                            let post_commit = self
829                                .memo_table
830                                .as_mut()
831                                .unwrap()
832                                .commit(barrier.epoch)
833                                .await?;
834                            yield Message::Barrier(barrier);
835                            post_commit
836                                .post_yield_barrier(update_vnode_bitmap.clone())
837                                .await?;
838                        }
839                    } else {
840                        yield Message::Barrier(barrier);
841                    }
842                    if let Some(vnodes) = update_vnode_bitmap {
843                        let prev_vnodes =
844                            self.right_table.source.update_vnode_bitmap(vnodes.clone());
845                        if cache_may_stale(&prev_vnodes, &vnodes) {
846                            self.right_table.cache.clear();
847                        }
848                    }
849                    self.right_table.update(
850                        updates,
851                        &self.right_join_keys,
852                        &right_stream_key_indices,
853                    )?;
854                    prev_epoch = Some(barrier_epoch.prev);
855                }
856            }
857        }
858    }
859}
860
861impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: bool> Execute
862    for TemporalJoinExecutor<K, S, T, APPEND_ONLY>
863{
864    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
865        self.into_stream().boxed()
866    }
867}