risingwave_stream/executor/
temporal_join.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::ops::Bound;
18
19use anyhow::Context;
20use either::Either;
21use futures::TryStreamExt;
22use futures::stream::{self, PollNext};
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::BitmapBuilder;
26use risingwave_common::hash::{HashKey, NullBitmap};
27use risingwave_common::row::RowExt;
28use risingwave_common::util::iter_util::ZipEqDebug;
29use risingwave_common_estimate_size::{EstimateSize, KvSize};
30use risingwave_expr::expr::NonStrictExpression;
31use risingwave_storage::row_serde::value_serde::ValueRowSerde;
32use risingwave_storage::store::PrefetchOptions;
33
34use super::join::{JoinType, JoinTypePrimitive};
35use super::monitor::TemporalJoinMetrics;
36use crate::cache::ManagedLruCache;
37use crate::common::metrics::MetricsInfo;
38use crate::common::table::state_table::ReplicatedStateTable;
39use crate::executor::join::builder::JoinStreamChunkBuilder;
40use crate::executor::prelude::*;
41
/// Streaming executor that joins a left input stream against a temporal (right) side table.
///
/// Type parameters:
/// - `K`: hash key type built from the join key columns.
/// - `S`: state store backing the right-side table and the optional memo table.
/// - `SD`: value row serde used by the replicated right-side table.
/// - `T`: join type primitive; compared against `JoinType::Inner` when processing chunks.
/// - `APPEND_ONLY`: whether the left input is append-only. When `false`, `memo_table` is
///   unwrapped and therefore must be `Some`.
pub struct TemporalJoinExecutor<
    K: HashKey,
    S: StateStore,
    SD: ValueRowSerde,
    const T: JoinTypePrimitive,
    const APPEND_ONLY: bool,
> {
    /// Actor context; its ids are used when creating the cache metrics info and join metrics.
    ctx: ActorContextRef,
    /// Executor info. Currently not read anywhere, hence the `dead_code` expectation.
    #[expect(dead_code)]
    info: ExecutorInfo,
    /// Left (stream side) input executor.
    left: Executor,
    /// Right (temporal side) input executor.
    right: Executor,
    /// Replicated right-side table together with its per-join-key LRU cache.
    right_table: TemporalSide<K, S, SD>,
    /// Column indices of the join key in the left input.
    left_join_keys: Vec<usize>,
    /// Column indices of the join key in the right input.
    // NOTE(review): presumably used to key cache updates from right-side chunks — confirm
    // against the barrier handling in `into_stream`.
    right_join_keys: Vec<usize>,
    /// Per join key column flag, converted into a `K::Bitmap` of columns where NULL is allowed
    /// to match. A key whose null bitmap is not a subset of that bitmap never matches.
    null_safe: Vec<bool>,
    /// Optional non-lookup condition evaluated on the joined (left ++ right) chunks.
    condition: Option<NonStrictExpression>,
    /// Indices into the joined (left ++ right) schema selecting the output columns.
    output_indices: Vec<usize>,
    /// Capacity passed to the `StreamChunkBuilder` for produced chunks.
    chunk_size: usize,
    /// Memo table persisting the matched right rows per left row; required when the left input
    /// is not append-only so that deletes can retract exactly what was emitted before.
    memo_table: Option<StateTable<S>>,
    /// Temporal join specific metrics (cache query/miss counters, cached entry count).
    metrics: TemporalJoinMetrics,
}
64
/// The set of temporal-side rows cached for a single join key.
#[derive(Default)]
pub struct JoinEntry {
    /// pk -> row
    cached: HashMap<OwnedRow, OwnedRow>,
    /// Running heap size of the key/value pairs held in `cached`; kept in sync by
    /// `insert`/`remove` and reported through `EstimateSize`.
    kv_heap_size: KvSize,
}
71
72impl EstimateSize for JoinEntry {
73    fn estimated_heap_size(&self) -> usize {
74        // TODO: Add internal size.
75        // https://github.com/risingwavelabs/risingwave/issues/9713
76        self.kv_heap_size.size()
77    }
78}
79
80impl JoinEntry {
81    /// Insert into the cache.
82    pub fn insert(&mut self, key: OwnedRow, value: OwnedRow) {
83        // Lookup might refill the cache before the `insert` messages from the temporal side
84        // upstream.
85        if let Entry::Vacant(e) = self.cached.entry(key) {
86            self.kv_heap_size.add(e.key(), &value);
87            e.insert(value);
88        } else {
89            panic!("value {:?} double insert", value);
90        }
91    }
92
93    /// Delete from the cache.
94    pub fn remove(&mut self, key: &OwnedRow) {
95        if let Some(value) = self.cached.remove(key) {
96            self.kv_heap_size.sub(key, &value);
97        } else {
98            panic!("key {:?} should be in the cache", key);
99        }
100    }
101
102    pub fn is_empty(&self) -> bool {
103        self.cached.is_empty()
104    }
105}
106
/// The temporal (right) side of the join: the replicated table plus an LRU cache of the rows
/// already looked up, keyed by join key.
struct TemporalSide<K: HashKey, S: StateStore, SD: ValueRowSerde> {
    /// Replicated state table that is scanned by join-key prefix on cache misses and that
    /// receives the right-side chunks via `write_chunk`.
    source: ReplicatedStateTable<S, SD>,
    /// Column indices projected out of a fetched table row to form the key of a cached row
    /// inside a `JoinEntry`.
    table_stream_key_indices: Vec<usize>,
    /// Join key -> cached rows of that key.
    cache: ManagedLruCache<K, JoinEntry>,
    /// Data types used to deserialize a hash key back into the table's pk prefix.
    join_key_data_types: Vec<DataType>,
}
113
114impl<K: HashKey, S: StateStore, SD: ValueRowSerde> TemporalSide<K, S, SD> {
    /// Fetch records from temporal side table and ensure the entry in the cache.
    /// If already exists, the entry will be promoted.
    ///
    /// Every key bumps the total-query counter; every key missing from the cache additionally
    /// bumps the miss counter and schedules a table scan. The scans are driven through
    /// `buffered(16)` and their results are put into the cache as they complete.
    ///
    /// # Errors
    ///
    /// Returns an error if deserializing a key, opening the table iterator, or reading a row
    /// from it fails.
    async fn fetch_or_promote_keys(
        &mut self,
        keys: impl Iterator<Item = &K>,
        metrics: &TemporalJoinMetrics,
    ) -> StreamExecutorResult<()> {
        // Use the iterator's upper bound (if any) as the capacity hint for the pending lookups.
        let mut futs = Vec::with_capacity(keys.size_hint().1.unwrap_or(0));
        for key in keys {
            metrics.temporal_join_total_query_cache_count.inc();

            // `get` is what performs the promotion mentioned above when the key is cached.
            if self.cache.get(key).is_none() {
                metrics.temporal_join_cache_miss_count.inc();

                // The future is only created here; it runs when the buffered stream below
                // polls it.
                futs.push(async {
                    // Turn the hash key back into the row used as the pk prefix of the scan.
                    let pk_prefix = key.deserialize(&self.join_key_data_types)?;

                    // Scan all rows sharing the prefix (no sub-range restriction).
                    let iter = self
                        .source
                        .iter_with_prefix(
                            &pk_prefix,
                            &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
                            PrefetchOptions::default(),
                        )
                        .await?;

                    let mut entry = JoinEntry::default();

                    pin_mut!(iter);
                    while let Some(row) = iter.next().await {
                        let row: OwnedRow = row?;
                        // Cache the row under its stream-key projection.
                        entry.insert(
                            row.as_ref()
                                .project(&self.table_stream_key_indices)
                                .into_owned_row(),
                            row,
                        );
                    }
                    let key = key.clone();
                    Ok((key, entry)) as StreamExecutorResult<_>
                });
            }
        }

        // Drive the pending lookups and fill the cache; the first error aborts the whole call.
        #[for_await]
        for res in stream::iter(futs).buffered(16) {
            let (key, entry) = res?;
            self.cache.put(key, entry);
        }

        Ok(())
    }
167
168    fn force_peek(&self, key: &K) -> &JoinEntry {
169        self.cache.peek(key).expect("key should exists")
170    }
171
172    fn update(
173        &mut self,
174        chunks: Vec<StreamChunk>,
175        join_keys: &[usize],
176        right_stream_key_indices: &[usize],
177    ) -> StreamExecutorResult<()> {
178        for chunk in chunks {
179            let keys = K::build_many(join_keys, chunk.data_chunk());
180            for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
181                let Some((op, row)) = r else {
182                    continue;
183                };
184                if self.cache.contains(&key) {
185                    // Update cache
186                    let mut entry = self.cache.get_mut(&key).unwrap();
187                    let stream_key = row.project(right_stream_key_indices).into_owned_row();
188                    match op {
189                        Op::Insert | Op::UpdateInsert => {
190                            entry.insert(stream_key, row.into_owned_row())
191                        }
192                        Op::Delete | Op::UpdateDelete => entry.remove(&stream_key),
193                    };
194                }
195            }
196            self.source.write_chunk(chunk);
197        }
198        Ok(())
199    }
200}
201
/// Message produced by `align_input` after aligning the two inputs on barriers.
pub(super) enum InternalMessage {
    /// A data chunk from the left input.
    Chunk(StreamChunk),
    /// An aligned barrier, together with the right-side chunks collected for the interval that
    /// the barrier closes.
    Barrier(Vec<StreamChunk>, Barrier),
    /// A watermark from the left input.
    WaterMark(Watermark),
}
207
208#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
209async fn chunks_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
210    #[for_await]
211    for item in stream {
212        match item? {
213            Message::Watermark(_) => {
214                // ignore
215            }
216            Message::Chunk(c) => yield c,
217            Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
218                return Err(StreamExecutorError::align_barrier(expected_barrier, b));
219            }
220            Message::Barrier(_) => return Ok(()),
221        }
222    }
223}
224
225#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
226async fn internal_messages_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) {
227    #[for_await]
228    for item in stream {
229        match item? {
230            Message::Watermark(w) => {
231                yield InternalMessage::WaterMark(w);
232            }
233            Message::Chunk(c) => yield InternalMessage::Chunk(c),
234            Message::Barrier(b) if b.epoch != expected_barrier.epoch => {
235                return Err(StreamExecutorError::align_barrier(expected_barrier, b));
236            }
237            Message::Barrier(_) => return Ok(()),
238        }
239    }
240}
241
242pub(super) async fn expect_first_barrier(
243    stream: &mut (impl Stream<Item = StreamExecutorResult<InternalMessage>> + Unpin),
244) -> StreamExecutorResult<Barrier> {
245    let InternalMessage::Barrier(updates, barrier) = stream
246        .try_next()
247        .instrument_await("expect_first_barrier")
248        .await?
249        .context("failed to extract the first message: stream closed unexpectedly")?
250    else {
251        unreachable!("unexpected internal message");
252    };
253    assert!(updates.is_empty());
254    Ok(barrier)
255}
256
/// Align the left and right inputs according to their barriers,
/// such that in the produced stream, an aligned interval starts with
/// any number of `InternalMessage::Chunk(left_chunk)` (interleaved with
/// `InternalMessage::WaterMark` for left-side watermarks) and followed by
/// `InternalMessage::Barrier(right_chunks, barrier)`.
///
/// Right-side chunks are buffered until the barrier and attached to it only when
/// `YIELD_RIGHT_CHUNKS` is `true`; otherwise they are dropped. Right-side watermarks are
/// always dropped.
///
/// The stream ends when the combined input yields `None`, and fails on the first input error
/// or when the two sides deliver barriers with different epochs.
#[try_stream(ok = InternalMessage, error = StreamExecutorError)]
pub(super) async fn align_input<const YIELD_RIGHT_CHUNKS: bool>(left: Executor, right: Executor) {
    let mut left = pin!(left.execute());
    let mut right = pin!(right.execute());
    // Keep producing intervals until stream exhaustion or errors.
    loop {
        // Right-side chunks collected for the current interval.
        let mut right_chunks = vec![];
        // Produce an aligned interval.
        'inner: loop {
            // Re-created on every iteration over borrowed inputs, so that each side can be
            // drained on its own once the other side has delivered its barrier.
            // The strategy always prefers polling the left side first.
            let mut combined = stream::select_with_strategy(
                left.by_ref().map(Either::Left),
                right.by_ref().map(Either::Right),
                |_: &mut ()| PollNext::Left,
            );
            match combined.next().await {
                Some(Either::Left(Ok(Message::Chunk(c)))) => yield InternalMessage::Chunk(c),
                Some(Either::Right(Ok(Message::Chunk(c)))) => {
                    if YIELD_RIGHT_CHUNKS {
                        right_chunks.push(c);
                    }
                }
                Some(Either::Left(Ok(Message::Barrier(b)))) => {
                    // Left barrier arrived first: drain the right side up to the same barrier.
                    let mut remain = chunks_until_barrier(right.by_ref(), b.clone())
                        .try_collect()
                        .await?;
                    if YIELD_RIGHT_CHUNKS {
                        right_chunks.append(&mut remain);
                    }
                    yield InternalMessage::Barrier(right_chunks, b);
                    break 'inner;
                }
                Some(Either::Right(Ok(Message::Barrier(b)))) => {
                    // Right barrier arrived first: forward the remaining left messages of this
                    // interval before emitting the barrier.
                    #[for_await]
                    for internal_message in
                        internal_messages_until_barrier(left.by_ref(), b.clone())
                    {
                        yield internal_message?;
                    }
                    yield InternalMessage::Barrier(right_chunks, b);
                    break 'inner;
                }
                Some(Either::Left(Err(e)) | Either::Right(Err(e))) => return Err(e),
                Some(Either::Left(Ok(Message::Watermark(w)))) => {
                    yield InternalMessage::WaterMark(w);
                }
                Some(Either::Right(Ok(Message::Watermark(_)))) => {
                    // ignore right side watermark
                }
                None => return Ok(()),
            }
        }
    }
}
314
315pub(super) fn apply_indices_map(chunk: StreamChunk, indices: &[usize]) -> StreamChunk {
316    let (data_chunk, ops) = chunk.into_parts();
317    let (columns, vis) = data_chunk.into_parts();
318    let output_columns = indices
319        .iter()
320        .cloned()
321        .map(|idx| columns[idx].clone())
322        .collect();
323    StreamChunk::with_visibility(ops, output_columns, vis)
324}
325
326pub(super) mod phase1 {
327    use std::ops::Bound;
328
329    use futures::{StreamExt, pin_mut};
330    use futures_async_stream::try_stream;
331    use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
332    use risingwave_common::array::{Op, StreamChunk};
333    use risingwave_common::hash::{HashKey, NullBitmap};
334    use risingwave_common::row::{self, OwnedRow, Row, RowExt};
335    use risingwave_common::types::{DataType, DatumRef};
336    use risingwave_common::util::iter_util::ZipEqDebug;
337    use risingwave_storage::StateStore;
338    use risingwave_storage::row_serde::value_serde::ValueRowSerde;
339
340    use super::{StreamExecutorError, TemporalSide};
341    use crate::common::table::state_table::StateTable;
342    use crate::executor::monitor::TemporalJoinMetrics;
343
    /// Strategy deciding which rows phase 1 (the lookup join, before any non-lookup condition
    /// is evaluated) appends to the output builder.
    ///
    /// Both methods return `Some(chunk)` when appending produced a chunk from the builder; the
    /// caller must forward it.
    pub trait Phase1Evaluation {
        /// Called when a matched row is found.
        ///
        /// `op` is the op to emit for the joined row built from `left_row` and `right_row`.
        #[must_use = "consume chunk if produced"]
        fn append_matched_row(
            op: Op,
            builder: &mut StreamChunkBuilder,
            left_row: impl Row,
            right_row: impl Row,
        ) -> Option<StreamChunk>;

        /// Called when all matched rows of a join key are appended.
        ///
        /// `right_size` is the number of right-side columns, used to pad NULLs for marker
        /// rows; `matched` tells whether at least one right row was matched for `left_row`.
        #[must_use = "consume chunk if produced"]
        fn match_end(
            builder: &mut StreamChunkBuilder,
            op: Op,
            left_row: impl Row,
            right_size: usize,
            matched: bool,
        ) -> Option<StreamChunk>;
    }
364
    /// Phase 1 evaluation for inner joins: only matched rows are emitted.
    pub struct Inner;
    /// Phase 1 evaluation for left outer joins without a non-lookup condition: a NULL-padded
    /// row is emitted for left rows without any match.
    pub struct LeftOuter;
    /// Phase 1 evaluation for left outer joins with a non-lookup condition: an invisible
    /// NULL-padded marker row is always emitted after the matches of each left row.
    pub struct LeftOuterWithCond;
368
369    impl Phase1Evaluation for Inner {
370        fn append_matched_row(
371            op: Op,
372            builder: &mut StreamChunkBuilder,
373            left_row: impl Row,
374            right_row: impl Row,
375        ) -> Option<StreamChunk> {
376            builder.append_row(op, left_row.chain(right_row))
377        }
378
379        fn match_end(
380            _builder: &mut StreamChunkBuilder,
381            _op: Op,
382            _left_row: impl Row,
383            _right_size: usize,
384            _matched: bool,
385        ) -> Option<StreamChunk> {
386            None
387        }
388    }
389
390    impl Phase1Evaluation for LeftOuter {
391        fn append_matched_row(
392            op: Op,
393            builder: &mut StreamChunkBuilder,
394            left_row: impl Row,
395            right_row: impl Row,
396        ) -> Option<StreamChunk> {
397            builder.append_row(op, left_row.chain(right_row))
398        }
399
400        fn match_end(
401            builder: &mut StreamChunkBuilder,
402            op: Op,
403            left_row: impl Row,
404            right_size: usize,
405            matched: bool,
406        ) -> Option<StreamChunk> {
407            if !matched {
408                // If no rows matched, a marker row should be inserted.
409                builder.append_row(
410                    op,
411                    left_row.chain(row::repeat_n(DatumRef::None, right_size)),
412                )
413            } else {
414                None
415            }
416        }
417    }
418
419    impl Phase1Evaluation for LeftOuterWithCond {
420        fn append_matched_row(
421            op: Op,
422            builder: &mut StreamChunkBuilder,
423            left_row: impl Row,
424            right_row: impl Row,
425        ) -> Option<StreamChunk> {
426            builder.append_row(op, left_row.chain(right_row))
427        }
428
429        fn match_end(
430            builder: &mut StreamChunkBuilder,
431            op: Op,
432            left_row: impl Row,
433            right_size: usize,
434            _matched: bool,
435        ) -> Option<StreamChunk> {
436            // A marker row should always be inserted and mark as invisible for non-lookup filters evaluation.
437            // The row will be converted to visible in the further steps if no rows matched after all filters evaluated.
438            builder.append_row_invisible(
439                op,
440                left_row.chain(row::repeat_n(DatumRef::None, right_size)),
441            )
442        }
443    }
444
445    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
446    #[expect(clippy::too_many_arguments)]
447    pub(super) async fn handle_chunk<
448        'a,
449        K: HashKey,
450        S: StateStore,
451        SD: ValueRowSerde,
452        E: Phase1Evaluation,
453        const APPEND_ONLY: bool,
454    >(
455        chunk_size: usize,
456        right_size: usize,
457        full_schema: Vec<DataType>,
458        left_join_keys: &'a [usize],
459        right_table: &'a mut TemporalSide<K, S, SD>,
460        memo_table_lookup_prefix: &'a [usize],
461        memo_table: &'a mut Option<StateTable<S>>,
462        null_matched: &'a K::Bitmap,
463        chunk: StreamChunk,
464        metrics: &'a TemporalJoinMetrics,
465    ) {
466        let mut builder = StreamChunkBuilder::new(chunk_size, full_schema);
467        let keys = K::build_many(left_join_keys, chunk.data_chunk());
468        let to_fetch_keys = chunk
469            .visibility()
470            .iter()
471            .zip_eq_debug(keys.iter())
472            .zip_eq_debug(chunk.ops())
473            .filter_map(|((vis, key), op)| {
474                if vis {
475                    if APPEND_ONLY {
476                        assert_eq!(*op, Op::Insert);
477                        Some(key)
478                    } else {
479                        match op {
480                            Op::Insert | Op::UpdateInsert => Some(key),
481                            Op::Delete | Op::UpdateDelete => None,
482                        }
483                    }
484                } else {
485                    None
486                }
487            });
488        right_table
489            .fetch_or_promote_keys(to_fetch_keys, metrics)
490            .await?;
491
492        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
493            let Some((op, left_row)) = r else {
494                continue;
495            };
496
497            let mut matched = false;
498
499            if APPEND_ONLY {
500                // Append-only temporal join
501                if key.null_bitmap().is_subset(null_matched)
502                    && let join_entry = right_table.force_peek(&key)
503                    && !join_entry.is_empty()
504                {
505                    matched = true;
506                    for right_row in join_entry.cached.values() {
507                        if let Some(chunk) =
508                            E::append_matched_row(op, &mut builder, left_row, right_row)
509                        {
510                            yield chunk;
511                        }
512                    }
513                }
514            } else {
515                // Non-append-only temporal join
516                // The memo-table pk and columns:
517                // (`join_key` + `left_pk` + `right_pk`) -> (`right_scan_schema` + `join_key` + `left_pk`)
518                //
519                // Write pattern:
520                //   for each left input row (with insert op), construct the memo table pk and insert the row into the memo table.
521                // Read pattern:
522                //   for each left input row (with delete op), construct pk prefix (`join_key` + `left_pk`) to fetch rows and delete them from the memo table.
523                //
524                // Temporal join supports inner join and left outer join, additionally, it could contain other conditions.
525                // Surprisingly, we could handle them in a unified way with memo table.
526                // The memo table would persist rows fetched from the right table and appending the `join_key` and `left_pk` from the left row.
527                // The null rows generated by outer join and the other condition somehow is a stateless operation which means we can handle them without the memo table.
528                let memo_table = memo_table.as_mut().unwrap();
529                match op {
530                    Op::Insert | Op::UpdateInsert => {
531                        if key.null_bitmap().is_subset(null_matched)
532                            && let join_entry = right_table.force_peek(&key)
533                            && !join_entry.is_empty()
534                        {
535                            matched = true;
536                            for right_row in join_entry.cached.values() {
537                                let right_row: OwnedRow = right_row.clone();
538                                // Insert into memo table
539                                memo_table.insert(right_row.clone().chain(
540                                    left_row.project(memo_table_lookup_prefix).into_owned_row(),
541                                ));
542                                if let Some(chunk) = E::append_matched_row(
543                                    Op::Insert,
544                                    &mut builder,
545                                    left_row,
546                                    right_row,
547                                ) {
548                                    yield chunk;
549                                }
550                            }
551                        }
552                    }
553                    Op::Delete | Op::UpdateDelete => {
554                        let mut memo_rows_to_delete = vec![];
555                        if key.null_bitmap().is_subset(null_matched) {
556                            let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
557                                &(Bound::Unbounded, Bound::Unbounded);
558                            let prefix = left_row.project(memo_table_lookup_prefix);
559                            let state_table_iter = memo_table
560                                .iter_with_prefix(prefix, sub_range, Default::default())
561                                .await?;
562                            pin_mut!(state_table_iter);
563
564                            while let Some(memo_row) = state_table_iter.next().await {
565                                matched = true;
566                                let memo_row = memo_row?.into_owned_row();
567                                memo_rows_to_delete.push(memo_row.clone());
568                                if let Some(chunk) = E::append_matched_row(
569                                    Op::Delete,
570                                    &mut builder,
571                                    left_row,
572                                    memo_row.slice(0..right_size),
573                                ) {
574                                    yield chunk;
575                                }
576                            }
577                        }
578                        for memo_row in memo_rows_to_delete {
579                            // Delete from memo table
580                            memo_table.delete(memo_row);
581                        }
582                    }
583                }
584            }
585            if let Some(chunk) = E::match_end(
586                &mut builder,
587                match op {
588                    Op::Insert | Op::UpdateInsert => Op::Insert,
589                    Op::Delete | Op::UpdateDelete => Op::Delete,
590                },
591                left_row,
592                right_size,
593                matched,
594            ) {
595                yield chunk;
596            }
597        }
598
599        if let Some(chunk) = builder.take() {
600            yield chunk;
601        }
602    }
603}
604
605impl<
606    K: HashKey,
607    S: StateStore,
608    SD: ValueRowSerde,
609    const T: JoinTypePrimitive,
610    const APPEND_ONLY: bool,
611> TemporalJoinExecutor<K, S, SD, T, APPEND_ONLY>
612{
613    #[expect(clippy::too_many_arguments)]
614    pub fn new(
615        ctx: ActorContextRef,
616        info: ExecutorInfo,
617        left: Executor,
618        right: Executor,
619        table: ReplicatedStateTable<S, SD>,
620        left_join_keys: Vec<usize>,
621        right_join_keys: Vec<usize>,
622        null_safe: Vec<bool>,
623        condition: Option<NonStrictExpression>,
624        output_indices: Vec<usize>,
625        table_stream_key_indices: Vec<usize>,
626        watermark_sequence: AtomicU64Ref,
627        metrics: Arc<StreamingMetrics>,
628        chunk_size: usize,
629        join_key_data_types: Vec<DataType>,
630        memo_table: Option<StateTable<S>>,
631    ) -> Self {
632        let metrics_info =
633            MetricsInfo::new(metrics.clone(), table.table_id(), ctx.id, "temporal join");
634
635        let cache = ManagedLruCache::unbounded(watermark_sequence, metrics_info);
636
637        let metrics = metrics.new_temporal_join_metrics(table.table_id(), ctx.id, ctx.fragment_id);
638
639        Self {
640            ctx,
641            info,
642            left,
643            right,
644            right_table: TemporalSide {
645                source: table,
646                table_stream_key_indices,
647                cache,
648                join_key_data_types,
649            },
650            left_join_keys,
651            right_join_keys,
652            null_safe,
653            condition,
654            output_indices,
655            chunk_size,
656            memo_table,
657            metrics,
658        }
659    }
660
661    #[try_stream(ok = Message, error = StreamExecutorError)]
662    async fn into_stream(mut self) {
663        let right_size = self.right.schema().len();
664
665        let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
666            &self.output_indices,
667            self.left.schema().len(),
668            right_size,
669        );
670
671        let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());
672
673        let left_stream_key_indices = self.left.stream_key().to_vec();
674        let right_stream_key_indices = self.right.stream_key().to_vec();
675        let memo_table_lookup_prefix = self
676            .left_join_keys
677            .iter()
678            .cloned()
679            .chain(left_stream_key_indices)
680            .collect_vec();
681
682        let null_matched = K::Bitmap::from_bool_vec(self.null_safe);
683
684        let full_schema: Vec<_> = self
685            .left
686            .schema()
687            .data_types()
688            .into_iter()
689            .chain(self.right.schema().data_types().into_iter())
690            .collect();
691
692        let input = align_input::<true>(self.left, self.right);
693        pin_mut!(input);
694        let barrier = expect_first_barrier(&mut input).await?;
695        let barrier_epoch = barrier.epoch;
696        yield Message::Barrier(barrier);
697        self.right_table.source.init_epoch(barrier_epoch).await?;
698        if !APPEND_ONLY {
699            self.memo_table
700                .as_mut()
701                .unwrap()
702                .init_epoch(barrier_epoch)
703                .await?;
704        }
705
706        #[for_await]
707        for msg in input {
708            self.right_table.cache.evict();
709            self.metrics
710                .temporal_join_cached_entry_count
711                .set(self.right_table.cache.len() as i64);
712            match msg? {
713                InternalMessage::WaterMark(watermark) => {
714                    let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap();
715                    yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
716                }
717                InternalMessage::Chunk(chunk) => {
718                    let full_schema = full_schema.clone();
719
720                    if T == JoinType::Inner {
721                        let st1 = phase1::handle_chunk::<K, S, SD, phase1::Inner, APPEND_ONLY>(
722                            self.chunk_size,
723                            right_size,
724                            full_schema,
725                            &self.left_join_keys,
726                            &mut self.right_table,
727                            &memo_table_lookup_prefix,
728                            &mut self.memo_table,
729                            &null_matched,
730                            chunk,
731                            &self.metrics,
732                        );
733                        #[for_await]
734                        for chunk in st1 {
735                            let chunk = chunk?;
736                            let new_chunk = if let Some(ref cond) = self.condition {
737                                let (data_chunk, ops) = chunk.into_parts();
738                                let passed_bitmap = cond.eval_infallible(&data_chunk).await;
739                                let passed_bitmap =
740                                    Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
741                                let (columns, vis) = data_chunk.into_parts();
742                                let new_vis = vis & passed_bitmap;
743                                StreamChunk::with_visibility(ops, columns, new_vis)
744                            } else {
745                                chunk
746                            };
747                            let new_chunk = apply_indices_map(new_chunk, &self.output_indices);
748                            yield Message::Chunk(new_chunk);
749                        }
750                    } else if let Some(ref cond) = self.condition {
751                        // Joined result without evaluating non-lookup conditions.
752                        let st1 = phase1::handle_chunk::<
753                            K,
754                            S,
755                            SD,
756                            phase1::LeftOuterWithCond,
757                            APPEND_ONLY,
758                        >(
759                            self.chunk_size,
760                            right_size,
761                            full_schema,
762                            &self.left_join_keys,
763                            &mut self.right_table,
764                            &memo_table_lookup_prefix,
765                            &mut self.memo_table,
766                            &null_matched,
767                            chunk,
768                            &self.metrics,
769                        );
770                        let mut matched_count = 0usize;
771                        #[for_await]
772                        for chunk in st1 {
773                            let chunk = chunk?;
774                            let (data_chunk, ops) = chunk.into_parts();
775                            let passed_bitmap = cond.eval_infallible(&data_chunk).await;
776                            let passed_bitmap =
777                                Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap();
778                            let (columns, vis) = data_chunk.into_parts();
779                            let mut new_vis = BitmapBuilder::with_capacity(vis.len());
780                            for (passed, not_match_end) in
781                                passed_bitmap.iter().zip_eq_debug(vis.iter())
782                            {
783                                let is_match_end = !not_match_end;
784                                let vis = if is_match_end && matched_count == 0 {
785                                    // Nothing is matched, so the marker row should be visible.
786                                    true
787                                } else if is_match_end {
788                                    // reset the count
789                                    matched_count = 0;
790                                    // rows found, so the marker row should be invisible.
791                                    false
792                                } else {
793                                    if passed {
794                                        matched_count += 1;
795                                    }
796                                    passed
797                                };
798                                new_vis.append(vis);
799                            }
800                            let new_chunk = apply_indices_map(
801                                StreamChunk::with_visibility(ops, columns, new_vis.finish()),
802                                &self.output_indices,
803                            );
804                            yield Message::Chunk(new_chunk);
805                        }
806                        // The last row should always be marker row,
807                        assert_eq!(matched_count, 0);
808                    } else {
809                        let st1 = phase1::handle_chunk::<K, S, SD, phase1::LeftOuter, APPEND_ONLY>(
810                            self.chunk_size,
811                            right_size,
812                            full_schema,
813                            &self.left_join_keys,
814                            &mut self.right_table,
815                            &memo_table_lookup_prefix,
816                            &mut self.memo_table,
817                            &null_matched,
818                            chunk,
819                            &self.metrics,
820                        );
821                        #[for_await]
822                        for chunk in st1 {
823                            let chunk = chunk?;
824                            let new_chunk = apply_indices_map(chunk, &self.output_indices);
825                            yield Message::Chunk(new_chunk);
826                        }
827                    }
828                }
829                InternalMessage::Barrier(updates, barrier) => {
830                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
831
832                    // Write right-side chunks to the replicated state table and update LRU cache.
833                    // Must happen before commit.
834                    self.right_table.update(
835                        updates,
836                        &self.right_join_keys,
837                        &right_stream_key_indices,
838                    )?;
839                    let right_post_commit = self.right_table.source.commit(barrier.epoch).await?;
840                    let memo_post_commit = if !APPEND_ONLY {
841                        Some(
842                            self.memo_table
843                                .as_mut()
844                                .unwrap()
845                                .commit(barrier.epoch)
846                                .await?,
847                        )
848                    } else {
849                        None
850                    };
851
852                    yield Message::Barrier(barrier);
853
854                    if let Some((_, true)) = right_post_commit
855                        .post_yield_barrier(update_vnode_bitmap.clone())
856                        .await?
857                    {
858                        self.right_table.cache.clear();
859                    }
860                    if let Some(memo_post_commit) = memo_post_commit {
861                        memo_post_commit
862                            .post_yield_barrier(update_vnode_bitmap.clone())
863                            .await?;
864                    }
865                }
866            }
867        }
868    }
869}
870
871impl<
872    K: HashKey,
873    S: StateStore,
874    SD: ValueRowSerde,
875    const T: JoinTypePrimitive,
876    const APPEND_ONLY: bool,
877> Execute for TemporalJoinExecutor<K, S, SD, T, APPEND_ONLY>
878{
879    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
880        self.into_stream().boxed()
881    }
882}
883
884#[cfg(test)]
885mod tests {
886    use std::collections::HashSet;
887    use std::sync::Arc;
888    use std::sync::atomic::AtomicU64;
889
890    use risingwave_common::array::*;
891    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
892    use risingwave_common::hash::Key32;
893    use risingwave_common::types::{DataType, ScalarRefImpl};
894    use risingwave_common::util::epoch::{EpochPair, test_epoch};
895    use risingwave_common::util::sort_util::OrderType;
896    use risingwave_common::util::value_encoding::BasicSerde;
897    use risingwave_hummock_test::test_utils::prepare_hummock_test_env;
898    use risingwave_storage::hummock::HummockStorage;
899
900    use super::*;
901    use crate::common::table::state_table::{
902        StateTable, StateTableBuilder, StateTableOpConsistencyLevel,
903    };
904    use crate::common::table::test_utils::gen_pbtable;
905    use crate::executor::monitor::StreamingMetrics;
906    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};
907    use crate::executor::{ActorContext, ExecutorInfo, JoinType};
908
909    /// Tests that a temporal join on a pk-prefix (join key is a strict prefix of the right table's
910    /// pk) correctly merges rows committed in epoch1 with rows staged/committed during epoch2, and
911    /// produces the right results when the left side arrives in epoch3.
912    ///
913    /// Right table: (key INT, seq INT, val INT), pk = (key, seq), SINGLETON distribution.
914    /// Join condition: `left.left_key` = right.key  (pk prefix: join uses only the first pk column).
915    ///
916    /// Pre-commit at epoch1: (1,1,100), (1,2,200), (2,1,300)
917    /// Epoch2:  right side sends insert (3,1,400) — written to replicated state table, committed
918    ///          at the epoch2 barrier.
919    /// Epoch3:  left side sends (`left_key=1`, `left_val=111`) and (`left_key=3`, `left_val=333`).
920    ///
921    /// Expected join output in epoch3:
922    ///   (1, 111, 1, 1, 100)  — key=1 row matched from epoch1 data (cache miss → state store read)
923    ///   (1, 111, 1, 2, 200)  — key=1 row matched from epoch1 data (same cache entry)
924    ///   (3, 333, 3, 1, 400)  — key=3 row matched from epoch2 data (cache miss → state store read)
925    #[tokio::test]
926    async fn test_temporal_join_pk_prefix_staging_merge() {
927        let test_env = prepare_hummock_test_env().await;
928        let table_id = TableId::new(1);
929
930        // Right table schema: (key INT col_id=1, seq INT col_id=2, val INT col_id=3)
931        // pk = (key idx=0, seq idx=1), SINGLETON distribution (empty distribution_key),
932        // read_prefix_len_hint = 2 (= full pk length).
933        let right_col_descs = vec![
934            ColumnDesc::unnamed(ColumnId::new(1), DataType::Int32),
935            ColumnDesc::unnamed(ColumnId::new(2), DataType::Int32),
936            ColumnDesc::unnamed(ColumnId::new(3), DataType::Int32),
937        ];
938        let order_types = vec![OrderType::ascending(), OrderType::ascending()];
939        let pk_indices = vec![0usize, 1];
940        let pbtable = gen_pbtable(table_id, right_col_descs, order_types, pk_indices, 2);
941
942        test_env.register_table(pbtable.clone()).await;
943
944        // Pre-commit epoch1 data via a plain StateTable (bypasses the executor).
945        {
946            let mut setup_table = StateTable::<HummockStorage>::from_table_catalog_inconsistent_op(
947                &pbtable,
948                test_env.storage.clone(),
949                None,
950            )
951            .await;
952            test_env
953                .storage
954                .start_epoch(test_epoch(1), HashSet::from_iter([table_id]));
955            setup_table
956                .init_epoch(EpochPair::new_test_epoch(test_epoch(1)))
957                .await
958                .unwrap();
959            setup_table.insert(OwnedRow::new(vec![
960                Some(1i32.into()),
961                Some(1i32.into()),
962                Some(100i32.into()),
963            ]));
964            setup_table.insert(OwnedRow::new(vec![
965                Some(1i32.into()),
966                Some(2i32.into()),
967                Some(200i32.into()),
968            ]));
969            setup_table.insert(OwnedRow::new(vec![
970                Some(2i32.into()),
971                Some(1i32.into()),
972                Some(300i32.into()),
973            ]));
974            test_env
975                .storage
976                .start_epoch(test_epoch(2), HashSet::from_iter([table_id]));
977            setup_table
978                .commit_for_test(EpochPair::new_test_epoch(test_epoch(2)))
979                .await
980                .unwrap();
981            // Seal and commit epoch1 data in Hummock so it is visible to all readers.
982            test_env.commit_epoch(test_epoch(1)).await;
983        }
984
985        // Build the replicated state table for the executor (all 3 columns are output).
986        let output_column_ids = vec![ColumnId::new(1), ColumnId::new(2), ColumnId::new(3)];
987        let right_table = StateTableBuilder::<_, BasicSerde, true, _>::new(
988            &pbtable,
989            test_env.storage.clone(),
990            None,
991        )
992        .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
993        .with_output_column_ids(output_column_ids)
994        .forbid_preload_all_rows()
995        .build()
996        .await;
997
998        // Left source: (left_key INT, left_val INT), stream_key = [0].
999        let left_schema = Schema::new(vec![
1000            Field::unnamed(DataType::Int32),
1001            Field::unnamed(DataType::Int32),
1002        ]);
1003        let (mut left_tx, left_source) = MockSource::channel();
1004        let left_executor = left_source.into_executor(left_schema.clone(), vec![0]);
1005
1006        // Right source: mirrors the right table columns (key INT, seq INT, val INT),
1007        // stream_key = [0, 1] (the pk).
1008        let right_schema = Schema::new(vec![
1009            Field::unnamed(DataType::Int32),
1010            Field::unnamed(DataType::Int32),
1011            Field::unnamed(DataType::Int32),
1012        ]);
1013        let (mut right_tx, right_source) = MockSource::channel();
1014        let right_executor = right_source.into_executor(right_schema.clone(), vec![0, 1]);
1015
1016        // table_stream_key_indices: pk of right table within the output row = [0, 1] (key, seq).
1017        let table_stream_key_indices = vec![0usize, 1];
1018
1019        // Join on left.left_key (col 0) = right.key (col 0 in right source).
1020        let left_join_keys = vec![0usize];
1021        let right_join_keys = vec![0usize];
1022        let null_safe = vec![false];
1023        let join_key_data_types = vec![DataType::Int32];
1024
1025        // Output: all 5 columns — [left_key, left_val, key, seq, val].
1026        let output_indices = vec![0usize, 1, 2, 3, 4];
1027        let output_schema = Schema::new(vec![
1028            Field::unnamed(DataType::Int32),
1029            Field::unnamed(DataType::Int32),
1030            Field::unnamed(DataType::Int32),
1031            Field::unnamed(DataType::Int32),
1032            Field::unnamed(DataType::Int32),
1033        ]);
1034        let info = ExecutorInfo::for_test(output_schema, vec![], "TemporalJoinTest".to_owned(), 0);
1035
1036        let executor = TemporalJoinExecutor::<
1037            Key32,
1038            HummockStorage,
1039            BasicSerde,
1040            { JoinType::Inner },
1041            true,
1042        >::new(
1043            ActorContext::for_test(0),
1044            info.clone(),
1045            left_executor,
1046            right_executor,
1047            right_table,
1048            left_join_keys,
1049            right_join_keys,
1050            null_safe,
1051            None, // no extra non-equi condition
1052            output_indices,
1053            table_stream_key_indices,
1054            Arc::new(AtomicU64::new(0)),
1055            Arc::new(StreamingMetrics::unused()),
1056            1024,
1057            join_key_data_types,
1058            None, // no memo table (append-only inner join)
1059        );
1060
1061        let mut stream = Box::new(executor).execute();
1062
1063        // Push the first barrier (init epoch2, prev = epoch1) on both sides.
1064        left_tx.push_barrier_with_prev_epoch_for_test(test_epoch(2), test_epoch(1), false);
1065        right_tx.push_barrier_with_prev_epoch_for_test(test_epoch(2), test_epoch(1), false);
1066        stream.expect_barrier().await;
1067
1068        // Epoch2: right side inserts (3, 1, 400); left side is quiet.
1069        // The epoch2→epoch3 barrier will trigger write_chunk + commit for the right table,
1070        // making (3,1,400) visible as committed epoch2 data.
1071        right_tx.push_chunk(StreamChunk::from_pretty(
1072            " i i   i
1073            + 3 1 400",
1074        ));
1075        // Start epoch3 before the barrier that commits epoch2 data.
1076        test_env
1077            .storage
1078            .start_epoch(test_epoch(3), HashSet::from_iter([table_id]));
1079        left_tx.push_barrier_with_prev_epoch_for_test(test_epoch(3), test_epoch(2), false);
1080        right_tx.push_barrier_with_prev_epoch_for_test(test_epoch(3), test_epoch(2), false);
1081        stream.expect_barrier().await;
1082
1083        // Start epoch4 before the stop barrier that commits epoch3 data.
1084        test_env
1085            .storage
1086            .start_epoch(test_epoch(4), HashSet::from_iter([table_id]));
1087
1088        // Epoch3: left side sends two rows.
1089        //   key=1 → cache miss → state store read → finds (1,1,100) and (1,2,200) from epoch1.
1090        //   key=3 → cache miss → state store read → finds (3,1,400) from epoch2.
1091        left_tx.push_chunk(StreamChunk::from_pretty(
1092            " i   i
1093            + 1 111
1094            + 3 333",
1095        ));
1096        left_tx.push_barrier_with_prev_epoch_for_test(test_epoch(4), test_epoch(3), true);
1097        right_tx.push_barrier_with_prev_epoch_for_test(test_epoch(4), test_epoch(3), true);
1098
1099        // Collect all output rows before the stop barrier.
1100        let mut output_rows: Vec<[i32; 5]> = vec![];
1101        loop {
1102            match stream.next().await.unwrap().unwrap() {
1103                Message::Chunk(chunk) => {
1104                    for (op, row) in chunk.rows() {
1105                        assert_eq!(op, Op::Insert);
1106                        let row: [i32; 5] =
1107                            std::array::from_fn(|i| match row.datum_at(i).unwrap() {
1108                                ScalarRefImpl::Int32(v) => v,
1109                                _ => panic!("expected Int32"),
1110                            });
1111                        output_rows.push(row);
1112                    }
1113                }
1114                Message::Barrier(_) => break,
1115                _ => {}
1116            }
1117        }
1118
1119        output_rows.sort();
1120        assert_eq!(
1121            output_rows,
1122            vec![
1123                [1, 111, 1, 1, 100], // key=1 matched epoch1 row (1,1,100)
1124                [1, 111, 1, 2, 200], // key=1 matched epoch1 row (1,2,200)
1125                [3, 333, 3, 1, 400], // key=3 matched epoch2 row (3,1,400)
1126            ]
1127        );
1128    }
1129}