risingwave_stream/executor/join/
asof_join.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Hash map for `AsOf` join with optional LRU cache support.
16//!
17//! When cache is disabled, the hash map directly queries the state table.
18//! When cache is enabled, it maintains a `ManagedLruCache` that caches all rows for
19//! each join key, indexed by (`inequality_key`, `pk_suffix`) for efficient range queries.
20
21use std::collections::BTreeMap;
22use std::ops::Bound;
23use std::sync::Arc;
24
25use anyhow::Context;
26use futures::StreamExt;
27use risingwave_common::bitmap::Bitmap;
28use risingwave_common::catalog::TableId;
29use risingwave_common::row::{CompactedRow, OwnedRow, Row, RowExt};
30use risingwave_common::types::{DataType, ScalarImpl};
31use risingwave_common::util::epoch::EpochPair;
32use risingwave_common::util::row_serde::OrderedRowSerde;
33use risingwave_common::util::sort_util::OrderType;
34use risingwave_common_estimate_size::{EstimateSize, KvSize};
35use risingwave_storage::StateStore;
36use risingwave_storage::store::PrefetchOptions;
37
38use super::hash_join::{JoinEntryError, JoinHashMapMetrics, TableInner};
39use super::join_row_set::JoinRowSet;
40use crate::cache::ManagedLruCache;
41use crate::common::metrics::MetricsInfo;
42use crate::common::table::state_table::{RowStream, StateTable, StateTablePostCommit};
43use crate::consistency::{consistency_error, enable_strict_consistency};
44use crate::executor::error::StreamExecutorResult;
45use crate::executor::monitor::StreamingMetrics;
46use crate::task::{ActorId, AtomicU64Ref, FragmentId};
47
/// Memcomparable encoding for inequality key and pk suffix.
/// Both are memcmp-serialized bytes: lexicographic byte order matches the logical
/// order of the underlying values, which is what makes `BTreeMap` range scans on
/// these keys equivalent to range scans on the original column values.
type InequalKeyBytes = Vec<u8>;
type PkSuffixBytes = Vec<u8>;
51
/// Trait for encoding/decoding cached `AsOf` join rows.
/// Unlike `JoinEncoding`, this doesn't carry degree since `AsOf` join has no degree tracking.
pub trait AsOfRowEncoding: 'static + Send + Sync {
    /// The in-cache representation of a row.
    type Encoded: EstimateSize + Clone + Send + Sync;
    /// Encode an `OwnedRow` into the cached representation.
    fn encode(row: &OwnedRow) -> Self::Encoded;
    /// Decode an encoded value back to `OwnedRow`.
    fn decode(encoded: &Self::Encoded, data_types: &[DataType]) -> OwnedRow;
}
60
61/// CPU-optimized encoding: stores full `OwnedRow` in cache (no encode/decode cost).
62pub struct AsOfCpuEncoding;
63
64impl AsOfRowEncoding for AsOfCpuEncoding {
65    type Encoded = OwnedRow;
66
67    fn encode(row: &OwnedRow) -> OwnedRow {
68        // TODO: Avoid this clone
69        row.clone()
70    }
71
72    fn decode(encoded: &OwnedRow, _data_types: &[DataType]) -> OwnedRow {
73        // TODO: Avoid this clone
74        encoded.clone()
75    }
76}
77
78/// Memory-optimized encoding: stores `CompactedRow` (serialized bytes) in cache.
79pub struct AsOfMemoryEncoding;
80
81impl AsOfRowEncoding for AsOfMemoryEncoding {
82    type Encoded = CompactedRow;
83
84    fn encode(row: &OwnedRow) -> CompactedRow {
85        row.into()
86    }
87
88    fn decode(encoded: &CompactedRow, data_types: &[DataType]) -> OwnedRow {
89        encoded
90            .deserialize(data_types)
91            .expect("failed to decode compacted row in AsOf join cache")
92    }
93}
94
/// A cache entry for a single join key, storing all encoded rows indexed by (`inequality_key`, `pk_suffix`).
/// The outer `BTreeMap` gives efficient range queries on inequality keys;
/// the inner `JoinRowSet` adaptively uses `Vec` for small pk sets and `BTreeMap` for larger ones.
///
/// `V` is the encoded row type: `OwnedRow` for CPU encoding, `CompactedRow` for memory encoding.
pub struct AsOfJoinCacheEntry<V: EstimateSize + Clone> {
    /// `inequality_key_bytes` -> { `pk_suffix_bytes` -> `encoded_row` }
    inner: BTreeMap<InequalKeyBytes, JoinRowSet<PkSuffixBytes, V>>,
    /// Incrementally tracked heap size of all keys and values.
    /// Updated on every `insert`/`delete` so `estimated_heap_size` is O(1).
    kv_heap_size: KvSize,
}
106
107impl<V: EstimateSize + Clone> Default for AsOfJoinCacheEntry<V> {
108    fn default() -> Self {
109        Self {
110            inner: BTreeMap::new(),
111            kv_heap_size: KvSize::new(),
112        }
113    }
114}
115
impl<V: EstimateSize + Clone> EstimateSize for AsOfJoinCacheEntry<V> {
    /// O(1): returns the incrementally maintained key/value heap size.
    fn estimated_heap_size(&self) -> usize {
        // TODO: Add btreemap internal size.
        // https://github.com/risingwavelabs/risingwave/issues/9713
        self.kv_heap_size.size()
    }
}
123
124impl<V: EstimateSize + Clone> AsOfJoinCacheEntry<V> {
125    /// Find the encoded row with the greatest inequality key <= bound (upper bound query).
126    /// Returns the first entry by pk order in the pk sub-map for that inequality key.
127    fn upper_bound(&self, bound: Bound<&[u8]>) -> Option<&V> {
128        self.inner
129            .range::<[u8], _>((Bound::Unbounded, bound))
130            .next_back()
131            .and_then(|(_, pk_map)| pk_map.first_key_sorted().and_then(|k| pk_map.get(k)))
132    }
133
134    /// Find the encoded row with the smallest inequality key >= bound (lower bound query).
135    /// Returns the first entry by pk order in the pk sub-map for that inequality key.
136    fn lower_bound(&self, bound: Bound<&[u8]>) -> Option<&V> {
137        self.inner
138            .range::<[u8], _>((bound, Bound::Unbounded))
139            .next()
140            .and_then(|(_, pk_map)| pk_map.first_key_sorted().and_then(|k| pk_map.get(k)))
141    }
142
143    /// Iterate all encoded rows within an inequality key range.
144    fn range(
145        &self,
146        range: (Bound<&[u8]>, Bound<&[u8]>),
147    ) -> impl Iterator<Item = &V> + '_ + use<'_, V> {
148        self.inner
149            .range::<[u8], _>(range)
150            .flat_map(|(_, pk_map)| pk_map.values())
151    }
152
153    /// Find the first encoded row (by pk order) with exact inequality key match.
154    fn first_by_inequality(&self, ineq_key_bytes: &[u8]) -> Option<&V> {
155        self.inner
156            .get(ineq_key_bytes)
157            .and_then(|pk_map| pk_map.first_key_sorted().and_then(|k| pk_map.get(k)))
158    }
159
160    /// Find the first two encoded rows (by PK order) with exact inequality key match.
161    fn first_two_by_inequality(&self, ineq_key_bytes: &[u8]) -> (Option<&V>, Option<&V>) {
162        if let Some(pk_map) = self.inner.get(ineq_key_bytes) {
163            let (first_key, second_key) = pk_map.first_two_key_sorted();
164            let first = first_key.and_then(|k| pk_map.get(k));
165            let second = second_key.and_then(|k| pk_map.get(k));
166            (first, second)
167        } else {
168            (None, None)
169        }
170    }
171
172    /// Insert an encoded row into the cache entry.
173    fn insert(
174        &mut self,
175        ineq_key_bytes: InequalKeyBytes,
176        pk_suffix_bytes: PkSuffixBytes,
177        val: V,
178    ) -> Result<(), JoinEntryError> {
179        use std::collections::btree_map::Entry;
180        let pk_map = match self.inner.entry(ineq_key_bytes) {
181            Entry::Vacant(e) => {
182                self.kv_heap_size.add_val(e.key());
183                e.insert(JoinRowSet::default())
184            }
185            Entry::Occupied(e) => e.into_mut(),
186        };
187        if !enable_strict_consistency()
188            && let Some(old_val) = pk_map.remove(&pk_suffix_bytes)
189        {
190            self.kv_heap_size.sub(&pk_suffix_bytes, &old_val);
191        }
192        self.kv_heap_size.add(&pk_suffix_bytes, &val);
193        let ret = pk_map.try_insert(pk_suffix_bytes.clone(), val);
194        if !enable_strict_consistency() {
195            assert!(ret.is_ok(), "we have removed existing entry, if any");
196        }
197        ret.map(|_| ()).map_err(|_| JoinEntryError::Occupied)
198    }
199
200    /// Delete an encoded row from the cache entry.
201    fn delete(
202        &mut self,
203        ineq_key_bytes: &InequalKeyBytes,
204        pk_suffix_bytes: &PkSuffixBytes,
205    ) -> Result<(), JoinEntryError> {
206        if let Some(pk_map) = self.inner.get_mut(ineq_key_bytes) {
207            if let Some(old_val) = pk_map.remove(pk_suffix_bytes) {
208                self.kv_heap_size.sub(pk_suffix_bytes, &old_val);
209                if pk_map.is_empty() {
210                    self.kv_heap_size.sub_val(ineq_key_bytes);
211                    self.inner.remove(ineq_key_bytes);
212                }
213                Ok(())
214            } else if enable_strict_consistency() {
215                Err(JoinEntryError::Remove)
216            } else {
217                consistency_error!(
218                    ?pk_suffix_bytes,
219                    "removing an asof join cache entry but it's not in the cache"
220                );
221                Ok(())
222            }
223        } else if enable_strict_consistency() {
224            Err(JoinEntryError::Remove)
225        } else {
226            consistency_error!(
227                ?pk_suffix_bytes,
228                "removing an asof join cache entry but it's not in the cache"
229            );
230            Ok(())
231        }
232    }
233}
234
/// A hash map for `AsOf` join with optional LRU cache support.
///
/// When `cache` is `None`, it directly queries the state table (no-cache mode).
/// When `cache` is `Some`, it maintains a `ManagedLruCache` keyed by join key,
/// caching all rows for each join key indexed by (`inequality_key`, `pk_suffix`).
///
/// `E` controls row encoding in the cache: `AsOfCpuEncoding` (full `OwnedRow`) or
/// `AsOfMemoryEncoding` (compact `CompactedRow`).
pub struct AsOfJoinHashMap<S: StateStore, E: AsOfRowEncoding> {
    /// State table. Contains the data from upstream.
    state: TableInner<S>,
    /// The index of the inequality key column.
    inequality_key_idx: usize,
    /// Serializer for inequality key (single column value -> ordered bytes).
    inequality_key_serializer: OrderedRowSerde,
    /// Serializer for pk suffix (deduped pk columns -> ordered bytes).
    pk_suffix_serializer: OrderedRowSerde,
    /// The indices of the deduped pk columns in the input row (for projecting pk suffix).
    pk_suffix_indices: Vec<usize>,
    /// LRU cache: `join_key` (`OwnedRow`) -> encoded cache entry. None if cache is disabled.
    cache: Option<ManagedLruCache<OwnedRow, AsOfJoinCacheEntry<E::Encoded>>>,
    /// The indices of join key columns in the input row (for projecting join key).
    join_key_indices: Vec<usize>,
    /// All data types of the state table rows, needed for decoding `CompactedRow`.
    state_all_data_types: Vec<DataType>,
    /// Metrics of the hash map (lookup / miss counters, flushed on commit).
    metrics: JoinHashMapMetrics,
}
263
impl<S: StateStore, E: AsOfRowEncoding> AsOfJoinHashMap<S, E> {
    /// Create a [`AsOfJoinHashMap`] for `AsOf` join with optional cache.
    ///
    /// If `watermark_epoch` is `Some`, an LRU cache is enabled.
    /// If `watermark_epoch` is `None`, the hash map directly queries the state table.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        state_join_key_indices: Vec<usize>,
        state_table: StateTable<S>,
        state_pk_indices: Vec<usize>,
        inequality_key_idx: usize,
        state_all_data_types: Vec<DataType>,
        watermark_epoch: Option<AtomicU64Ref>,
        metrics: Arc<StreamingMetrics>,
        actor_id: ActorId,
        fragment_id: FragmentId,
        side: &'static str,
    ) -> Self {
        let join_table_id = state_table.table_id();

        // Build serializer for inequality key (single column).
        let inequality_key_serializer = OrderedRowSerde::new(
            vec![state_all_data_types[inequality_key_idx].clone()],
            vec![OrderType::ascending()],
        );

        // Build serializer for pk suffix (deduped pk columns).
        let pk_suffix_serializer = OrderedRowSerde::new(
            state_pk_indices
                .iter()
                .map(|idx| state_all_data_types[*idx].clone())
                .collect(),
            vec![OrderType::ascending(); state_pk_indices.len()],
        );

        // Clone the index vectors before `TableInner::new` takes ownership of them.
        let pk_suffix_indices = state_pk_indices.clone();
        let join_key_indices = state_join_key_indices.clone();

        let cache = watermark_epoch.map(|epoch_ref| {
            let metrics_info = MetricsInfo::new(
                metrics.clone(),
                join_table_id,
                actor_id,
                format!("asof join {}", side),
            );
            ManagedLruCache::unbounded(epoch_ref, metrics_info)
        });

        let state = TableInner::new(state_pk_indices, state_join_key_indices, state_table, None);

        Self {
            state,
            inequality_key_idx,
            inequality_key_serializer,
            pk_suffix_serializer,
            pk_suffix_indices,
            cache,
            join_key_indices,
            state_all_data_types,
            metrics: JoinHashMapMetrics::new(&metrics, actor_id, fragment_id, side, join_table_id),
        }
    }

    /// Initialize the underlying state table's epoch.
    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.state.table.init_epoch(epoch).await?;
        Ok(())
    }

    /// Forward a watermark to the state table.
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        self.state.table.update_watermark(watermark);
    }

    /// The id of the underlying state table.
    pub fn table_id(&self) -> TableId {
        self.state.table.table_id()
    }

    /// Cached entry count (number of distinct join keys in the LRU cache).
    /// Returns 0 if cache is disabled.
    pub fn entry_count(&self) -> usize {
        self.cache.as_ref().map_or(0, |c| c.len())
    }

    /// Serialize a row as an inequality key for cache lookup.
    /// The row should already be the inequality column(s) (e.g. a projected single-column row).
    fn serialize_inequal_key(&self, row: &impl Row) -> Vec<u8> {
        row.memcmp_serialize(&self.inequality_key_serializer)
    }

    /// Serialize the inequality key from a full state table row for cache key.
    fn serialize_inequal_key_from_state_row(&self, row: &impl Row) -> Vec<u8> {
        row.project(&[self.inequality_key_idx])
            .memcmp_serialize(&self.inequality_key_serializer)
    }

    /// Serialize the pk suffix from a row for cache key.
    fn serialize_pk_suffix(&self, row: &impl Row) -> Vec<u8> {
        row.project(&self.pk_suffix_indices)
            .memcmp_serialize(&self.pk_suffix_serializer)
    }

    /// Ensure the cache entry for the given join key is populated.
    /// Does nothing if cache is disabled or the entry is already present.
    /// On a miss, loads *all* rows for the join key from the state table.
    async fn ensure_cache_populated(&mut self, join_key: &OwnedRow) -> StreamExecutorResult<()> {
        let needs_populate = match &self.cache {
            Some(cache) => !cache.contains(join_key),
            None => return Ok(()),
        };
        // Count a lookup only when the cache is enabled.
        self.metrics.inc_lookup();
        if !needs_populate {
            return Ok(());
        }
        self.metrics.inc_lookup_miss();

        // Cache miss - load all rows for this join key from state table.
        let sub_range: (Bound<OwnedRow>, Bound<OwnedRow>) = (Bound::Unbounded, Bound::Unbounded);
        let table_iter = self
            .state
            .table
            .iter_with_prefix(join_key, &sub_range, PrefetchOptions::default())
            .await?;
        let mut pinned = std::pin::pin!(table_iter);
        let mut entry = AsOfJoinCacheEntry::<E::Encoded>::default();
        while let Some(row) = pinned.next().await.transpose()? {
            let ineq_key = self.serialize_inequal_key_from_state_row(&row);
            let pk_suffix = self.serialize_pk_suffix(&row);
            entry
                .insert(ineq_key, pk_suffix, E::encode(&row))
                .with_context(|| format!("row: {}", row.display()))?;
        }
        self.cache.as_mut().unwrap().put(join_key.clone(), entry);
        Ok(())
    }

    /// Insert a row into the state table and optionally update the cache.
    /// The cache is only updated when the row's join key is already cached;
    /// otherwise the next lookup repopulates the entry from the state table.
    pub fn insert(&mut self, value: impl Row) -> StreamExecutorResult<()> {
        if self.cache.is_some() {
            // Convert to OwnedRow first to avoid move issues with `impl Row`.
            let owned_row = value.to_owned_row();
            let ineq_key = self.serialize_inequal_key_from_state_row(&owned_row);
            let pk_suffix = self.serialize_pk_suffix(&owned_row);
            // Use reference to avoid moving owned_row.
            let join_key = (&owned_row).project(&self.join_key_indices).to_owned_row();
            // Update cache if the join key is cached.
            if let Some(cache) = &mut self.cache
                && let Some(mut entry) = cache.get_mut(&join_key)
            {
                entry
                    .insert(ineq_key, pk_suffix, E::encode(&owned_row))
                    .with_context(|| format!("row: {}", owned_row.display()))?;
            } else {
                self.metrics.inc_insert_cache_miss();
            }
            self.state.table.insert(owned_row);
        } else {
            self.state.table.insert(value);
        }
        Ok(())
    }

    /// Delete a row from the state table and optionally update the cache.
    /// Mirrors `insert`: the cache is only touched when the join key is cached.
    pub fn delete(&mut self, value: impl Row) -> StreamExecutorResult<()> {
        if self.cache.is_some() {
            // Convert to OwnedRow first to avoid move issues with `impl Row`.
            let owned_row = value.to_owned_row();
            let ineq_key = self.serialize_inequal_key_from_state_row(&owned_row);
            let pk_suffix = self.serialize_pk_suffix(&owned_row);
            // Use reference to avoid moving owned_row.
            let join_key = (&owned_row).project(&self.join_key_indices).to_owned_row();
            // Update cache if the join key is cached.
            if let Some(cache) = &mut self.cache
                && let Some(mut entry) = cache.get_mut(&join_key)
            {
                entry
                    .delete(&ineq_key, &pk_suffix)
                    .with_context(|| format!("row: {}", owned_row.display()))?;
            }
            self.state.table.delete(owned_row);
        } else {
            self.state.table.delete(value);
        }
        Ok(())
    }

    /// Evict the LRU cache. No-op if cache is not enabled.
    pub fn evict_cache(&mut self) {
        if let Some(cache) = &mut self.cache {
            cache.evict();
        }
    }

    /// Flush the state table: evict the LRU cache (if enabled) and commit the
    /// state table for `epoch`. The returned post-commit handle must be used to
    /// finish the barrier via `post_yield_barrier`.
    pub async fn flush(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<AsOfJoinHashMapPostCommit<'_, S, E>> {
        self.metrics.flush();
        if let Some(cache) = &mut self.cache {
            cache.evict();
        }
        let state_post_commit = self.state.table.commit(epoch).await?;
        Ok(AsOfJoinHashMapPostCommit {
            state: state_post_commit,
            cache: self.cache.as_mut(),
        })
    }

    /// Opportunistically flush buffered state-table writes (no epoch commit).
    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.state.table.try_flush().await?;
        Ok(())
    }

    /// Find the row with the greatest inequality key within `bound` for the given
    /// join key. Ties on the inequality key are broken by the smallest pk.
    pub async fn upper_bound_by_inequality_with_jk_prefix(
        &mut self,
        join_key: impl Row,
        bound: Bound<&impl Row>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if self.cache.is_some() {
            let join_key_owned = join_key.to_owned_row();
            // Serialize bound bytes before borrowing cache to avoid borrow conflicts.
            let bound_bytes = match &bound {
                Bound::Included(r) => Bound::Included(self.serialize_inequal_key(r)),
                Bound::Excluded(r) => Bound::Excluded(self.serialize_inequal_key(r)),
                Bound::Unbounded => Bound::Unbounded,
            };
            self.ensure_cache_populated(&join_key_owned).await?;
            let cache = self.cache.as_mut().unwrap();
            if let Some(entry) = cache.get(&join_key_owned) {
                let bound_ref = match &bound_bytes {
                    Bound::Included(b) => Bound::Included(b.as_slice()),
                    Bound::Excluded(b) => Bound::Excluded(b.as_slice()),
                    Bound::Unbounded => Bound::Unbounded,
                };
                return Ok(entry
                    .upper_bound(bound_ref)
                    .map(|v| E::decode(v, &self.state_all_data_types)));
            }
            // Not in cache (shouldn't happen after ensure_cache_populated, but fallback)
        }
        // Non-cached path: use rev_iter to find the largest inequality key within the
        // bound, then forward-scan for the smallest PK at that inequality key. This
        // matches the cached path's behavior (smallest PK wins on tie), which is
        // required for consistency with eq_join_right's first_two_by_inequality logic.
        let sub_range = (Bound::<OwnedRow>::Unbounded, bound);
        let table_iter = self
            .state
            .table
            .rev_iter_with_prefix(&join_key, &sub_range, PrefetchOptions::default())
            .await?;
        let mut pinned_table_iter = std::pin::pin!(table_iter);
        let Some(rev_row) = pinned_table_iter.next().await.transpose()? else {
            return Ok(None);
        };
        // Extract the inequality key from the found row and forward-scan for smallest PK.
        let found_ineq_key = (&rev_row)
            .project(&[self.inequality_key_idx])
            .to_owned_row();
        let exact_range = (
            Bound::Included(&found_ineq_key),
            Bound::Included(&found_ineq_key),
        );
        let fwd_iter = self
            .state
            .table
            .iter_with_prefix(&join_key, &exact_range, PrefetchOptions::default())
            .await?;
        let mut pinned_fwd = std::pin::pin!(fwd_iter);
        // Forward scan returns smallest PK first; fall back to rev_row if empty (shouldn't happen).
        let row = pinned_fwd.next().await.transpose()?;
        Ok(row.or(Some(rev_row)))
    }

    /// Find the row with the smallest inequality key within `bound` for the given
    /// join key. Ties on the inequality key are broken by the smallest pk.
    pub async fn lower_bound_by_inequality_with_jk_prefix(
        &mut self,
        join_key: &impl Row,
        bound: Bound<&impl Row>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if self.cache.is_some() {
            let join_key_owned = join_key.to_owned_row();
            // Serialize bound bytes before borrowing cache to avoid borrow conflicts.
            let bound_bytes = match &bound {
                Bound::Included(r) => Bound::Included(self.serialize_inequal_key(r)),
                Bound::Excluded(r) => Bound::Excluded(self.serialize_inequal_key(r)),
                Bound::Unbounded => Bound::Unbounded,
            };
            self.ensure_cache_populated(&join_key_owned).await?;
            let cache = self.cache.as_mut().unwrap();
            if let Some(entry) = cache.get(&join_key_owned) {
                let bound_ref = match &bound_bytes {
                    Bound::Included(b) => Bound::Included(b.as_slice()),
                    Bound::Excluded(b) => Bound::Excluded(b.as_slice()),
                    Bound::Unbounded => Bound::Unbounded,
                };
                return Ok(entry
                    .lower_bound(bound_ref)
                    .map(|v| E::decode(v, &self.state_all_data_types)));
            }
        }
        // Non-cached path: a forward scan yields (smallest inequality key, smallest pk)
        // first, matching the cached path's tie-breaking.
        let sub_range = (bound, Bound::<OwnedRow>::Unbounded);
        let table_iter = self
            .state
            .table
            .iter_with_prefix(join_key, &sub_range, PrefetchOptions::default())
            .await?;
        let mut pinned_table_iter = std::pin::pin!(table_iter);
        let row = pinned_table_iter.next().await.transpose()?;
        Ok(row)
    }

    /// Iterate state-table rows for `join_key` whose inequality key falls in `range`.
    /// Always hits storage; the cache is not consulted.
    async fn table_iter_by_inequality_with_jk_prefix<'a>(
        &'a self,
        join_key: &'a impl Row,
        range: &'a (Bound<impl Row>, Bound<impl Row>),
    ) -> StreamExecutorResult<impl RowStream<'a>> {
        self.state
            .table
            .iter_with_prefix(join_key, range, PrefetchOptions::default())
            .await
    }

    /// Return the first row (by PK order) with exact inequality key match.
    pub async fn first_by_inequality_with_jk_prefix(
        &mut self,
        join_key: &impl Row,
        inequality_key: &impl Row,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if self.cache.is_some() {
            let join_key_owned = join_key.to_owned_row();
            // Serialize before borrowing cache to avoid borrow conflicts.
            let ineq_key_bytes = self.serialize_inequal_key(inequality_key);
            self.ensure_cache_populated(&join_key_owned).await?;
            let cache = self.cache.as_mut().unwrap();
            if let Some(entry) = cache.get(&join_key_owned) {
                return Ok(entry
                    .first_by_inequality(&ineq_key_bytes)
                    .map(|v| E::decode(v, &self.state_all_data_types)));
            }
        }
        // Non-cached path: an inclusive point range on the inequality key.
        let range = (
            Bound::Included(inequality_key),
            Bound::Included(inequality_key),
        );
        let table_iter = self
            .table_iter_by_inequality_with_jk_prefix(join_key, &range)
            .await?;
        let mut pinned_table_iter = std::pin::pin!(table_iter);
        let row = pinned_table_iter.next().await.transpose()?;
        Ok(row)
    }

    /// Return the first two rows (by PK order) with exact inequality key match.
    /// Used by `eq_join_right` to determine replacement: Insert needs first row
    /// for comparison, Delete needs first + second for fallback.
    pub async fn first_two_by_inequality_with_jk_prefix(
        &mut self,
        join_key: &impl Row,
        inequality_key: &impl Row,
    ) -> StreamExecutorResult<(Option<OwnedRow>, Option<OwnedRow>)> {
        if self.cache.is_some() {
            let join_key_owned = join_key.to_owned_row();
            let ineq_key_bytes = self.serialize_inequal_key(inequality_key);
            self.ensure_cache_populated(&join_key_owned).await?;
            let cache = self.cache.as_mut().unwrap();
            if let Some(entry) = cache.get(&join_key_owned) {
                let (first, second) = entry.first_two_by_inequality(&ineq_key_bytes);
                return Ok((
                    first.map(|v| E::decode(v, &self.state_all_data_types)),
                    second.map(|v| E::decode(v, &self.state_all_data_types)),
                ));
            }
        }
        // Non-cached path: point range, take up to two rows from the forward scan.
        let range = (
            Bound::Included(inequality_key),
            Bound::Included(inequality_key),
        );
        let table_iter = self
            .table_iter_by_inequality_with_jk_prefix(join_key, &range)
            .await?;
        let mut pinned_table_iter = std::pin::pin!(table_iter);
        let first = pinned_table_iter.next().await.transpose()?;
        let second = if first.is_some() {
            pinned_table_iter.next().await.transpose()?
        } else {
            None
        };
        Ok((first, second))
    }

    /// Stream all rows for `join_key` whose inequality key falls in
    /// `inequality_key_range`, from the cache when possible, otherwise from storage.
    pub async fn range_by_inequality_with_jk_prefix<'a>(
        &'a mut self,
        join_key: &'a impl Row,
        inequality_key_range: &'a (Bound<impl Row>, Bound<impl Row>),
    ) -> StreamExecutorResult<impl RowStream<'a>> {
        if self.cache.is_some() {
            let join_key_owned = join_key.to_owned_row();
            // Serialize bound bytes before borrowing cache.
            let bound_lo = match &inequality_key_range.0 {
                Bound::Included(r) => Bound::Included(self.serialize_inequal_key(r)),
                Bound::Excluded(r) => Bound::Excluded(self.serialize_inequal_key(r)),
                Bound::Unbounded => Bound::Unbounded,
            };
            let bound_hi = match &inequality_key_range.1 {
                Bound::Included(r) => Bound::Included(self.serialize_inequal_key(r)),
                Bound::Excluded(r) => Bound::Excluded(self.serialize_inequal_key(r)),
                Bound::Unbounded => Bound::Unbounded,
            };
            self.ensure_cache_populated(&join_key_owned).await?;
            let cache = self.cache.as_mut().unwrap();
            if let Some(entry) = cache.get(&join_key_owned) {
                let range = (
                    bound_lo.as_ref().map(|b| b.as_slice()),
                    bound_hi.as_ref().map(|b| b.as_slice()),
                );
                // Cached: lazily decode cloned rows from cache and wrap as stream.
                let data_types = &self.state_all_data_types;
                return Ok(futures::future::Either::Left(futures::stream::iter(
                    entry.range(range).map(|v| Ok(E::decode(v, data_types))),
                )));
            }
        }
        // Non-cached path: stream directly from state table without collecting.
        let table_iter = self
            .state
            .table
            .iter_with_prefix(join_key, inequality_key_range, PrefetchOptions::default())
            .await?;
        Ok(futures::future::Either::Right(table_iter))
    }

    /// Return true if the inequality key is null.
    pub fn check_inequal_key_null(&self, row: &impl Row) -> bool {
        row.datum_at(self.inequality_key_idx).is_none()
    }

    /// Project the state-table pk columns out of a row.
    pub fn get_pk_from_row<'a>(&'a self, row: impl Row + 'a) -> impl Row + 'a {
        row.project(&self.state.pk_indices)
    }
}
703
/// Handle returned by [`AsOfJoinHashMap::flush`]; the barrier is not fully
/// processed until `post_yield_barrier` is called on it.
#[must_use]
pub struct AsOfJoinHashMapPostCommit<'a, S: StateStore, E: AsOfRowEncoding> {
    /// Post-commit handle of the underlying state table.
    state: StateTablePostCommit<'a, S>,
    /// The hash map's LRU cache, to be cleared if the vnode distribution changed.
    cache: Option<&'a mut ManagedLruCache<OwnedRow, AsOfJoinCacheEntry<E::Encoded>>>,
}
709
710impl<S: StateStore, E: AsOfRowEncoding> AsOfJoinHashMapPostCommit<'_, S, E> {
711    pub async fn post_yield_barrier(
712        self,
713        vnode_bitmap: Option<Arc<Bitmap>>,
714    ) -> StreamExecutorResult<Option<bool>> {
715        let cache_may_stale = self.state.post_yield_barrier(vnode_bitmap.clone()).await?;
716        let keyed_cache_may_stale = cache_may_stale.map(|(_, changed)| changed);
717        // Clear cache if the keyed cache may be stale after the barrier update.
718        if keyed_cache_may_stale.unwrap_or(false)
719            && let Some(cache) = self.cache
720        {
721            cache.clear();
722        }
723        Ok(keyed_cache_may_stale)
724    }
725}