risingwave_stream/executor/over_window/over_partition.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Types and functions that store or manipulate state/cache inside a single over window
//! partition.

use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::ops::{Bound, RangeInclusive};

use delta_btree_map::{Change, DeltaBTreeMap};
use educe::Educe;
use futures_async_stream::for_await;
use risingwave_common::array::stream_record::Record;
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy;
use risingwave_common::types::{Datum, Sentinelled};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common_estimate_size::collections::EstimatedBTreeMap;
use risingwave_expr::window_function::{StateKey, WindowStates, create_window_state};
use risingwave_storage::StateStore;
use risingwave_storage::store::PrefetchOptions;
use static_assertions::const_assert;

use super::general::{Calls, RowConverter};
use crate::common::table::state_table::StateTable;
use crate::consistency::{consistency_error, enable_strict_consistency};
use crate::executor::StreamExecutorResult;
use crate::executor::over_window::frame_finder::*;

pub(super) type CacheKey = Sentinelled<StateKey>;

/// Range cache for one over window partition.
/// The cache entries can be:
///
/// - `(Normal)*`
/// - `Smallest, (Normal)*, Largest`
/// - `(Normal)+, Largest`
/// - `Smallest, (Normal)+`
///
/// This means it's impossible to have only one sentinel in the cache without any normal entry,
/// and each of the two types of sentinel can appear at most once. Also, since sentinels are either
/// smallest or largest, they always appear at the beginning or the end of the cache.
pub(super) type PartitionCache = EstimatedBTreeMap<CacheKey, OwnedRow>;

/// Changes that happened in one over window partition.
pub(super) type PartitionDelta = BTreeMap<CacheKey, Change<OwnedRow>>;

pub(super) fn new_empty_partition_cache() -> PartitionCache {
    let mut cache = PartitionCache::new();
    cache.insert(CacheKey::Smallest, OwnedRow::empty());
    cache.insert(CacheKey::Largest, OwnedRow::empty());
    cache
}
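
// Illustrative sketch of the shapes listed in the `PartitionCache` doc above (the keys are
// made-up examples): a cache that covers only the middle of a partition is bounded by both
// sentinels, e.g.
//
//     {Smallest, Normal(k3), Normal(k4), Largest}
//
// meaning rows ordered before `k3` or after `k4` may still exist only in the state table and
// must be loaded before frames touching them can be computed. A cache without any sentinel,
// e.g. `{Normal(k1), ..., Normal(kN)}`, covers the whole partition. `new_empty_partition_cache`
// starts from `{Smallest, Largest}`, i.e. nothing is cached and anything may be in the table.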

const MAGIC_CACHE_SIZE: usize = 1024;
const MAGIC_JITTER_PREVENTION: usize = MAGIC_CACHE_SIZE / 8;

pub(super) fn shrink_partition_cache(
    deduped_part_key: &OwnedRow,
    range_cache: &mut PartitionCache,
    cache_policy: CachePolicy,
    recently_accessed_range: RangeInclusive<StateKey>,
) {
    tracing::trace!(
        partition=?deduped_part_key,
        cache_policy=?cache_policy,
        recently_accessed_range=?recently_accessed_range,
        "find the range to retain in the range cache"
    );

    let (start, end) = match cache_policy {
        CachePolicy::Full => {
            // evict nothing if the policy is to cache the full partition
            return;
        }
        CachePolicy::Recent => {
            let (sk_start, sk_end) = recently_accessed_range.into_inner();
            let (ck_start, ck_end) = (CacheKey::from(sk_start), CacheKey::from(sk_end));

            // find the cursor just before `ck_start`
            let mut cursor = range_cache.inner().upper_bound(Bound::Excluded(&ck_start));
            for _ in 0..MAGIC_JITTER_PREVENTION {
                if cursor.prev().is_none() {
                    // already at the beginning
                    break;
                }
            }
            let start = cursor
                .peek_prev()
                .map(|(k, _)| k)
                .unwrap_or_else(|| range_cache.first_key_value().unwrap().0)
                .clone();

            // find the cursor just after `ck_end`
            let mut cursor = range_cache.inner().lower_bound(Bound::Excluded(&ck_end));
            for _ in 0..MAGIC_JITTER_PREVENTION {
                if cursor.next().is_none() {
                    // already at the end
                    break;
                }
            }
            let end = cursor
                .peek_next()
                .map(|(k, _)| k)
                .unwrap_or_else(|| range_cache.last_key_value().unwrap().0)
                .clone();

            (start, end)
        }
        CachePolicy::RecentFirstN => {
            if range_cache.len() <= MAGIC_CACHE_SIZE {
                // no need to evict if cache len <= N
                return;
            } else {
                let (sk_start, _sk_end) = recently_accessed_range.into_inner();
                let ck_start = CacheKey::from(sk_start);

                let mut capacity_remain = MAGIC_CACHE_SIZE; // precision is not important here, code simplicity comes first
                const_assert!(MAGIC_JITTER_PREVENTION < MAGIC_CACHE_SIZE);

                // find the cursor just before `ck_start`
                let cursor_just_before_ck_start =
                    range_cache.inner().upper_bound(Bound::Excluded(&ck_start));

                let mut cursor = cursor_just_before_ck_start.clone();
                // go back for at most `MAGIC_JITTER_PREVENTION` entries
                for _ in 0..MAGIC_JITTER_PREVENTION {
                    if cursor.prev().is_none() {
                        // already at the beginning
                        break;
                    }
                    capacity_remain -= 1;
                }
                let start = cursor
                    .peek_prev()
                    .map(|(k, _)| k)
                    .unwrap_or_else(|| range_cache.first_key_value().unwrap().0)
                    .clone();

                let mut cursor = cursor_just_before_ck_start;
                // go forward for at most `capacity_remain` entries
                for _ in 0..capacity_remain {
                    if cursor.next().is_none() {
                        // already at the end
                        break;
                    }
                }
                let end = cursor
                    .peek_next()
                    .map(|(k, _)| k)
                    .unwrap_or_else(|| range_cache.last_key_value().unwrap().0)
                    .clone();

                (start, end)
            }
        }
        CachePolicy::RecentLastN => {
            if range_cache.len() <= MAGIC_CACHE_SIZE {
                // no need to evict if cache len <= N
                return;
            } else {
                let (_sk_start, sk_end) = recently_accessed_range.into_inner();
                let ck_end = CacheKey::from(sk_end);

                let mut capacity_remain = MAGIC_CACHE_SIZE; // precision is not important here, code simplicity comes first
                const_assert!(MAGIC_JITTER_PREVENTION < MAGIC_CACHE_SIZE);

                // find the cursor just after `ck_end`
                let cursor_just_after_ck_end =
                    range_cache.inner().lower_bound(Bound::Excluded(&ck_end));

                let mut cursor = cursor_just_after_ck_end.clone();
                // go forward for at most `MAGIC_JITTER_PREVENTION` entries
                for _ in 0..MAGIC_JITTER_PREVENTION {
                    if cursor.next().is_none() {
                        // already at the end
                        break;
                    }
                    capacity_remain -= 1;
                }
                let end = cursor
                    .peek_next()
                    .map(|(k, _)| k)
                    .unwrap_or_else(|| range_cache.last_key_value().unwrap().0)
                    .clone();

                let mut cursor = cursor_just_after_ck_end;
                // go back for at most `capacity_remain` entries
                for _ in 0..capacity_remain {
                    if cursor.prev().is_none() {
                        // already at the beginning
                        break;
                    }
                }
                let start = cursor
                    .peek_prev()
                    .map(|(k, _)| k)
                    .unwrap_or_else(|| range_cache.first_key_value().unwrap().0)
                    .clone();

                (start, end)
            }
        }
    };

    tracing::trace!(
        partition=?deduped_part_key,
        retain_range=?(&start..=&end),
        "retain range in the range cache"
    );

    let (left_removed, right_removed) = range_cache.retain_range(&start..=&end);
    if range_cache.is_empty() {
        if !left_removed.is_empty() || !right_removed.is_empty() {
            range_cache.insert(CacheKey::Smallest, OwnedRow::empty());
            range_cache.insert(CacheKey::Largest, OwnedRow::empty());
        }
    } else {
        if !left_removed.is_empty() {
            range_cache.insert(CacheKey::Smallest, OwnedRow::empty());
        }
        if !right_removed.is_empty() {
            range_cache.insert(CacheKey::Largest, OwnedRow::empty());
        }
    }
}
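
// A hedged usage sketch (illustrative only; the real call site lives in the executor, and the
// variable names here are made up): after a round of processing, the executor can shrink a
// partition's cache around the recently accessed key range, keeping up to
// `MAGIC_JITTER_PREVENTION` extra entries on each side to avoid evict-then-reload jitter:
//
//     shrink_partition_cache(
//         &deduped_part_key,
//         &mut range_cache,
//         CachePolicy::Recent,
//         accessed_start..=accessed_end,
//     );
//
// With `CachePolicy::Full` the call is a no-op; with `RecentFirstN`/`RecentLastN` it only
// evicts once the cache holds more than `MAGIC_CACHE_SIZE` entries.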

#[derive(Default, Debug)]
pub(super) struct OverPartitionStats {
    // stats for range cache operations
    pub lookup_count: u64,
    pub left_miss_count: u64,
    pub right_miss_count: u64,

    // stats for window function state computation
    pub accessed_entry_count: u64,
    pub compute_count: u64,
    pub same_output_count: u64,
}

/// [`AffectedRange`] represents a range of keys that are affected by a delta.
/// The [`CacheKey`] fields are keys in the partition range cache + delta, which is
/// represented by [`DeltaBTreeMap`].
///
/// - `first_curr_key` and `last_curr_key` are the current keys of the first and the last
///   windows affected. They are used to pinpoint the bounds where state needs to be updated.
/// - `first_frame_start` and `last_frame_end` are the frame start and end of the first and
///   the last windows affected. They are used to pinpoint the bounds where state needs to be
///   included for computing the new state.
#[derive(Debug, Educe)]
#[educe(Clone, Copy)]
pub(super) struct AffectedRange<'a> {
    pub first_frame_start: &'a CacheKey,
    pub first_curr_key: &'a CacheKey,
    pub last_curr_key: &'a CacheKey,
    pub last_frame_end: &'a CacheKey,
}

impl<'a> AffectedRange<'a> {
    fn new(
        first_frame_start: &'a CacheKey,
        first_curr_key: &'a CacheKey,
        last_curr_key: &'a CacheKey,
        last_frame_end: &'a CacheKey,
    ) -> Self {
        Self {
            first_frame_start,
            first_curr_key,
            last_curr_key,
            last_frame_end,
        }
    }
}
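
// Illustrative sketch (the keys and frame below are made-up examples, not from the original
// code): for a `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING` frame over a partition with keys
// `k1..k9`, and a delta that only touches `k5`, the affected range is conceptually
//
//     first_curr_key    = k4   // first window whose frame contains k5
//     last_curr_key     = k6   // last window whose frame contains k5
//     first_frame_start = k3   // frame start of the window at k4
//     last_frame_end    = k7   // frame end of the window at k6
//
// Rows in `[first_frame_start, last_frame_end]` must be present in the cache to recompute the
// outputs of the windows whose current keys fall in `[first_curr_key, last_curr_key]`.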

/// A wrapper of [`PartitionCache`] that provides helper methods to manipulate the cache.
/// By putting this type inside a `private` module, we can avoid misuse of the internal fields and
/// methods.
pub(super) struct OverPartition<'a, S: StateStore> {
    deduped_part_key: &'a OwnedRow,
    range_cache: &'a mut PartitionCache,
    cache_policy: CachePolicy,

    calls: &'a Calls,
    row_conv: RowConverter<'a>,

    stats: OverPartitionStats,

    _phantom: PhantomData<S>,
}

const MAGIC_BATCH_SIZE: usize = 512;

impl<'a, S: StateStore> OverPartition<'a, S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        deduped_part_key: &'a OwnedRow,
        cache: &'a mut PartitionCache,
        cache_policy: CachePolicy,
        calls: &'a Calls,
        row_conv: RowConverter<'a>,
    ) -> Self {
        Self {
            deduped_part_key,
            range_cache: cache,
            cache_policy,

            calls,
            row_conv,

            stats: Default::default(),

            _phantom: PhantomData,
        }
    }

    /// Get a summary of the execution that happened in the [`OverPartition`] in the current round.
    /// This consumes the [`OverPartition`] value itself.
    pub fn summarize(self) -> OverPartitionStats {
        // We may extend this function in the future.
        self.stats
    }

    /// Get the number of cached entries ignoring sentinels.
    pub fn cache_real_len(&self) -> usize {
        let len = self.range_cache.inner().len();
        if len <= 1 {
            debug_assert!(
                self.range_cache
                    .inner()
                    .first_key_value()
                    .map(|(k, _)| k.is_normal())
                    .unwrap_or(true)
            );
            return len;
        }
        // len >= 2
        let cache_inner = self.range_cache.inner();
        let sentinels = [
            // sentinels only appear at the beginning and/or the end
            cache_inner.first_key_value().unwrap().0.is_sentinel(),
            cache_inner.last_key_value().unwrap().0.is_sentinel(),
        ];
        len - sentinels.into_iter().filter(|x| *x).count()
    }

    fn cache_real_first_key(&self) -> Option<&StateKey> {
        self.range_cache
            .inner()
            .iter()
            .find(|(k, _)| k.is_normal())
            .map(|(k, _)| k.as_normal_expect())
    }

    fn cache_real_last_key(&self) -> Option<&StateKey> {
        self.range_cache
            .inner()
            .iter()
            .rev()
            .find(|(k, _)| k.is_normal())
            .map(|(k, _)| k.as_normal_expect())
    }

    fn cache_left_is_sentinel(&self) -> bool {
        self.range_cache
            .first_key_value()
            .map(|(k, _)| k.is_sentinel())
            .unwrap_or(false)
    }

    fn cache_right_is_sentinel(&self) -> bool {
        self.range_cache
            .last_key_value()
            .map(|(k, _)| k.is_sentinel())
            .unwrap_or(false)
    }

    /// Build changes for the partition, with the given `delta`. Necessary maintenance of the range
    /// cache will be done during this process, like loading rows from the `table` into the cache.
    pub async fn build_changes(
        &mut self,
        table: &StateTable<S>,
        mut delta: PartitionDelta,
    ) -> StreamExecutorResult<(
        BTreeMap<StateKey, Record<OwnedRow>>,
        Option<RangeInclusive<StateKey>>,
    )> {
        let calls = self.calls;
        let input_schema_len = table.get_data_types().len() - calls.len();
        let numbering_only = calls.numbering_only;
        let has_rank = calls.has_rank;

        // return values
        let mut part_changes = BTreeMap::new();
        let mut accessed_range: Option<RangeInclusive<StateKey>> = None;

        // stats
        let mut accessed_entry_count = 0;
        let mut compute_count = 0;
        let mut same_output_count = 0;

        // Find affected ranges; this also ensures that all rows in the affected ranges are loaded into the cache.
        let (part_with_delta, affected_ranges) =
            self.find_affected_ranges(table, &mut delta).await?;

        let snapshot = part_with_delta.snapshot();
        let delta = part_with_delta.delta();
        let last_delta_key = delta.last_key_value().map(|(k, _)| k.as_normal_expect());

        // Generate delete changes first, because deletes are skipped during iteration over
        // `part_with_delta` in the next step.
        for (key, change) in delta {
            if change.is_delete() {
                part_changes.insert(
                    key.as_normal_expect().clone(),
                    Record::Delete {
                        old_row: snapshot.get(key).unwrap().clone(),
                    },
                );
            }
        }

        for AffectedRange {
            first_frame_start,
            first_curr_key,
            last_curr_key,
            last_frame_end,
        } in affected_ranges
        {
            assert!(first_frame_start <= first_curr_key);
            assert!(first_curr_key <= last_curr_key);
            assert!(last_curr_key <= last_frame_end);
            assert!(first_frame_start.is_normal());
            assert!(first_curr_key.is_normal());
            assert!(last_curr_key.is_normal());
            assert!(last_frame_end.is_normal());

            let last_delta_key = last_delta_key.unwrap();

            if let Some(accessed_range) = accessed_range.as_mut() {
                let min_start = first_frame_start
                    .as_normal_expect()
                    .min(accessed_range.start())
                    .clone();
                let max_end = last_frame_end
                    .as_normal_expect()
                    .max(accessed_range.end())
                    .clone();
                *accessed_range = min_start..=max_end;
            } else {
                accessed_range = Some(
                    first_frame_start.as_normal_expect().clone()
                        ..=last_frame_end.as_normal_expect().clone(),
                );
            }

            let mut states =
                WindowStates::new(calls.iter().map(create_window_state).try_collect()?);

            // Populate window states with the affected range of rows.
            {
                let mut cursor = part_with_delta
                    .before(first_frame_start)
                    .expect("first frame start key must exist");

                while let Some((key, row)) = cursor.next() {
                    accessed_entry_count += 1;

                    for (call, state) in calls.iter().zip_eq_fast(states.iter_mut()) {
                        // TODO(rc): batch appending
                        // TODO(rc): append not only the arguments but also the old output for optimization
                        state.append(
                            key.as_normal_expect().clone(),
                            row.project(call.args.val_indices())
                                .into_owned_row()
                                .as_inner()
                                .into(),
                        );
                    }

                    if key == last_frame_end {
                        break;
                    }
                }
            }

            // Slide to the first affected key. We can safely pass in `first_curr_key` here
            // because it definitely exists in the states by the definition of affected range.
            states.just_slide_to(first_curr_key.as_normal_expect())?;
            let mut curr_key_cursor = part_with_delta.before(first_curr_key).unwrap();
            assert_eq!(
                states.curr_key(),
                curr_key_cursor
                    .peek_next()
                    .map(|(k, _)| k)
                    .map(CacheKey::as_normal_expect)
            );

            // Slide and generate changes.
            while let Some((key, row)) = curr_key_cursor.next() {
                let mut should_stop = false;

                let output = states.slide_no_evict_hint()?;
                compute_count += 1;

                let old_output = &row.as_inner()[input_schema_len..];
                if !old_output.is_empty() && old_output == output {
                    same_output_count += 1;

                    if numbering_only {
                        if has_rank {
                            // It's possible that an `Insert` doesn't affect its ties but affects
                            // all the following rows, so we need to check the `order_key`.
                            if key.as_normal_expect().order_key > last_delta_key.order_key {
                                // there won't be any more changes after this point, we can stop early
                                should_stop = true;
                            }
                        } else if key.as_normal_expect() >= last_delta_key {
                            // there won't be any more changes after this point, we can stop early
                            should_stop = true;
                        }
                    }
                }

                let new_row = OwnedRow::new(
                    row.as_inner()
                        .iter()
                        .take(input_schema_len)
                        .cloned()
                        .chain(output)
                        .collect(),
                );

                if let Some(old_row) = snapshot.get(key).cloned() {
                    // update
                    if old_row != new_row {
                        part_changes.insert(
                            key.as_normal_expect().clone(),
                            Record::Update { old_row, new_row },
                        );
                    }
                } else {
                    // insert
                    part_changes.insert(key.as_normal_expect().clone(), Record::Insert { new_row });
                }

                if should_stop || key == last_curr_key {
                    break;
                }
            }
        }

        self.stats.accessed_entry_count += accessed_entry_count;
        self.stats.compute_count += compute_count;
        self.stats.same_output_count += same_output_count;

        Ok((part_changes, accessed_range))
    }
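
    // Illustrative sketch of the `build_changes` output (keys, columns and values below are made
    // up): `part_changes` maps each affected state key to a `Record` over full state-table rows,
    // i.e. input columns followed by the window function outputs, e.g.
    //
    //     {
    //         k4 => Update { old_row: (a4, b4, old_sum), new_row: (a4, b4, new_sum) },
    //         k5 => Insert { new_row: (a5, b5, sum5) },
    //     }
    //
    // `accessed_range` is the union of all `[first_frame_start, last_frame_end]` ranges visited
    // above, and is presumably what the caller later passes to `shrink_partition_cache` as the
    // `recently_accessed_range` hint.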

    /// Write a change record to the state table and the cache.
    /// This function must be called after finding affected ranges, which means the change records
    /// should never exceed the cached range.
    pub fn write_record(
        &mut self,
        table: &mut StateTable<S>,
        key: StateKey,
        record: Record<OwnedRow>,
    ) {
        table.write_record(record.as_ref());
        match record {
            Record::Insert { new_row } | Record::Update { new_row, .. } => {
                self.range_cache.insert(CacheKey::from(key), new_row);
            }
            Record::Delete { .. } => {
                self.range_cache.remove(&CacheKey::from(key));

                if self.cache_real_len() == 0 && self.range_cache.len() == 1 {
                    // only one sentinel remains, restore both sentinels
                    self.range_cache
                        .insert(CacheKey::Smallest, OwnedRow::empty());
                    self.range_cache
                        .insert(CacheKey::Largest, OwnedRow::empty());
                }
            }
        }
    }

    /// Find all ranges in the partition that are affected by the given delta.
    /// The returned ranges are guaranteed to be sorted and non-overlapping. All keys in the ranges
    /// are guaranteed to be cached, which means they should be [`Sentinelled::Normal`]s.
    async fn find_affected_ranges<'s, 'delta>(
        &'s mut self,
        table: &StateTable<S>,
        delta: &'delta mut PartitionDelta,
    ) -> StreamExecutorResult<(
        DeltaBTreeMap<'delta, CacheKey, OwnedRow>,
        Vec<AffectedRange<'delta>>,
    )>
    where
        'a: 'delta,
        's: 'delta,
    {
        if delta.is_empty() {
            return Ok((DeltaBTreeMap::new(self.range_cache.inner(), delta), vec![]));
        }

        self.ensure_delta_in_cache(table, delta).await?;
        let delta = &*delta; // let's make it immutable

        let delta_first = delta.first_key_value().unwrap().0.as_normal_expect();
        let delta_last = delta.last_key_value().unwrap().0.as_normal_expect();

        let range_frame_logical_curr =
            calc_logical_curr_for_range_frames(&self.calls.range_frames, delta_first, delta_last);

        loop {
            // TERMINABILITY: `extend_cache_leftward_by_n` and `extend_cache_rightward_by_n` keep
            // pushing the cache towards the boundaries of the current partition. In these two
            // methods, when either boundary is reached, the corresponding sentinel key is removed,
            // so eventually `Self::find_affected_ranges_readonly` will return `Ok`.

            // SAFETY: Here we shortly borrow the range cache and turn the reference into a
            // `'delta` one to bypass the borrow checker. This is safe because we only return
            // the reference once we don't need to do any further mutation.
            let cache_inner = unsafe { &*(self.range_cache.inner() as *const _) };
            let part_with_delta = DeltaBTreeMap::new(cache_inner, delta);

            self.stats.lookup_count += 1;
            let res = self
                .find_affected_ranges_readonly(part_with_delta, range_frame_logical_curr.as_ref());

            let (need_extend_leftward, need_extend_rightward) = match res {
                Ok(ranges) => return Ok((part_with_delta, ranges)),
                Err(cache_extend_hint) => cache_extend_hint,
            };

            if need_extend_leftward {
                self.stats.left_miss_count += 1;
                tracing::trace!(partition=?self.deduped_part_key, "partition cache left extension triggered");
                let left_most = self.cache_real_first_key().unwrap_or(delta_first).clone();
                self.extend_cache_leftward_by_n(table, &left_most).await?;
            }
            if need_extend_rightward {
                self.stats.right_miss_count += 1;
                tracing::trace!(partition=?self.deduped_part_key, "partition cache right extension triggered");
                let right_most = self.cache_real_last_key().unwrap_or(delta_last).clone();
                self.extend_cache_rightward_by_n(table, &right_most).await?;
            }
            tracing::trace!(partition=?self.deduped_part_key, "partition cache extended");
        }
    }

    async fn ensure_delta_in_cache(
        &mut self,
        table: &StateTable<S>,
        delta: &mut PartitionDelta,
    ) -> StreamExecutorResult<()> {
        if delta.is_empty() {
            return Ok(());
        }

        let delta_first = delta.first_key_value().unwrap().0.as_normal_expect();
        let delta_last = delta.last_key_value().unwrap().0.as_normal_expect();

        if self.cache_policy.is_full() {
            // ensure everything is in the cache
            self.extend_cache_to_boundary(table).await?;
        } else {
            // TODO(rc): later we should extend cache using `self.calls.super_rows_frame_bounds` and
            // `range_frame_logical_curr` as hints.

            // ensure the cache covers all delta (if possible)
            self.extend_cache_by_range(table, delta_first..=delta_last)
                .await?;
        }

        if !enable_strict_consistency() {
            // in non-strict mode, we should ensure the delta is consistent with the cache
            let cache = self.range_cache.inner();
            delta.retain(|key, change| match &*change {
                Change::Insert(_) => {
                    // this also includes the case of double-insert and ghost-update,
                    // but since we already lost the information, let's just ignore it
                    true
                }
                Change::Delete => {
                    // if the key is not in the cache, it's a ghost-delete
                    let consistent = cache.contains_key(key);
                    if !consistent {
                        consistency_error!(?key, "removing a row with non-existing key");
                    }
                    consistent
                }
            });
        }

        Ok(())
    }

    /// Try to find affected ranges on an immutable range cache + delta. If the algorithm reaches
    /// any sentinel node in the cache, which means some entries in the affected range may only be
    /// in the state table, it returns an `Err((bool, bool))` to notify the caller that the
    /// left side, the right side, or both sides of the cache should be extended.
    ///
    /// TODO(rc): Currently at most one range will be in the result vector. Ideally we should
    /// recognize discontinuous changes in the delta and find multiple ranges, but that will be
    /// too complex for now.
    fn find_affected_ranges_readonly<'delta>(
        &self,
        part_with_delta: DeltaBTreeMap<'delta, CacheKey, OwnedRow>,
        range_frame_logical_curr: Option<&(Sentinelled<Datum>, Sentinelled<Datum>)>,
    ) -> std::result::Result<Vec<AffectedRange<'delta>>, (bool, bool)> {
        if part_with_delta.first_key().is_none() {
            // nothing is left after applying the delta, meaning all entries are deleted
            return Ok(vec![]);
        }

        let delta_first_key = part_with_delta.delta().first_key_value().unwrap().0;
        let delta_last_key = part_with_delta.delta().last_key_value().unwrap().0;
        let cache_key_pk_len = delta_first_key.as_normal_expect().pk.len();

        if part_with_delta.snapshot().is_empty() {
            // all existing keys are inserted in the delta
            return Ok(vec![AffectedRange::new(
                delta_first_key,
                delta_first_key,
                delta_last_key,
                delta_last_key,
            )]);
        }

        let first_key = part_with_delta.first_key().unwrap();
        let last_key = part_with_delta.last_key().unwrap();

        let first_curr_key = if self.calls.end_is_unbounded || delta_first_key == first_key {
            // If the frame end is unbounded, or the first key is in the delta, then the frame corresponding
            // to the first key is always affected.
            first_key
        } else {
            let mut key = find_first_curr_for_rows_frame(
                &self.calls.super_rows_frame_bounds,
                part_with_delta,
                delta_first_key,
            );

            if let Some((logical_first_curr, _)) = range_frame_logical_curr {
                let logical_curr = logical_first_curr.as_normal_expect(); // otherwise should go `end_is_unbounded` branch
                let new_key = find_left_for_range_frames(
                    &self.calls.range_frames,
                    part_with_delta,
                    logical_curr,
                    cache_key_pk_len,
                );
                key = std::cmp::min(key, new_key);
            }

            key
        };

        let last_curr_key = if self.calls.start_is_unbounded || delta_last_key == last_key {
            // similar to `first_curr_key`
            last_key
        } else {
            let mut key = find_last_curr_for_rows_frame(
                &self.calls.super_rows_frame_bounds,
                part_with_delta,
                delta_last_key,
            );

            if let Some((_, logical_last_curr)) = range_frame_logical_curr {
                let logical_curr = logical_last_curr.as_normal_expect(); // otherwise should go `start_is_unbounded` branch
                let new_key = find_right_for_range_frames(
                    &self.calls.range_frames,
                    part_with_delta,
                    logical_curr,
                    cache_key_pk_len,
                );
                key = std::cmp::max(key, new_key);
            }

            key
        };

        {
            // We quickly return if there's any sentinel in `[first_curr_key, last_curr_key]`,
            // just for the sake of simplicity.
            let mut need_extend_leftward = false;
            let mut need_extend_rightward = false;
            for key in [first_curr_key, last_curr_key] {
                if key.is_smallest() {
                    need_extend_leftward = true;
                } else if key.is_largest() {
                    need_extend_rightward = true;
                }
            }
            if need_extend_leftward || need_extend_rightward {
                return Err((need_extend_leftward, need_extend_rightward));
            }
        }

        // From now on we definitely have two normal `curr_key`s.

        if first_curr_key > last_curr_key {
            // Note that we cannot move this check before the above block, because, for example,
            // if the range cache contains `[Smallest, 5, Largest]`, the delta contains only
            // `Delete 5`, and the frame is `RANGE BETWEEN CURRENT ROW AND CURRENT ROW`, then
            // `first_curr_key` will be `Largest` and `last_curr_key` will be `Smallest`, while
            // there may be some other entries with order value `5` in the table, which should be
            // *affected*.
            return Ok(vec![]);
        }

        let range_frame_logical_boundary = calc_logical_boundary_for_range_frames(
            &self.calls.range_frames,
            first_curr_key.as_normal_expect(),
            last_curr_key.as_normal_expect(),
        );

        let first_frame_start = if self.calls.start_is_unbounded || first_curr_key == first_key {
            // If the frame start is unbounded, or the first curr key is the first key, then the first key
            // always needs to be included in the affected range.
            first_key
        } else {
            let mut key = find_frame_start_for_rows_frame(
                &self.calls.super_rows_frame_bounds,
                part_with_delta,
                first_curr_key,
            );

            if let Some((logical_first_start, _)) = range_frame_logical_boundary.as_ref() {
                let logical_boundary = logical_first_start.as_normal_expect(); // otherwise should go `start_is_unbounded` branch
                let new_key = find_left_for_range_frames(
                    &self.calls.range_frames,
                    part_with_delta,
                    logical_boundary,
                    cache_key_pk_len,
                );
                key = std::cmp::min(key, new_key);
            }

            key
        };
        assert!(first_frame_start <= first_curr_key);

        let last_frame_end = if self.calls.end_is_unbounded || last_curr_key == last_key {
            // similar to `first_frame_start`
            last_key
        } else {
            let mut key = find_frame_end_for_rows_frame(
                &self.calls.super_rows_frame_bounds,
                part_with_delta,
                last_curr_key,
            );

            if let Some((_, logical_last_end)) = range_frame_logical_boundary.as_ref() {
                let logical_boundary = logical_last_end.as_normal_expect(); // otherwise should go `end_is_unbounded` branch
                let new_key = find_right_for_range_frames(
                    &self.calls.range_frames,
                    part_with_delta,
                    logical_boundary,
                    cache_key_pk_len,
                );
                key = std::cmp::max(key, new_key);
            }

            key
        };
        assert!(last_frame_end >= last_curr_key);

        let mut need_extend_leftward = false;
        let mut need_extend_rightward = false;
        for key in [
            first_curr_key,
            last_curr_key,
            first_frame_start,
            last_frame_end,
        ] {
            if key.is_smallest() {
                need_extend_leftward = true;
            } else if key.is_largest() {
                need_extend_rightward = true;
            }
        }

        if need_extend_leftward || need_extend_rightward {
            Err((need_extend_leftward, need_extend_rightward))
        } else {
            Ok(vec![AffectedRange::new(
                first_frame_start,
                first_curr_key,
                last_curr_key,
                last_frame_end,
            )])
        }
    }
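
    // Illustrative sketch of the `Err` contract above (cache contents are made-up): suppose the
    // cache + delta view currently looks like
    //
    //     {Smallest, Normal(k4), Normal(k5), Largest}
    //
    // and the computed `first_frame_start` lands on `Smallest` while the other three keys are
    // normal. The function returns `Err((true, false))`, asking the caller to extend the cache
    // leftward (i.e. load more rows before `k4` from the state table) and then retry.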

    async fn extend_cache_to_boundary(
        &mut self,
        table: &StateTable<S>,
    ) -> StreamExecutorResult<()> {
        if self.cache_real_len() == self.range_cache.len() {
            // no sentinel in the cache, meaning we already cached all entries of this partition
            return Ok(());
        }

        tracing::trace!(partition=?self.deduped_part_key, "loading the whole partition into cache");

        let mut new_cache = PartitionCache::new(); // shouldn't use `new_empty_partition_cache` here because we don't want sentinels
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
        let table_iter = table
            .iter_with_prefix(self.deduped_part_key, sub_range, PrefetchOptions::default())
            .await?;

        #[for_await]
        for row in table_iter {
            let row: OwnedRow = row?.into_owned_row();
            new_cache.insert(self.row_conv.row_to_state_key(&row)?.into(), row);
        }
        *self.range_cache = new_cache;

        Ok(())
    }

    /// Try to load the given range of entries from the table into the cache.
    /// When the function returns, it's guaranteed that there's no entry in the table that is within
    /// the given range but not in the cache.
    async fn extend_cache_by_range(
        &mut self,
        table: &StateTable<S>,
        range: RangeInclusive<&StateKey>,
    ) -> StreamExecutorResult<()> {
        if self.cache_real_len() == self.range_cache.len() {
            // no sentinel in the cache, meaning we already cached all entries of this partition
            return Ok(());
        }
        assert!(self.range_cache.len() >= 2);

        let cache_real_first_key = self.cache_real_first_key();
        let cache_real_last_key = self.cache_real_last_key();

        if cache_real_first_key.is_some() && *range.end() < cache_real_first_key.unwrap()
            || cache_real_last_key.is_some() && *range.start() > cache_real_last_key.unwrap()
        {
            // completely non-overlapping; for the sake of simplicity, we re-init the cache
            tracing::debug!(
                partition=?self.deduped_part_key,
                cache_first=?cache_real_first_key,
                cache_last=?cache_real_last_key,
                range=?range,
                "modified range is completely non-overlapping with the cached range, re-initializing the cache"
            );
            *self.range_cache = new_empty_partition_cache();
        }

        if self.cache_real_len() == 0 {
            // no normal entry in the cache, just load the given range
            let table_sub_range = (
                Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.start())?),
                Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.end())?),
            );
            tracing::debug!(
                partition=?self.deduped_part_key,
                table_sub_range=?table_sub_range,
                "cache is empty, just loading the given range"
            );
            return self
                .extend_cache_by_range_inner(table, table_sub_range)
                .await;
        }

        let cache_real_first_key = self
            .cache_real_first_key()
            .expect("cache real len is not 0");
        if self.cache_left_is_sentinel() && *range.start() < cache_real_first_key {
            // extend leftward only if there's a smallest sentinel
            let table_sub_range = (
                Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.start())?),
                Bound::Excluded(
                    self.row_conv
                        .state_key_to_table_sub_pk(cache_real_first_key)?,
                ),
            );
            tracing::trace!(
                partition=?self.deduped_part_key,
                table_sub_range=?table_sub_range,
                "loading the left half of given range"
            );
            self.extend_cache_by_range_inner(table, table_sub_range)
                .await?;
        }

        let cache_real_last_key = self.cache_real_last_key().expect("cache real len is not 0");
        if self.cache_right_is_sentinel() && *range.end() > cache_real_last_key {
            // extend rightward only if there's a largest sentinel
            let table_sub_range = (
                Bound::Excluded(
                    self.row_conv
                        .state_key_to_table_sub_pk(cache_real_last_key)?,
                ),
                Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.end())?),
            );
            tracing::trace!(
                partition=?self.deduped_part_key,
                table_sub_range=?table_sub_range,
                "loading the right half of given range"
            );
            self.extend_cache_by_range_inner(table, table_sub_range)
                .await?;
        }

        // prefetch rows before the start of the range
        self.extend_cache_leftward_by_n(table, range.start())
            .await?;

        // prefetch rows after the end of the range
        self.extend_cache_rightward_by_n(table, range.end()).await
    }
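
    // Illustrative sketch (keys are made-up): with a cached normal range `[k10, k20]` and both
    // sentinels present, a call with `range = k5..=k25` issues two table scans over the
    // partition,
    //
    //     [k5, k10)    // left half, only because the `Smallest` sentinel is present
    //     (k20, k25]   // right half, only because the `Largest` sentinel is present
    //
    // and then prefetches up to `MAGIC_BATCH_SIZE` additional rows on each side via the
    // `extend_cache_{leftward,rightward}_by_n` helpers below.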

    async fn extend_cache_leftward_by_n(
        &mut self,
        table: &StateTable<S>,
        hint_key: &StateKey,
    ) -> StreamExecutorResult<()> {
        if self.cache_real_len() == self.range_cache.len() {
            // no sentinel in the cache, meaning we already cached all entries of this partition
            return Ok(());
        }
        assert!(self.range_cache.len() >= 2);

        let left_second = {
            let mut iter = self.range_cache.inner().iter();
            let left_first = iter.next().unwrap().0;
            if left_first.is_normal() {
                // the left side already reaches the beginning of this partition in the table
                return Ok(());
            }
            iter.next().unwrap().0
        };
        let range_to_exclusive = match left_second {
            CacheKey::Normal(smallest_in_cache) => smallest_in_cache,
            CacheKey::Largest => hint_key, // no normal entry in the cache
            _ => unreachable!(),
        }
        .clone();

        self.extend_cache_leftward_by_n_inner(table, &range_to_exclusive)
            .await?;

        if self.cache_real_len() == 0 {
            // Cache was empty, and extending leftward didn't add anything to the cache, but we
            // can't just remove the smallest sentinel, we must also try extending rightward.
            self.extend_cache_rightward_by_n_inner(table, hint_key)
                .await?;
            if self.cache_real_len() == 0 {
                // still empty, meaning the table is empty
                self.range_cache.remove(&CacheKey::Smallest);
                self.range_cache.remove(&CacheKey::Largest);
            }
        }

        Ok(())
    }

    async fn extend_cache_rightward_by_n(
        &mut self,
        table: &StateTable<S>,
        hint_key: &StateKey,
    ) -> StreamExecutorResult<()> {
        if self.cache_real_len() == self.range_cache.len() {
            // no sentinel in the cache, meaning we already cached all entries of this partition
            return Ok(());
        }
        assert!(self.range_cache.len() >= 2);

        let right_second = {
            let mut iter = self.range_cache.inner().iter();
            let right_first = iter.next_back().unwrap().0;
            if right_first.is_normal() {
                // the right side already reaches the end of this partition in the table
                return Ok(());
            }
            iter.next_back().unwrap().0
        };
        let range_from_exclusive = match right_second {
            CacheKey::Normal(largest_in_cache) => largest_in_cache,
            CacheKey::Smallest => hint_key, // no normal entry in the cache
            _ => unreachable!(),
        }
        .clone();

        self.extend_cache_rightward_by_n_inner(table, &range_from_exclusive)
            .await?;

        if self.cache_real_len() == 0 {
            // Cache was empty, and extending rightward didn't add anything to the cache, but we
            // can't just remove the largest sentinel, we must also try extending leftward.
            self.extend_cache_leftward_by_n_inner(table, hint_key)
                .await?;
            if self.cache_real_len() == 0 {
                // still empty, meaning the table is empty
                self.range_cache.remove(&CacheKey::Smallest);
                self.range_cache.remove(&CacheKey::Largest);
            }
        }

        Ok(())
    }

    async fn extend_cache_by_range_inner(
        &mut self,
        table: &StateTable<S>,
        table_sub_range: (Bound<impl Row>, Bound<impl Row>),
    ) -> StreamExecutorResult<()> {
        let stream = table
            .iter_with_prefix(
                self.deduped_part_key,
                &table_sub_range,
                PrefetchOptions::default(),
            )
            .await?;

        #[for_await]
        for row in stream {
            let row: OwnedRow = row?.into_owned_row();
            let key = self.row_conv.row_to_state_key(&row)?;
            self.range_cache.insert(CacheKey::from(key), row);
        }

        Ok(())
    }

    async fn extend_cache_leftward_by_n_inner(
        &mut self,
        table: &StateTable<S>,
        range_to_exclusive: &StateKey,
    ) -> StreamExecutorResult<()> {
        let mut n_extended = 0usize;
        {
            let sub_range = (
                Bound::<OwnedRow>::Unbounded,
                Bound::Excluded(
                    self.row_conv
                        .state_key_to_table_sub_pk(range_to_exclusive)?,
                ),
            );
            let rev_stream = table
                .rev_iter_with_prefix(
                    self.deduped_part_key,
                    &sub_range,
                    PrefetchOptions::default(),
                )
                .await?;

            #[for_await]
            for row in rev_stream {
                let row: OwnedRow = row?.into_owned_row();

                let key = self.row_conv.row_to_state_key(&row)?;
                self.range_cache.insert(CacheKey::from(key), row);

                n_extended += 1;
                if n_extended == MAGIC_BATCH_SIZE {
                    break;
                }
            }
        }

        if n_extended < MAGIC_BATCH_SIZE && self.cache_real_len() > 0 {
            // we reached the beginning of this partition in the table
            self.range_cache.remove(&CacheKey::Smallest);
        }

        Ok(())
    }

    async fn extend_cache_rightward_by_n_inner(
        &mut self,
        table: &StateTable<S>,
        range_from_exclusive: &StateKey,
    ) -> StreamExecutorResult<()> {
        let mut n_extended = 0usize;
        {
            let sub_range = (
                Bound::Excluded(
                    self.row_conv
                        .state_key_to_table_sub_pk(range_from_exclusive)?,
                ),
                Bound::<OwnedRow>::Unbounded,
            );
            let stream = table
                .iter_with_prefix(
                    self.deduped_part_key,
                    &sub_range,
                    PrefetchOptions::default(),
                )
                .await?;

            #[for_await]
            for row in stream {
                let row: OwnedRow = row?.into_owned_row();

                let key = self.row_conv.row_to_state_key(&row)?;
                self.range_cache.insert(CacheKey::from(key), row);

                n_extended += 1;
                if n_extended == MAGIC_BATCH_SIZE {
                    break;
                }
            }
        }

        if n_extended < MAGIC_BATCH_SIZE && self.cache_real_len() > 0 {
            // we reached the end of this partition in the table
            self.range_cache.remove(&CacheKey::Largest);
        }

        Ok(())
    }
}