risingwave_stream/executor/
temporal_join.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::ops::Bound;
18
19use anyhow::Context;
20use either::Either;
21use futures::TryStreamExt;
22use futures::stream::{self, PollNext};
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::BitmapBuilder;
26use risingwave_common::hash::{HashKey, NullBitmap};
27use risingwave_common::row::RowExt;
28use risingwave_common::util::iter_util::ZipEqDebug;
29use risingwave_common_estimate_size::{EstimateSize, KvSize};
30use risingwave_expr::expr::NonStrictExpression;
31use risingwave_storage::row_serde::value_serde::ValueRowSerde;
32use risingwave_storage::store::PrefetchOptions;
33
34use super::join::{JoinType, JoinTypePrimitive};
35use super::monitor::TemporalJoinMetrics;
36use crate::cache::ManagedLruCache;
37use crate::common::metrics::MetricsInfo;
38use crate::common::table::state_table::ReplicatedStateTable;
39use crate::executor::join::builder::JoinStreamChunkBuilder;
40use crate::executor::prelude::*;
41
42pub struct TemporalJoinExecutor<
43    K: HashKey,
44    S: StateStore,
45    SD: ValueRowSerde,
46    const T: JoinTypePrimitive,
47    const APPEND_ONLY: bool,
48> {
49    ctx: ActorContextRef,
50    #[expect(dead_code)]
51    info: ExecutorInfo,
52    left: Executor,
53    right: Executor,
54    right_table: TemporalSide<K, S, SD>,
55    left_join_keys: Vec<usize>,
56    right_join_keys: Vec<usize>,
57    null_safe: Vec<bool>,
58    condition: Option<NonStrictExpression>,
59    output_indices: Vec<usize>,
60    chunk_size: usize,
61    memo_table: Option<StateTable<S>>,
62    metrics: TemporalJoinMetrics,
63}
64
/// Cached temporal-side rows that share one join key.
#[derive(Default)]
pub struct JoinEntry {
    /// pk -> row
    ///
    /// Keyed by the row's stream key; the value is the cached row.
    cached: HashMap<OwnedRow, OwnedRow>,
    /// Running estimate of the heap size of the keys and values stored in `cached`.
    kv_heap_size: KvSize,
}

impl EstimateSize for JoinEntry {
    fn estimated_heap_size(&self) -> usize {
        // TODO: Add internal size.
        // https://github.com/risingwavelabs/risingwave/issues/9713
        // For now only the key/value sizes accumulated in `kv_heap_size` are reported.
        self.kv_heap_size.size()
    }
}
79
80impl JoinEntry {
81    /// Insert into the cache.
82    pub fn insert(&mut self, key: OwnedRow, value: OwnedRow) {
83        // Lookup might refill the cache before the `insert` messages from the temporal side
84        // upstream.
85        if let Entry::Vacant(e) = self.cached.entry(key) {
86            self.kv_heap_size.add(e.key(), &value);
87            e.insert(value);
88        } else {
89            panic!("value {:?} double insert", value);
90        }
91    }
92
93    /// Delete from the cache.
94    pub fn remove(&mut self, key: &OwnedRow) {
95        if let Some(value) = self.cached.remove(key) {
96            self.kv_heap_size.sub(key, &value);
97        } else {
98            panic!("key {:?} should be in the cache", key);
99        }
100    }
101
102    pub fn is_empty(&self) -> bool {
103        self.cached.is_empty()
104    }
105}
106
107struct TemporalSide<K: HashKey, S: StateStore, SD: ValueRowSerde> {
108    source: ReplicatedStateTable<S, SD>,
109    table_stream_key_indices: Vec<usize>,
110    table_output_indices: Vec<usize>,
111    cache: ManagedLruCache<K, JoinEntry>,
112    join_key_data_types: Vec<DataType>,
113}
114
impl<K: HashKey, S: StateStore, SD: ValueRowSerde> TemporalSide<K, S, SD> {
    /// Fetch records from temporal side table and ensure the entry in the cache.
    /// If already exists, the entry will be promoted.
    ///
    /// Every key yielded by `keys` is first looked up in the cache. Each miss becomes a
    /// prefix scan of `source` on the deserialized join key. The scanned rows are collected
    /// into a fresh [`JoinEntry`], keyed by their `table_stream_key_indices` projection and
    /// storing their `table_output_indices` projection. Up to 16 scans run concurrently, and
    /// their results are put into the cache once all keys have been examined.
    ///
    /// `metrics` counts every queried key and every cache miss.
    ///
    /// NOTE(review): a key that appears several times in `keys` and misses the cache is
    /// scanned once per occurrence, because `put` only happens after the loop.
    ///
    /// # Errors
    ///
    /// Returns the first error from deserializing a key or from reading `source`.
    async fn fetch_or_promote_keys(
        &mut self,
        keys: impl Iterator<Item = &K>,
        metrics: &TemporalJoinMetrics,
    ) -> StreamExecutorResult<()> {
        // Capacity hint only; falls back to 0 when the iterator has no upper bound.
        let mut futs = Vec::with_capacity(keys.size_hint().1.unwrap_or(0));
        for key in keys {
            metrics.temporal_join_total_query_cache_count.inc();

            if self.cache.get(key).is_none() {
                metrics.temporal_join_cache_miss_count.inc();

                // Only builds the future; it runs when polled by `buffered` below.
                futs.push(async {
                    let pk_prefix = key.deserialize(&self.join_key_data_types)?;

                    let iter = self
                        .source
                        .iter_with_prefix(
                            &pk_prefix,
                            &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                            PrefetchOptions::default(),
                        )
                        .await?;

                    let mut entry = JoinEntry::default();

                    pin_mut!(iter);
                    while let Some(row) = iter.next().await {
                        let row: OwnedRow = row?;
                        entry.insert(
                            row.as_ref()
                                .project(&self.table_stream_key_indices)
                                .into_owned_row(),
                            row.project(&self.table_output_indices).into_owned_row(),
                        );
                    }
                    let key = key.clone();
                    Ok((key, entry)) as StreamExecutorResult<_>
                });
            }
        }

        // `buffered` polls up to 16 fetches at once and yields results in push order.
        #[for_await]
        for res in stream::iter(futs).buffered(16) {
            let (key, entry) = res?;
            self.cache.put(key, entry);
        }

        Ok(())
    }

    /// Returns the cached entry for `key`, looked up with the cache's `peek`.
    ///
    /// # Panics
    ///
    /// Panics if `key` is not cached; callers are expected to have fetched it first
    /// (see `fetch_or_promote_keys`).
    fn force_peek(&self, key: &K) -> &JoinEntry {
        self.cache.peek(key).expect("key should exists")
    }

    /// Applies temporal-side `chunks` to the cache and then to `source`.
    ///
    /// For each visible row, the join key is built from `join_keys`. If that key is cached,
    /// the row is inserted into or removed from the cached [`JoinEntry`], keyed by its
    /// `right_stream_key_indices` projection. Rows whose key is not cached only reach the
    /// cache through a later fetch. Every chunk, whether or not any of its keys were cached,
    /// is then written to `source`.
    ///
    /// Always returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// A delete whose stream key is missing from a cached entry panics inside
    /// [`JoinEntry::remove`].
    fn update(
        &mut self,
        chunks: Vec<StreamChunk>,
        join_keys: &[usize],
        right_stream_key_indices: &[usize],
    ) -> StreamExecutorResult<()> {
        for chunk in chunks {
            let keys = K::build_many(join_keys, chunk.data_chunk());
            for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
                // Invisible rows (holes) are skipped.
                let Some((op, row)) = r else {
                    continue;
                };
                if self.cache.contains(&key) {
                    // Update cache
                    let mut entry = self.cache.get_mut(&key).unwrap();
                    let stream_key = row.project(right_stream_key_indices).into_owned_row();
                    match op {
                        Op::Insert | Op::UpdateInsert => {
                            entry.insert(stream_key, row.into_owned_row())
                        }
                        Op::Delete | Op::UpdateDelete => entry.remove(&stream_key),
                    };
                }
            }
            self.source.write_chunk(chunk);
        }
        Ok(())
    }
}
202
/// Messages produced by `align_input` and `internal_messages_until_barrier`.
pub(super) enum InternalMessage {
    /// A chunk from the left (stream) input.
    Chunk(StreamChunk),
    /// A barrier, together with the right-side chunks collected during the aligned interval
    /// that it closes (empty when the producer does not collect them).
    Barrier(Vec<StreamChunk>, Barrier),
    /// A watermark from the left input.
    WaterMark(Watermark),
}
208
209#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
210async fn chunks_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
211    #[for_await]
212    for item in stream {
213        match item? {
214            Message::Watermark(_) => {
215                // ignore
216            }
217            Message::Chunk(c) => yield c,
218            Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
219                return Err(StreamExecutorError::align_barrier(expected_barrier, b));
220            }
221            Message::Barrier(_) => return Ok(()),
222        }
223    }
224}
225
226#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
227async fn internal_messages_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
228    #[for_await]
229    for item in stream {
230        match item? {
231            Message::Watermark(w) => {
232                yield InternalMessage::WaterMark(w);
233            }
234            Message::Chunk(c) => yield InternalMessage::Chunk(c),
235            Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
236                return Err(StreamExecutorError::align_barrier(expected_barrier, b));
237            }
238            Message::Barrier(_) => return Ok(()),
239        }
240    }
241}
242
243pub(super) async fn expect_first_barrier(
244    stream: &mut (impl Stream<Item = StreamExecutorResult<InternalMessage>> + Unpin),
245) -> StreamExecutorResult<Barrier> {
246    let InternalMessage::Barrier(updates, barrier) = stream
247        .try_next()
248        .instrument_await("expect_first_barrier")
249        .await?
250        .context("failed to extract the first message: stream closed unexpectedly")?
251    else {
252        unreachable!("unexpected internal message");
253    };
254    assert!(updates.is_empty());
255    Ok(barrier)
256}
257
/// Aligns the left and right inputs on their barriers.
///
/// The produced stream is a sequence of aligned intervals. Each interval starts with any
/// number of `InternalMessage::Chunk(left_chunk)` and `InternalMessage::WaterMark(left_watermark)`
/// messages, followed by exactly one `InternalMessage::Barrier(right_chunks, barrier)`.
///
/// - `right_chunks` holds the right-side chunks received during the interval when
///   `YIELD_RIGHT_CHUNKS` is `true`. Otherwise right chunks are consumed and dropped.
/// - Right-side watermarks are always dropped.
/// - Whenever both inputs are ready, the left one is polled first.
/// - A barrier epoch mismatch between the two inputs is returned as an error.
/// - The stream ends once both inputs are exhausted.
#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
pub(super) async fn align_input<const YIELD_RIGHT_CHUNKS: bool>(left: Executor, right: Executor) {
    let mut left = pin!(left.execute());
    let mut right = pin!(right.execute());
    // Keep producing intervals until stream exhaustion or errors.
    loop {
        let mut right_chunks = vec![];
        // Produce an aligned interval.
        'inner: loop {
            // Rebuilt on every iteration. It only borrows the inputs (`by_ref`), so the
            // barrier arms below can keep reading from `left`/`right` directly.
            // The strategy always prefers the left input.
            let mut combined = stream::select_with_strategy(
                left.by_ref().map(Either::Left),
                right.by_ref().map(Either::Right),
                |_: &mut ()| PollNext::Left,
            );
            match combined.next().await {
                Some(Either::Left(Ok(Message::Chunk(c)))) => yield InternalMessage::Chunk(c),
                Some(Either::Right(Ok(Message::Chunk(c)))) => {
                    if YIELD_RIGHT_CHUNKS {
                        right_chunks.push(c);
                    }
                }
                Some(Either::Left(Ok(Message::Barrier(b)))) => {
                    // Left hit the barrier first: drain right up to the matching barrier.
                    let mut remain = chunks_until_barrier(right.by_ref(), b.clone())
                        .try_collect()
                        .await?;
                    if YIELD_RIGHT_CHUNKS {
                        right_chunks.append(&mut remain);
                    }
                    yield InternalMessage::Barrier(right_chunks, b);
                    break 'inner;
                }
                Some(Either::Right(Ok(Message::Barrier(b)))) => {
                    // Right hit the barrier first: forward left messages up to the matching
                    // barrier before closing the interval.
                    #[for_await]
                    for internal_message in
                        internal_messages_until_barrier(left.by_ref(), b.clone())
                    {
                        yield internal_message?;
                    }
                    yield InternalMessage::Barrier(right_chunks, b);
                    break 'inner;
                }
                Some(Either::Left(Err(e)) | Either::Right(Err(e))) => return Err(e),
                Some(Either::Left(Ok(Message::Watermark(w)))) => {
                    yield InternalMessage::WaterMark(w);
                }
                Some(Either::Right(Ok(Message::Watermark(_)))) => {
                    // ignore right side watermark
                }
                None => return Ok(()),
            }
        }
    }
}
315
316pub(super) fn apply_indices_map(chunk: StreamChunk, indices: &[usize]) -> StreamChunk {
317    let (data_chunk, ops) = chunk.into_parts();
318    let (columns, vis) = data_chunk.into_parts();
319    let output_columns = indices
320        .iter()
321        .cloned()
322        .map(|idx| columns[idx].clone())
323        .collect();
324    StreamChunk::with_visibility(ops, output_columns, vis)
325}
326
/// Phase 1 of the temporal join: look up every visible left row on the temporal side and
/// emit the joined rows, plus NULL-padded marker rows where the join type needs them.
///
/// Non-lookup join conditions are not evaluated here. `LeftOuterWithCond` shapes its output
/// so that a later step can evaluate them.
pub(super) mod phase1 {
    use std::ops::Bound;

    use futures::{StreamExt, pin_mut};
    use futures_async_stream::try_stream;
    use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
    use risingwave_common::array::{Op, StreamChunk};
    use risingwave_common::hash::{HashKey, NullBitmap};
    use risingwave_common::row::{self, OwnedRow, Row, RowExt};
    use risingwave_common::types::{DataType, DatumRef};
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_storage::StateStore;
    use risingwave_storage::row_serde::value_serde::ValueRowSerde;

    use super::{StreamExecutorError, TemporalSide};
    use crate::common::table::state_table::StateTable;
    use crate::executor::monitor::TemporalJoinMetrics;

    /// Join-type-specific hooks that `handle_chunk` uses to emit output rows.
    ///
    /// The methods take no `self`, so an implementation (one of the unit marker types below)
    /// is selected purely through the `E` type parameter of `handle_chunk`.
    pub trait Phase1Evaluation {
        /// Called when a matched row is found.
        ///
        /// Returns `Some(chunk)` when appending to `builder` produced a chunk; the caller must
        /// yield it.
        #[must_use = "consume chunk if produced"]
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk>;

        /// Called when all matched rows of a join key are appended.
        ///
        /// `matched` tells whether any right row was emitted for `left_row`. `right_size` is
        /// the number of right columns, used for NULL padding.
        #[must_use = "consume chunk if produced"]
        fn match_end(
            builder: &mut StreamChunkBuilder,
            op: Op,
            left_row: impl Row,
            right_size: usize,
            matched: bool,
        ) -> Option<StreamChunk>;
    }

    /// Inner join: one output row per match, nothing else.
    pub struct Inner;
    /// Left outer join without a non-lookup condition.
    pub struct LeftOuter;
    /// Left outer join whose non-lookup condition is evaluated after phase 1.
    pub struct LeftOuterWithCond;

    impl Phase1Evaluation for Inner {
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk> {
            builder.append_row(op, left_row.chain(right_row))
        }

        fn match_end(
            _builder: &mut StreamChunkBuilder,
            _op: Op,
            _left_row: impl Row,
            _right_size: usize,
            _matched: bool,
        ) -> Option<StreamChunk> {
            None
        }
    }

    impl Phase1Evaluation for LeftOuter {
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk> {
            builder.append_row(op, left_row.chain(right_row))
        }

        fn match_end(
            builder: &mut StreamChunkBuilder,
            op: Op,
            left_row: impl Row,
            right_size: usize,
            matched: bool,
        ) -> Option<StreamChunk> {
            if !matched {
                // If no rows matched, a marker row should be inserted.
                builder.append_row(
                    op,
                    left_row.chain(row::repeat_n(DatumRef::None, right_size)),
                )
            } else {
                None
            }
        }
    }

    impl Phase1Evaluation for LeftOuterWithCond {
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk> {
            builder.append_row(op, left_row.chain(right_row))
        }

        fn match_end(
            builder: &mut StreamChunkBuilder,
            op: Op,
            left_row: impl Row,
            right_size: usize,
            _matched: bool,
        ) -> Option<StreamChunk> {
            // A marker row is always inserted, marked invisible, so that the non-lookup
            // filters can be evaluated later. A later step makes it visible if no row still
            // matches after all filters have been evaluated.
            builder.append_row_invisible(
                op,
                left_row.chain(row::repeat_n(DatumRef::None, right_size)),
            )
        }
    }

    /// Joins one left `chunk` against the temporal side and yields the joined chunks.
    ///
    /// Output rows are a left row chained with a right row, or with `right_size` NULLs for
    /// marker rows. They are built by a `StreamChunkBuilder` created from `chunk_size` and
    /// `full_schema`.
    ///
    /// 1. Join keys are built from `left_join_keys`. The keys of visible insert-like rows are
    ///    then fetched into (or promoted within) `right_table`'s cache. With `APPEND_ONLY`,
    ///    every visible op must be `Op::Insert` (asserted).
    /// 2. Each visible row whose key's null bitmap is a subset of `null_matched` is handled
    ///    as follows:
    ///    - `APPEND_ONLY`: one row per cached right row, using the input op.
    ///    - Otherwise, `Insert`/`UpdateInsert`: `Op::Insert` rows are emitted, and each match
    ///      is recorded in `memo_table` as the right row followed by the left row's
    ///      `memo_table_lookup_prefix` projection.
    ///    - Otherwise, `Delete`/`UpdateDelete`: memo rows are read using that projection as
    ///      prefix, `Op::Delete` rows are emitted for them, and they are deleted from
    ///      `memo_table`.
    /// 3. `E::match_end` is called for every visible row, with update ops normalized to
    ///    `Insert`/`Delete`, whether or not the null-bitmap check passed.
    ///
    /// # Errors
    ///
    /// Propagates errors from fetching keys and from scanning `memo_table`.
    ///
    /// # Panics
    ///
    /// Panics if `APPEND_ONLY` and a visible op is not `Op::Insert`. Also panics if
    /// `memo_table` is `None` while `APPEND_ONLY` is false and a visible row is processed.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    #[expect(clippy::too_many_arguments)]
    pub(super) async fn handle_chunk<
        'a,
        K: HashKey,
        S: StateStore,
        SD: ValueRowSerde,
        E: Phase1Evaluation,
        const APPEND_ONLY: bool,
    >(
        chunk_size: usize,
        right_size: usize,
        full_schema: Vec<DataType>,
        left_join_keys: &'a [usize],
        right_table: &'a mut TemporalSide<K, S, SD>,
        memo_table_lookup_prefix: &'a [usize],
        memo_table: &'a mut Option<StateTable<S>>,
        null_matched: &'a K::Bitmap,
        chunk: StreamChunk,
        metrics: &'a TemporalJoinMetrics,
    ) {
        let mut builder = StreamChunkBuilder::new(chunk_size, full_schema);
        let keys = K::build_many(left_join_keys, chunk.data_chunk());
        // Only visible insert-like rows need a temporal-side lookup. Deletes are answered
        // from the memo table instead.
        let to_fetch_keys = chunk
            .visibility()
            .iter()
            .zip_eq_debug(keys.iter())
            .zip_eq_debug(chunk.ops())
            .filter_map(|((vis, key), op)| {
                if vis {
                    if APPEND_ONLY {
                        assert_eq!(*op, Op::Insert);
                        Some(key)
                    } else {
                        match op {
                            Op::Insert | Op::UpdateInsert => Some(key),
                            Op::Delete | Op::UpdateDelete => None,
                        }
                    }
                } else {
                    None
                }
            });
        right_table
            .fetch_or_promote_keys(to_fetch_keys, metrics)
            .await?;

        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
            let Some((op, left_row)) = r else {
                continue;
            };

            let mut matched = false;

            if APPEND_ONLY {
                // Append-only temporal join.
                // `force_peek` is expected to hit, because the key was fetched above.
                if key.null_bitmap().is_subset(null_matched)
                    && let join_entry = right_table.force_peek(&key)
                    && !join_entry.is_empty()
                {
                    matched = true;
                    for right_row in join_entry.cached.values() {
                        if let Some(chunk) =
                            E::append_matched_row(op, &mut builder, left_row, right_row)
                        {
                            yield chunk;
                        }
                    }
                }
            } else {
                // Non-append-only temporal join.
                // The memo-table pk and columns:
                // (`join_key` + `left_pk` + `right_pk`) -> (`right_scan_schema` + `join_key` + `left_pk`)
                //
                // Write pattern:
                //   For each left input row with an insert op, construct the memo-table pk and
                //   insert the row into the memo table.
                // Read pattern:
                //   For each left input row with a delete op, construct the pk prefix
                //   (`join_key` + `left_pk`), fetch the matching rows and delete them from the
                //   memo table.
                //
                // Temporal join supports inner join and left outer join, optionally with other
                // conditions. All of these can be handled in a unified way with the memo table.
                // The memo table persists the rows fetched from the right table, with the
                // left row's `join_key` and `left_pk` appended.
                // Generating the NULL rows of the outer join and evaluating the other
                // conditions are stateless, so they are handled without the memo table.
                let memo_table = memo_table.as_mut().unwrap();
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if key.null_bitmap().is_subset(null_matched)
                            && let join_entry = right_table.force_peek(&key)
                            && !join_entry.is_empty()
                        {
                            matched = true;
                            for right_row in join_entry.cached.values() {
                                let right_row: OwnedRow = right_row.clone();
                                // Insert into memo table
                                memo_table.insert(right_row.clone().chain(
                                    left_row.project(memo_table_lookup_prefix).into_owned_row(),
                                ));
                                if let Some(chunk) = E::append_matched_row(
                                    Op::Insert,
                                    &mut builder,
                                    left_row,
                                    right_row,
                                ) {
                                    yield chunk;
                                }
                            }
                        }
                    }
                    Op::Delete | Op::UpdateDelete => {
                        // Deletions are deferred until the scan below has finished;
                        // presumably because the scan iterator borrows `memo_table`.
                        let mut memo_rows_to_delete = vec![];
                        if key.null_bitmap().is_subset(null_matched) {
                            let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
                                &(Bound::Unbounded, Bound::Unbounded);
                            let prefix = left_row.project(memo_table_lookup_prefix);
                            let state_table_iter = memo_table
                                .iter_with_prefix(prefix, sub_range, Default::default())
                                .await?;
                            pin_mut!(state_table_iter);

                            while let Some(memo_row) = state_table_iter.next().await {
                                matched = true;
                                let memo_row = memo_row?.into_owned_row();
                                memo_rows_to_delete.push(memo_row.clone());
                                // The first `right_size` memo columns are the right row.
                                if let Some(chunk) = E::append_matched_row(
                                    Op::Delete,
                                    &mut builder,
                                    left_row,
                                    memo_row.slice(0..right_size),
                                ) {
                                    yield chunk;
                                }
                            }
                        }
                        for memo_row in memo_rows_to_delete {
                            // Delete from memo table
                            memo_table.delete(memo_row);
                        }
                    }
                }
            }
            if let Some(chunk) = E::match_end(
                &mut builder,
                match op {
                    Op::Insert | Op::UpdateInsert => Op::Insert,
                    Op::Delete | Op::UpdateDelete => Op::Delete,
                },
                left_row,
                right_size,
                matched,
            ) {
                yield chunk;
            }
        }

        // Flush whatever is left in the builder.
        if let Some(chunk) = builder.take() {
            yield chunk;
        }
    }
}
605
606impl<
607    K: HashKey,
608    S: StateStore,
609    SD: ValueRowSerde,
610    const T: JoinTypePrimitive,
611    const APPEND_ONLY: bool,
612> TemporalJoinExecutor<K, S, SD, T, APPEND_ONLY>
613{
614    #[expect(clippy::too_many_arguments)]
615    pub fn new(
616        ctx: ActorContextRef,
617        info: ExecutorInfo,
618        left: Executor,
619        right: Executor,
620        table: ReplicatedStateTable<S, SD>,
621        left_join_keys: Vec<usize>,
622        right_join_keys: Vec<usize>,
623        null_safe: Vec<bool>,
624        condition: Option<NonStrictExpression>,
625        output_indices: Vec<usize>,
626        table_output_indices: Vec<usize>,
627        table_stream_key_indices: Vec<usize>,
628        watermark_sequence: AtomicU64Ref,
629        metrics: Arc<StreamingMetrics>,
630        chunk_size: usize,
631        join_key_data_types: Vec<DataType>,
632        memo_table: Option<StateTable<S>>,
633    ) -> Self {
634        let metrics_info =
635            MetricsInfo::new(metrics.clone(), table.table_id(), ctx.id, "temporal join");
636
637        let cache = ManagedLruCache::unbounded(watermark_sequence, metrics_info);
638
639        let metrics = metrics.new_temporal_join_metrics(table.table_id(), ctx.id, ctx.fragment_id);
640
641        Self {
642            ctx,
643            info,
644            left,
645            right,
646            right_table: TemporalSide {
647                source: table,
648                table_stream_key_indices,
649                table_output_indices,
650                cache,
651                join_key_data_types,
652            },
653            left_join_keys,
654            right_join_keys,
655            null_safe,
656            condition,
657            output_indices,
658            chunk_size,
659            memo_table,
660            metrics,
661        }
662    }
663
664    #[try_stream(ok = Message, error = StreamExecutorError)]
665    async fn into_stream(mut self) {
666        let right_size = self.right.schema().len();
667
668        let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
669            &self.output_indices,
670            self.left.schema().len(),
671            right_size,
672        );
673
674        let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());
675
676        let left_stream_key_indices = self.left.stream_key().to_vec();
677        let right_stream_key_indices = self.right.stream_key().to_vec();
678        let memo_table_lookup_prefix = self
679            .left_join_keys
680            .iter()
681            .cloned()
682            .chain(left_stream_key_indices)
683            .collect_vec();
684
685        let null_matched = K::Bitmap::from_bool_vec(self.null_safe);
686
687        let full_schema: Vec<_> = self
688            .left
689            .schema()
690            .data_types()
691            .into_iter()
692            .chain(self.right.schema().data_types().into_iter())
693            .collect();
694
695        let input = align_input::<true>(self.left, self.right);
696        pin_mut!(input);
697        let barrier = expect_first_barrier(&mut input).await?;
698        let barrier_epoch = barrier.epoch;
699        yield Message::Barrier(barrier);
700        self.right_table.source.init_epoch(barrier_epoch).await?;
701        if !APPEND_ONLY {
702            self.memo_table
703                .as_mut()
704                .unwrap()
705                .init_epoch(barrier_epoch)
706                .await?;
707        }
708
709        #[for_await]
710        for msg in input {
711            self.right_table.cache.evict();
712            self.metrics
713                .temporal_join_cached_entry_count
714                .set(self.right_table.cache.len() as i64);
715            match msg? {
716                InternalMessage::WaterMark(watermark) => {
717                    let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap();
718                    yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
719                }
720                InternalMessage::Chunk(chunk) => {
721                    let full_schema = full_schema.clone();
722
723                    if T == JoinType::Inner {
724                        let st1 = phase1::handle_chunk::<K, S, SD, phase1::Inner, APPEND_ONLY>(
725                            self.chunk_size,
726                            right_size,
727                            full_schema,
728                            &self.left_join_keys,
729                            &mut self.right_table,
730                            &memo_table_lookup_prefix,
731                            &mut self.memo_table,
732                            &null_matched,
733                            chunk,
734                            &self.metrics,
735                        );
736                        #[for_await]
737                        for chunk in st1 {
738                            let chunk = chunk?;
739                            let new_chunk = if let Some(ref cond) = self.condition {
740                                let (data_chunk, ops) = chunk.into_parts();
741                                let passed_bitmap = cond.eval_infallible(&data_chunk).await;
742                                let passed_bitmap =
743                                    Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
744                                let (columns, vis) = data_chunk.into_parts();
745                                let new_vis = vis & passed_bitmap;
746                                StreamChunk::with_visibility(ops, columns, new_vis)
747                            } else {
748                                chunk
749                            };
750                            let new_chunk = apply_indices_map(new_chunk, &self.output_indices);
751                            yield Message::Chunk(new_chunk);
752                        }
753                    } else if let Some(ref cond) = self.condition {
754                        // Joined result without evaluating non-lookup conditions.
755                        let st1 = phase1::handle_chunk::<
756                            K,
757                            S,
758                            SD,
759                            phase1::LeftOuterWithCond,
760                            APPEND_ONLY,
761                        >(
762                            self.chunk_size,
763                            right_size,
764                            full_schema,
765                            &self.left_join_keys,
766                            &mut self.right_table,
767                            &memo_table_lookup_prefix,
768                            &mut self.memo_table,
769                            &null_matched,
770                            chunk,
771                            &self.metrics,
772                        );
773                        let mut matched_count = 0usize;
774                        #[for_await]
775                        for chunk in st1 {
776                            let chunk = chunk?;
777                            let (data_chunk, ops) = chunk.into_parts();
778                            let passed_bitmap = cond.eval_infallible(&data_chunk).await;
779                            let passed_bitmap =
780                                Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
781                            let (columns, vis) = data_chunk.into_parts();
782                            let mut new_vis = BitmapBuilder::with_capacity(vis.len());
783                            for (passed, not_match_end) in
784                                passed_bitmap.iter().zip_eq_debug(vis.iter())
785                            {
786                                let is_match_end = !not_match_end;
787                                let vis = if is_match_end && matched_count == 0 {
788                                    // Nothing is matched, so the marker row should be visible.
789                                    true
790                                } else if is_match_end {
791                                    // reset the count
792                                    matched_count = 0;
793                                    // rows found, so the marker row should be invisible.
794                                    false
795                                } else {
796                                    if passed {
797                                        matched_count += 1;
798                                    }
799                                    passed
800                                };
801                                new_vis.append(vis);
802                            }
803                            let new_chunk = apply_indices_map(
804                                StreamChunk::with_visibility(ops, columns, new_vis.finish()),
805                                &self.output_indices,
806                            );
807                            yield Message::Chunk(new_chunk);
808                        }
809                        // The last row should always be marker row,
810                        assert_eq!(matched_count, 0);
811                    } else {
812                        let st1 = phase1::handle_chunk::<K, S, SD, phase1::LeftOuter, APPEND_ONLY>(
813                            self.chunk_size,
814                            right_size,
815                            full_schema,
816                            &self.left_join_keys,
817                            &mut self.right_table,
818                            &memo_table_lookup_prefix,
819                            &mut self.memo_table,
820                            &null_matched,
821                            chunk,
822                            &self.metrics,
823                        );
824                        #[for_await]
825                        for chunk in st1 {
826                            let chunk = chunk?;
827                            let new_chunk = apply_indices_map(chunk, &self.output_indices);
828                            yield Message::Chunk(new_chunk);
829                        }
830                    }
831                }
832                InternalMessage::Barrier(updates, barrier) => {
833                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
834
835                    // Write right-side chunks to the replicated state table and update LRU cache.
836                    // Must happen before commit.
837                    self.right_table.update(
838                        updates,
839                        &self.right_join_keys,
840                        &right_stream_key_indices,
841                    )?;
842                    let right_post_commit = self.right_table.source.commit(barrier.epoch).await?;
843                    let memo_post_commit = if !APPEND_ONLY {
844                        Some(
845                            self.memo_table
846                                .as_mut()
847                                .unwrap()
848                                .commit(barrier.epoch)
849                                .await?,
850                        )
851                    } else {
852                        None
853                    };
854
855                    yield Message::Barrier(barrier);
856
857                    if let Some((_, true)) = right_post_commit
858                        .post_yield_barrier(update_vnode_bitmap.clone())
859                        .await?
860                    {
861                        self.right_table.cache.clear();
862                    }
863                    if let Some(memo_post_commit) = memo_post_commit {
864                        memo_post_commit
865                            .post_yield_barrier(update_vnode_bitmap.clone())
866                            .await?;
867                    }
868                }
869            }
870        }
871    }
872}
873
874impl<
875    K: HashKey,
876    S: StateStore,
877    SD: ValueRowSerde,
878    const T: JoinTypePrimitive,
879    const APPEND_ONLY: bool,
880> Execute for TemporalJoinExecutor<K, S, SD, T, APPEND_ONLY>
881{
882    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
883        self.into_stream().boxed()
884    }
885}
886
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;

    use risingwave_common::array::*;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
    use risingwave_common::hash::Key32;
    use risingwave_common::types::{DataType, ScalarRefImpl};
    use risingwave_common::util::epoch::{EpochPair, test_epoch};
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_common::util::value_encoding::BasicSerde;
    use risingwave_hummock_test::test_utils::prepare_hummock_test_env;
    use risingwave_storage::hummock::HummockStorage;

    use super::*;
    use crate::common::table::state_table::{
        StateTable, StateTableBuilder, StateTableOpConsistencyLevel,
    };
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::monitor::StreamingMetrics;
    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};
    use crate::executor::{ActorContext, ExecutorInfo, JoinType};

    /// Tests that a temporal join on a pk-prefix (join key is a strict prefix of the right table's
    /// pk) correctly merges rows committed in epoch1 with rows staged/committed during epoch2, and
    /// produces the right results when the left side arrives in epoch3.
    ///
    /// Right table: (key INT, seq INT, val INT), pk = (key, seq), SINGLETON distribution.
    /// Join condition: `left.left_key` = right.key  (pk prefix: join uses only the first pk column).
    ///
    /// Pre-commit at epoch1: (1,1,100), (1,2,200), (2,1,300)
    /// Epoch2:  right side sends insert (3,1,400). It is written to the replicated state table
    ///          and committed when the barrier closing epoch2 (curr = epoch3, prev = epoch2)
    ///          arrives.
    /// Epoch3:  left side sends (`left_key=1`, `left_val=111`) and (`left_key=3`, `left_val=333`).
    ///
    /// Expected join output in epoch3:
    ///   (1, 111, 1, 1, 100)  — key=1 row matched from epoch1 data (cache miss → state store read)
    ///   (1, 111, 1, 2, 200)  — key=1 row matched from epoch1 data (same cache entry)
    ///   (3, 333, 3, 1, 400)  — key=3 row matched from epoch2 data (cache miss → state store read)
    #[tokio::test]
    async fn test_temporal_join_pk_prefix_staging_merge() {
        let test_env = prepare_hummock_test_env().await;
        let table_id = TableId::new(1);

        // Right table schema: (key INT col_id=1, seq INT col_id=2, val INT col_id=3)
        // pk = (key idx=0, seq idx=1), SINGLETON distribution (empty distribution_key),
        // read_prefix_len_hint = 2 (= full pk length).
        let right_col_descs = vec![
            ColumnDesc::unnamed(ColumnId::new(1), DataType::Int32),
            ColumnDesc::unnamed(ColumnId::new(2), DataType::Int32),
            ColumnDesc::unnamed(ColumnId::new(3), DataType::Int32),
        ];
        let order_types = vec![OrderType::ascending(), OrderType::ascending()];
        let pk_indices = vec![0usize, 1];
        let pbtable = gen_pbtable(table_id, right_col_descs, order_types, pk_indices, 2);

        test_env.register_table(pbtable.clone()).await;

        // Pre-commit epoch1 data via a plain StateTable (bypasses the executor).
        {
            let mut setup_table = StateTable::<HummockStorage>::from_table_catalog_inconsistent_op(
                &pbtable,
                test_env.storage.clone(),
                None,
            )
            .await;
            test_env
                .storage
                .start_epoch(test_epoch(1), HashSet::from_iter([table_id]));
            setup_table
                .init_epoch(EpochPair::new_test_epoch(test_epoch(1)))
                .await
                .unwrap();
            setup_table.insert(OwnedRow::new(vec![
                Some(1i32.into()),
                Some(1i32.into()),
                Some(100i32.into()),
            ]));
            setup_table.insert(OwnedRow::new(vec![
                Some(1i32.into()),
                Some(2i32.into()),
                Some(200i32.into()),
            ]));
            setup_table.insert(OwnedRow::new(vec![
                Some(2i32.into()),
                Some(1i32.into()),
                Some(300i32.into()),
            ]));
            test_env
                .storage
                .start_epoch(test_epoch(2), HashSet::from_iter([table_id]));
            setup_table
                .commit_for_test(EpochPair::new_test_epoch(test_epoch(2)))
                .await
                .unwrap();
            // Seal and commit epoch1 data in Hummock so it is visible to all readers.
            test_env.commit_epoch(test_epoch(1)).await;
        }

        // Build the replicated state table for the executor (all 3 columns are output).
        let output_column_ids = vec![ColumnId::new(1), ColumnId::new(2), ColumnId::new(3)];
        let right_table = StateTableBuilder::<_, BasicSerde, true, _>::new(
            &pbtable,
            test_env.storage.clone(),
            None,
        )
        .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
        .with_output_column_ids(output_column_ids)
        .forbid_preload_all_rows()
        .build()
        .await;

        // Left source: (left_key INT, left_val INT), stream_key = [0].
        let left_schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let (mut left_tx, left_source) = MockSource::channel();
        // The schema is not needed after this point, so move it instead of cloning.
        let left_executor = left_source.into_executor(left_schema, vec![0]);

        // Right source: mirrors the right table columns (key INT, seq INT, val INT),
        // stream_key = [0, 1] (the pk).
        let right_schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let (mut right_tx, right_source) = MockSource::channel();
        let right_executor = right_source.into_executor(right_schema, vec![0, 1]);

        // table_output_indices: indices in right table output rows that form the output.
        // All 3 columns are selected.
        let table_output_indices = vec![0usize, 1, 2];
        // table_stream_key_indices: pk of right table within the output row = [0, 1] (key, seq).
        let table_stream_key_indices = vec![0usize, 1];

        // Join on left.left_key (col 0) = right.key (col 0 in right source).
        let left_join_keys = vec![0usize];
        let right_join_keys = vec![0usize];
        let null_safe = vec![false];
        let join_key_data_types = vec![DataType::Int32];

        // Output: all 5 columns — [left_key, left_val, key, seq, val].
        let output_indices = vec![0usize, 1, 2, 3, 4];
        let output_schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let info = ExecutorInfo::for_test(output_schema, vec![], "TemporalJoinTest".to_owned(), 0);

        let executor = TemporalJoinExecutor::<
            Key32,
            HummockStorage,
            BasicSerde,
            { JoinType::Inner },
            true,
        >::new(
            ActorContext::for_test(0),
            info,
            left_executor,
            right_executor,
            right_table,
            left_join_keys,
            right_join_keys,
            null_safe,
            None, // no extra non-equi condition
            output_indices,
            table_output_indices,
            table_stream_key_indices,
            Arc::new(AtomicU64::new(0)),
            Arc::new(StreamingMetrics::unused()),
            1024,
            join_key_data_types,
            None, // no memo table (append-only inner join)
        );

        let mut stream = Box::new(executor).execute();

        // Push the first barrier (init epoch2, prev = epoch1) on both sides.
        left_tx.push_barrier_with_prev_epoch_for_test(test_epoch(2), test_epoch(1), false);
        right_tx.push_barrier_with_prev_epoch_for_test(test_epoch(2), test_epoch(1), false);
        stream.expect_barrier().await;

        // Epoch2: right side inserts (3, 1, 400); left side is quiet.
        // The epoch2→epoch3 barrier will trigger write_chunk + commit for the right table,
        // making (3,1,400) visible as committed epoch2 data.
        right_tx.push_chunk(StreamChunk::from_pretty(
            " i i   i
            + 3 1 400",
        ));
        // Start epoch3 before the barrier that commits epoch2 data.
        test_env
            .storage
            .start_epoch(test_epoch(3), HashSet::from_iter([table_id]));
        left_tx.push_barrier_with_prev_epoch_for_test(test_epoch(3), test_epoch(2), false);
        right_tx.push_barrier_with_prev_epoch_for_test(test_epoch(3), test_epoch(2), false);
        stream.expect_barrier().await;

        // Start epoch4 before the stop barrier that commits epoch3 data.
        test_env
            .storage
            .start_epoch(test_epoch(4), HashSet::from_iter([table_id]));

        // Epoch3: left side sends two rows.
        //   key=1 → cache miss → state store read → finds (1,1,100) and (1,2,200) from epoch1.
        //   key=3 → cache miss → state store read → finds (3,1,400) from epoch2.
        left_tx.push_chunk(StreamChunk::from_pretty(
            " i   i
            + 1 111
            + 3 333",
        ));
        left_tx.push_barrier_with_prev_epoch_for_test(test_epoch(4), test_epoch(3), true);
        right_tx.push_barrier_with_prev_epoch_for_test(test_epoch(4), test_epoch(3), true);

        // Collect all output rows before the stop barrier.
        let mut output_rows: Vec<[i32; 5]> = vec![];
        loop {
            match stream.next().await.unwrap().unwrap() {
                Message::Chunk(chunk) => {
                    for (op, row) in chunk.rows() {
                        assert_eq!(op, Op::Insert);
                        // Every output column is a non-NULL Int32 in this test's data.
                        let row: [i32; 5] = std::array::from_fn(|i| {
                            match row
                                .datum_at(i)
                                .expect("temporal join output column should not be NULL")
                            {
                                ScalarRefImpl::Int32(v) => v,
                                _ => panic!("expected Int32"),
                            }
                        });
                        output_rows.push(row);
                    }
                }
                Message::Barrier(_) => break,
                _ => {}
            }
        }

        output_rows.sort();
        assert_eq!(
            output_rows,
            vec![
                [1, 111, 1, 1, 100], // key=1 matched epoch1 row (1,1,100)
                [1, 111, 1, 2, 200], // key=1 matched epoch1 row (1,2,200)
                [3, 333, 3, 1, 400], // key=3 matched epoch2 row (3,1,400)
            ]
        );
    }
}