risingwave_stream/executor/over_window/
frame_finder.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Helper functions for finding affected ranges from over window range cache for a
16//! given set of changes (delta).
17
18use std::ops::Bound;
19
20use delta_btree_map::{CursorWithDelta, DeltaBTreeMap};
21use itertools::Itertools;
22use risingwave_common::row::OwnedRow;
23use risingwave_common::types::{Datum, Sentinelled, ToDatumRef};
24use risingwave_common::util::memcmp_encoding;
25use risingwave_common::util::sort_util::cmp_datum;
26use risingwave_expr::window_function::{FrameBound, RangeFrameBounds, RowsFrameBounds, StateKey};
27
28use super::over_partition::CacheKey;
29
30// -------------------------- ↓ PUBLIC INTERFACE ↓ --------------------------
31
32/// Merge several `ROWS` frames into one super frame. The returned super frame is
33/// guaranteed to be *canonical*, which means that the `CURRENT ROW` is always
34/// included in the returned frame.
35pub(super) fn merge_rows_frames(rows_frames: &[&RowsFrameBounds]) -> RowsFrameBounds {
36    if rows_frames.is_empty() {
37        // When there's no `ROWS` frame, for simplicity, we don't return `None`. Instead,
38        // we return `ROWS BETWEEN CURRENT ROW AND CURRENT ROW`, which is the implicit
39        // frame for all selected upstream columns.
40        //
41        // For example, the following two queries are equivalent:
42        //
43        // ```sql
44        // SELECT a, b, sum(c) OVER (...) FROM t;
45        // SELECT
46        //   first_value(a) OVER (... ROWS BETWEEN CURRENT ROW AND CURRENT ROW),
47        //   first_value(b) OVER (... ROWS BETWEEN CURRENT ROW AND CURRENT ROW),
48        //   sum(c) OVER (...)
49        // FROM t;
50        // ```
51        //
52        // See https://risingwave.com/blog/risingwave-window-functions-the-art-of-sliding-and-the-aesthetics-of-symmetry/
53        // for more details.
54        return RowsFrameBounds {
55            start: FrameBound::CurrentRow,
56            end: FrameBound::CurrentRow,
57        };
58    }
59
60    let none_as_max_cmp = |x: &Option<usize>, y: &Option<usize>| match (x, y) {
61        // None means unbounded, which should be the largest.
62        (None, None) => std::cmp::Ordering::Equal,
63        (None, Some(_)) => std::cmp::Ordering::Greater,
64        (Some(_), None) => std::cmp::Ordering::Less,
65        (Some(x), Some(y)) => x.cmp(y),
66    };
67
68    // Note that the following two both use `max_by`, unlike when handling `RANGE` frames,
69    // because here for `ROWS` frames, offsets are unsigned. We convert all pure preceding/
70    // following `ROWS` frames into one containing the `CURRENT ROW`.
71    let start = rows_frames
72        .iter()
73        .map(|bounds| bounds.n_preceding_rows())
74        .max_by(none_as_max_cmp)
75        .unwrap();
76    let end = rows_frames
77        .iter()
78        .map(|bounds| bounds.n_following_rows())
79        .max_by(none_as_max_cmp)
80        .unwrap();
81
82    RowsFrameBounds {
83        start: start
84            .map(FrameBound::Preceding) // may produce Preceding(0), but doesn't matter
85            .unwrap_or(FrameBound::UnboundedPreceding),
86        end: end
87            .map(FrameBound::Following) // may produce Following(0), but doesn't matter
88            .unwrap_or(FrameBound::UnboundedFollowing),
89    }
90}
91
92/// For a canonical `ROWS` frame, given a key in delta, find the cache key
93/// corresponding to the CURRENT ROW of the first frame that contains the given
94/// key.
95///
96/// ## Example
97///
98/// - Frame: `ROWS BETWEEN ? AND 2 FOLLOWING`
99/// - Cache: `[1, 2, 5]`
100/// - Delta: `[Delete 5]`
101///
102/// For delta key `5`, this function will return `1` as the *first curr key*.
103///
104/// More examples can be found in the comment inside [`find_curr_for_rows_frame`].
105pub(super) fn find_first_curr_for_rows_frame<'cache>(
106    frame_bounds: &RowsFrameBounds,
107    part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
108    delta_key: &'cache CacheKey,
109) -> &'cache CacheKey {
110    find_curr_for_rows_frame::<true /* LEFT */>(frame_bounds, part_with_delta, delta_key)
111}
112
113/// For a canonical `ROWS` frame, given a key in delta, find the cache key
114/// corresponding to the CURRENT ROW of the last frame that contains the given
115/// key.
116///
117/// This is the symmetric function of [`find_first_curr_for_rows_frame`].
118pub(super) fn find_last_curr_for_rows_frame<'cache>(
119    frame_bounds: &RowsFrameBounds,
120    part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
121    delta_key: &'cache CacheKey,
122) -> &'cache CacheKey {
123    find_curr_for_rows_frame::<false /* RIGHT */>(frame_bounds, part_with_delta, delta_key)
124}
125
126/// For a canonical `ROWS` frame, given a key in `part_with_delta` corresponding
127/// to some CURRENT ROW, find the cache key corresponding to the start row in
128/// that frame.
129pub(super) fn find_frame_start_for_rows_frame<'cache>(
130    frame_bounds: &RowsFrameBounds,
131    part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
132    curr_key: &'cache CacheKey,
133) -> &'cache CacheKey {
134    find_boundary_for_rows_frame::<true /* LEFT */>(frame_bounds, part_with_delta, curr_key)
135}
136
137/// For a canonical `ROWS` frame, given a key in `part_with_delta` corresponding
138/// to some CURRENT ROW, find the cache key corresponding to the end row in that
139/// frame.
140///
141/// This is the symmetric function of [`find_frame_start_for_rows_frame`].
142pub(super) fn find_frame_end_for_rows_frame<'cache>(
143    frame_bounds: &RowsFrameBounds,
144    part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
145    curr_key: &'cache CacheKey,
146) -> &'cache CacheKey {
147    find_boundary_for_rows_frame::<false /* RIGHT */>(frame_bounds, part_with_delta, curr_key)
148}
149
150/// Given the first and last key in delta, calculate the order values of the first
151/// and the last frames logically affected by some `RANGE` frames.
152pub(super) fn calc_logical_curr_for_range_frames(
153    range_frames: &[RangeFrameBounds],
154    delta_first_key: &StateKey,
155    delta_last_key: &StateKey,
156) -> Option<(Sentinelled<Datum>, Sentinelled<Datum>)> {
157    calc_logical_ord_for_range_frames(
158        range_frames,
159        delta_first_key,
160        delta_last_key,
161        |bounds, v| bounds.first_curr_of(v),
162        |bounds, v| bounds.last_curr_of(v),
163    )
164}
165
166/// Given the curr keys of the first and the last affected frames, calculate the order
167/// values of the logical start row of the first frame and the logical end row of the
168/// last frame.
169pub(super) fn calc_logical_boundary_for_range_frames(
170    range_frames: &[RangeFrameBounds],
171    first_curr_key: &StateKey,
172    last_curr_key: &StateKey,
173) -> Option<(Sentinelled<Datum>, Sentinelled<Datum>)> {
174    calc_logical_ord_for_range_frames(
175        range_frames,
176        first_curr_key,
177        last_curr_key,
178        |bounds, v| bounds.frame_start_of(v),
179        |bounds, v| bounds.frame_end_of(v),
180    )
181}
182
183/// Given a left logical order value (e.g. first curr order value, first delta order value),
184/// find the most closed cache key in `part_with_delta`. Ideally this function returns
185/// the smallest key that is larger than or equal to the given logical order (using `lower_bound`).
186pub(super) fn find_left_for_range_frames<'cache>(
187    range_frames: &[RangeFrameBounds],
188    part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
189    logical_order_value: impl ToDatumRef,
190    cache_key_pk_len: usize, // this is dirty but we have no better choice
191) -> &'cache CacheKey {
192    find_for_range_frames::<true /* LEFT */>(
193        range_frames,
194        part_with_delta,
195        logical_order_value,
196        cache_key_pk_len,
197    )
198}
199
200/// Given a right logical order value (e.g. last curr order value, last delta order value),
201/// find the most closed cache key in `part_with_delta`. Ideally this function returns
202/// the largest key that is smaller than or equal to the given logical order (using `lower_bound`).
203pub(super) fn find_right_for_range_frames<'cache>(
204    range_frames: &[RangeFrameBounds],
205    part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
206    logical_order_value: impl ToDatumRef,
207    cache_key_pk_len: usize, // this is dirty but we have no better choice
208) -> &'cache CacheKey {
209    find_for_range_frames::<false /* RIGHT */>(
210        range_frames,
211        part_with_delta,
212        logical_order_value,
213        cache_key_pk_len,
214    )
215}
216
217// -------------------------- ↑ PUBLIC INTERFACE ↑ --------------------------
218
219fn find_curr_for_rows_frame<'cache, const LEFT: bool>(
220    frame_bounds: &RowsFrameBounds,
221    part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
222    delta_key: &'cache CacheKey,
223) -> &'cache CacheKey {
224    debug_assert!(frame_bounds.is_canonical());
225    if LEFT {
226        debug_assert!(
227            !frame_bounds.end.is_unbounded_following(),
228            "no need to call this function whenever any frame end is unbounded"
229        );
230    } else {
231        debug_assert!(
232            !frame_bounds.start.is_unbounded_preceding(),
233            "no need to call this function whenever any frame start is unbounded"
234        );
235    }
236    debug_assert!(
237        part_with_delta.first_key().is_some(),
238        "must have something in the range cache after applying delta"
239    );
240
241    // Let's think about the following algorithm with the following cases in mind.
242    // Insertions are relatively easy, so we only give the deletion examples.
243    // The two directions are symmetrical, we only consider the left direction in
244    // the following description.
245    //
246    // ## Background
247    //
248    // *Sm* means smallest sentinel node in the cache
249    // *SM* means largest sentinel node in the cache
250    //
251    // Before calling this function, all entries within range covered by delta should
252    // have been loaded into the cache.
253    //
254    // ## Cases
255    //
256    // Frame: ROWS BETWEEN ? AND 2 FOLLOWING
257    // Delta: Delete 5
258    // Cache + Delta:
259    //   [1, 2, 6] -> 1
260    //   [1, 6] -> 1
261    //   [6, 7, 8] -> 6, not precise but won't do too much harm
262    //   [1, 2] -> 1
263    //   [1] -> 1
264    //   [Sm, 6] -> Sm
265    //   [Sm, 1] -> Sm
266    //   [1, 2, SM] -> 1
267    //   [1, SM] -> 1
268    //   [Sm, 1, SM] -> Sm
269    //   [Sm, SM] -> Sm
270    //
271    // Frame: ROWS BETWEEN ? AND CURRENT ROW
272    // Delta: Delete 5
273    // Cache + Delta:
274    //   [1, 2, 6] -> 6, not precise but won't do too much harm
275    //   [1, 2] -> 2, not precise but won't do too much harm
276    //   [1, 2, SM] -> SM, not precise but won't do too much harm
277    //   [Sm, SM] -> SM, not precise but won't do too much harm
278    //
279    // Frame: ROWS BETWEEN ? AND 2 PRECEDING
280    // This will be treated as if it's `ROWS BETWEEN ? AND CURRENT ROW`.
281
282    let mut cursor = if LEFT {
283        part_with_delta.lower_bound(Bound::Included(delta_key))
284    } else {
285        part_with_delta.upper_bound(Bound::Included(delta_key))
286    };
287    let pointed_key = |cursor: CursorWithDelta<'cache, CacheKey, OwnedRow>| {
288        if LEFT {
289            cursor.peek_next().map(|(k, _)| k)
290        } else {
291            cursor.peek_prev().map(|(k, _)| k)
292        }
293    };
294
295    let n_rows_to_move = if LEFT {
296        frame_bounds.n_following_rows().unwrap()
297    } else {
298        frame_bounds.n_preceding_rows().unwrap()
299    };
300
301    if n_rows_to_move == 0 {
302        return pointed_key(cursor)
303            .or_else(|| {
304                if LEFT {
305                    part_with_delta.last_key()
306                } else {
307                    part_with_delta.first_key()
308                }
309            })
310            .unwrap();
311    }
312
313    for _ in 0..n_rows_to_move {
314        let res = if LEFT { cursor.prev() } else { cursor.next() };
315        if res.is_none() {
316            // we reach the end
317            break;
318        }
319    }
320
321    // We always have a valid key here, because `part_with_delta` must not be empty,
322    // and `n_rows_to_move` is always larger than 0 when we reach here.
323    pointed_key(cursor).unwrap()
324}
325
326fn find_boundary_for_rows_frame<'cache, const LEFT: bool>(
327    frame_bounds: &RowsFrameBounds,
328    part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
329    curr_key: &'cache CacheKey,
330) -> &'cache CacheKey {
331    debug_assert!(frame_bounds.is_canonical());
332    if LEFT {
333        debug_assert!(
334            !frame_bounds.start.is_unbounded_preceding(),
335            "no need to call this function whenever any frame start is unbounded"
336        );
337    } else {
338        debug_assert!(
339            !frame_bounds.end.is_unbounded_following(),
340            "no need to call this function whenever any frame end is unbounded"
341        );
342    }
343
344    // Now things are easier than in `find_curr_for_rows_frame`, because we already
345    // have `curr_key` which definitely exists in the `part_with_delta`. We just find
346    // the cursor pointing to it and move the cursor to frame boundary.
347
348    let mut cursor = if LEFT {
349        part_with_delta.before(curr_key).unwrap()
350    } else {
351        part_with_delta.after(curr_key).unwrap()
352    };
353    let pointed_key = |cursor: CursorWithDelta<'cache, CacheKey, OwnedRow>| {
354        if LEFT {
355            cursor.peek_next().map(|(k, _)| k)
356        } else {
357            cursor.peek_prev().map(|(k, _)| k)
358        }
359    };
360
361    let n_rows_to_move = if LEFT {
362        frame_bounds.n_preceding_rows().unwrap()
363    } else {
364        frame_bounds.n_following_rows().unwrap()
365    };
366
367    for _ in 0..n_rows_to_move {
368        let res = if LEFT { cursor.prev() } else { cursor.next() };
369        if res.is_none() {
370            // we reach the end
371            break;
372        }
373    }
374
375    // We always have a valid key here, because `cursor` must point to a valid key
376    // at the beginning.
377    pointed_key(cursor).unwrap()
378}
379
380/// Given a pair of left and right state keys, calculate the leftmost (smallest) and rightmost
381/// (largest) order values after the two given `offset_fn`s are applied, for all range frames.
382///
383/// A more vivid description may be: Given a pair of left and right keys, this function pushes
384/// the keys leftward and rightward respectively according to the given `offset_fn`s.
385///
386/// This is not very understandable but we have to extract the code to a function to avoid
387/// repeating. Check [`calc_logical_curr_for_range_frames`] and [`calc_logical_boundary_for_range_frames`]
388/// if you cannot understand the purpose of this function.
389fn calc_logical_ord_for_range_frames(
390    range_frames: &[RangeFrameBounds],
391    left_key: &StateKey,
392    right_key: &StateKey,
393    left_offset_fn: impl Fn(&RangeFrameBounds, &Datum) -> Sentinelled<Datum>,
394    right_offset_fn: impl Fn(&RangeFrameBounds, &Datum) -> Sentinelled<Datum>,
395) -> Option<(Sentinelled<Datum>, Sentinelled<Datum>)> {
396    if range_frames.is_empty() {
397        return None;
398    }
399
400    let (data_type, order_type) = range_frames
401        .iter()
402        .map(|bounds| (&bounds.order_data_type, bounds.order_type))
403        .all_equal_value()
404        .unwrap();
405
406    let datum_cmp = |a: &Datum, b: &Datum| cmp_datum(a, b, order_type);
407
408    let left_given_ord = memcmp_encoding::decode_value(data_type, &left_key.order_key, order_type)
409        .expect("no reason to fail because we just encoded it in memory");
410    let right_given_ord =
411        memcmp_encoding::decode_value(data_type, &right_key.order_key, order_type)
412            .expect("no reason to fail because we just encoded it in memory");
413
414    let logical_left_offset_ord = {
415        let mut order_value = None;
416        for bounds in range_frames {
417            let new_order_value = left_offset_fn(bounds, &left_given_ord);
418            order_value = match (order_value, new_order_value) {
419                (None, any_new) => Some(any_new),
420                (Some(old), new) => Some(std::cmp::min_by(old, new, |x, y| x.cmp_by(y, datum_cmp))),
421            };
422            if !order_value.as_ref().unwrap().is_normal() {
423                // already unbounded
424                assert!(
425                    order_value.as_ref().unwrap().is_smallest(),
426                    "left order value should never be `Largest`"
427                );
428                break;
429            }
430        }
431        order_value.expect("# of range frames > 0")
432    };
433
434    let logical_right_offset_ord = {
435        let mut order_value = None;
436        for bounds in range_frames {
437            let new_order_value = right_offset_fn(bounds, &right_given_ord);
438            order_value = match (order_value, new_order_value) {
439                (None, any_new) => Some(any_new),
440                (Some(old), new) => Some(std::cmp::max_by(old, new, |x, y| x.cmp_by(y, datum_cmp))),
441            };
442            if !order_value.as_ref().unwrap().is_normal() {
443                // already unbounded
444                assert!(
445                    order_value.as_ref().unwrap().is_largest(),
446                    "right order value should never be `Smallest`"
447                );
448                break;
449            }
450        }
451        order_value.expect("# of range frames > 0")
452    };
453
454    Some((logical_left_offset_ord, logical_right_offset_ord))
455}
456
457fn find_for_range_frames<'cache, const LEFT: bool>(
458    range_frames: &[RangeFrameBounds],
459    part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
460    logical_order_value: impl ToDatumRef,
461    cache_key_pk_len: usize,
462) -> &'cache CacheKey {
463    debug_assert!(
464        part_with_delta.first_key().is_some(),
465        "must have something in the range cache after applying delta"
466    );
467
468    let order_type = range_frames
469        .iter()
470        .map(|bounds| bounds.order_type)
471        .all_equal_value()
472        .unwrap();
473
474    let search_key = Sentinelled::Normal(StateKey {
475        order_key: memcmp_encoding::encode_value(logical_order_value, order_type)
476            .expect("the data type is simple, should succeed"),
477        pk: if LEFT {
478            OwnedRow::empty() // empty row is minimal
479        } else {
480            OwnedRow::new(vec![None; cache_key_pk_len]) // all-NULL row is maximal in default order
481        }
482        .into(),
483    });
484
485    if LEFT {
486        let cursor = part_with_delta.lower_bound(Bound::Included(&search_key));
487        if let Some((prev_key, _)) = cursor.peek_prev()
488            && prev_key.is_smallest()
489        {
490            // If the found lower bound of search key is right behind a smallest sentinel,
491            // we don't know if there's any other rows with the same order key in the state
492            // table but not in cache. We should conservatively return the sentinel key as
493            // the curr key.
494            prev_key
495        } else {
496            // If there's nothing on the left, it simply means that the search key is larger
497            // than any existing key. Returning the last key in this case does no harm. Especially,
498            // if the last key is largest sentinel, the caller should extend the cache rightward
499            // to get possible entries with the same order value into the cache.
500            cursor
501                .peek_next()
502                .map(|(k, _)| k)
503                .or_else(|| part_with_delta.last_key())
504                .unwrap()
505        }
506    } else {
507        let cursor = part_with_delta.upper_bound(Bound::Included(&search_key));
508        if let Some((next_key, _)) = cursor.peek_next()
509            && next_key.is_largest()
510        {
511            next_key
512        } else {
513            cursor
514                .peek_prev()
515                .map(|(k, _)| k)
516                .or_else(|| part_with_delta.first_key())
517                .unwrap()
518        }
519    }
520}
521
522#[cfg(test)]
523mod tests {
524    use std::collections::BTreeMap;
525
526    use delta_btree_map::Change;
527    use risingwave_common::types::{ScalarImpl, Sentinelled};
528    use risingwave_expr::window_function::FrameBound::*;
529    use risingwave_expr::window_function::{RowsFrameBounds, StateKey};
530
531    use super::*;
532
533    #[test]
534    fn test_merge_rows_frame() {
535        fn assert_equivalent(bounds1: RowsFrameBounds, bounds2: RowsFrameBounds) {
536            assert_eq!(bounds1.start.to_offset(), bounds2.start.to_offset());
537            assert_eq!(bounds1.end.to_offset(), bounds2.end.to_offset());
538        }
539
540        assert_equivalent(
541            merge_rows_frames(&[]),
542            RowsFrameBounds {
543                start: CurrentRow,
544                end: CurrentRow,
545            },
546        );
547
548        let frames = [
549            &RowsFrameBounds {
550                start: Preceding(3),
551                end: Preceding(2),
552            },
553            &RowsFrameBounds {
554                start: Preceding(1),
555                end: Preceding(4),
556            },
557        ];
558        assert_equivalent(
559            merge_rows_frames(&frames),
560            RowsFrameBounds {
561                start: Preceding(4),
562                end: CurrentRow,
563            },
564        );
565
566        let frames = [
567            &RowsFrameBounds {
568                start: Preceding(3),
569                end: Following(2),
570            },
571            &RowsFrameBounds {
572                start: Preceding(2),
573                end: Following(3),
574            },
575        ];
576        assert_equivalent(
577            merge_rows_frames(&frames),
578            RowsFrameBounds {
579                start: Preceding(3),
580                end: Following(3),
581            },
582        );
583
584        let frames = [
585            &RowsFrameBounds {
586                start: UnboundedPreceding,
587                end: Following(2),
588            },
589            &RowsFrameBounds {
590                start: Preceding(2),
591                end: UnboundedFollowing,
592            },
593        ];
594        assert_equivalent(
595            merge_rows_frames(&frames),
596            RowsFrameBounds {
597                start: UnboundedPreceding,
598                end: UnboundedFollowing,
599            },
600        );
601
602        let frames = [
603            &RowsFrameBounds {
604                start: UnboundedPreceding,
605                end: Following(2),
606            },
607            &RowsFrameBounds {
608                start: Following(5),
609                end: Following(2),
610            },
611        ];
612        assert_equivalent(
613            merge_rows_frames(&frames),
614            RowsFrameBounds {
615                start: UnboundedPreceding,
616                end: Following(5),
617            },
618        );
619    }
620
621    macro_rules! create_cache {
622        (..., $( $pk:literal ),* , ...) => {
623            {
624                let mut cache = create_cache!( $( $pk ),* );
625                cache.insert(CacheKey::Smallest, OwnedRow::empty().into());
626                cache.insert(CacheKey::Largest, OwnedRow::empty().into());
627                cache
628            }
629        };
630        (..., $( $pk:literal ),*) => {
631            {
632                let mut cache = create_cache!( $( $pk ),* );
633                cache.insert(CacheKey::Smallest, OwnedRow::empty().into());
634                cache
635            }
636        };
637        ($( $pk:literal ),* , ...) => {
638            {
639                let mut cache = create_cache!( $( $pk ),* );
640                cache.insert(CacheKey::Largest, OwnedRow::empty().into());
641                cache
642            }
643        };
644        ($( $pk:literal ),*) => {
645            {
646                #[allow(unused_mut)]
647                let mut cache = BTreeMap::new();
648                $(
649                    cache.insert(
650                        CacheKey::Normal(
651                            StateKey {
652                                // order key doesn't matter here
653                                order_key: vec![].into(),
654                                pk: OwnedRow::new(vec![Some(ScalarImpl::from($pk))]).into(),
655                            },
656                        ),
657                        // value row doesn't matter here
658                        OwnedRow::empty(),
659                    );
660                )*
661                cache
662            }
663        };
664        ($ord_type:expr, [..., $( ( $ord:literal, $pk:literal ) ),* , ...]) => {
665            {
666                let mut cache = create_cache!($ord_type, [$( ( $ord, $pk ) ),*]);
667                cache.insert(CacheKey::Smallest, OwnedRow::empty().into());
668                cache.insert(CacheKey::Largest, OwnedRow::empty().into());
669                cache
670            }
671        };
672        ($ord_type:expr, [..., $( ( $ord:literal, $pk:literal ) ),*]) => {
673            {
674                let mut cache = create_cache!($ord_type, [$( ( $ord, $pk ) ),*]);
675                cache.insert(CacheKey::Smallest, OwnedRow::empty().into());
676                cache
677            }
678        };
679        ($ord_type:expr, [$( ( $ord:literal, $pk:literal ) ),* , ...]) => {
680            {
681                let mut cache = create_cache!($ord_type, [$( ( $ord, $pk ) ),*]);
682                cache.insert(CacheKey::Largest, OwnedRow::empty().into());
683                cache
684            }
685        };
686        ($ord_type:expr, [$( ( $ord:literal, $pk:literal ) ),*]) => {
687            {
688                #[allow(unused_mut)]
689                let mut cache = BTreeMap::new();
690                $(
691                    cache.insert(
692                        CacheKey::Normal(
693                            StateKey {
694                                order_key: memcmp_encoding::encode_value(
695                                    Some(ScalarImpl::from($ord)),
696                                    $ord_type,
697                                ).unwrap(),
698                                pk: OwnedRow::new(vec![Some(ScalarImpl::from($pk))]).into(),
699                            },
700                        ),
701                        // value row doesn't matter here
702                        OwnedRow::empty(),
703                    );
704                )*
705                cache
706            }
707        }
708    }
709
710    macro_rules! create_change {
711        (Delete) => {
712            Change::Delete
713        };
714        (Insert) => {
715            Change::Insert(OwnedRow::empty())
716        };
717    }
718
719    macro_rules! create_delta {
720        ($( ( $pk:literal, $change:ident ) ),+ $(,)?) => {
721            {
722                let mut delta = BTreeMap::new();
723                $(
724                    delta.insert(
725                        CacheKey::Normal(
726                            StateKey {
727                                // order key doesn't matter here
728                                order_key: vec![].into(),
729                                pk: OwnedRow::new(vec![Some(ScalarImpl::from($pk))]).into(),
730                            },
731                        ),
732                        // value row doesn't matter here
733                        create_change!( $change ),
734                    );
735                )*
736                delta
737            }
738        };
739        ($ord_type:expr, [ $( ( $ord:literal, $pk:literal, $change:ident ) ),+ $(,)? ]) => {
740            {
741                let mut delta = BTreeMap::new();
742                $(
743                    delta.insert(
744                        CacheKey::Normal(
745                            StateKey {
746                                order_key: memcmp_encoding::encode_value(
747                                    Some(ScalarImpl::from($ord)),
748                                    $ord_type,
749                                ).unwrap(),
750                                pk: OwnedRow::new(vec![Some(ScalarImpl::from($pk))]).into(),
751                            },
752                        ),
753                        // value row doesn't matter here
754                        create_change!( $change ),
755                    );
756                )*
757                delta
758            }
759        };
760    }
761
762    mod rows_frame_tests {
763        use super::*;
764
765        fn assert_cache_key_eq(given: &CacheKey, expected: impl Into<ScalarImpl>) {
766            assert_eq!(
767                given.as_normal_expect().pk.0,
768                OwnedRow::new(vec![Some(expected.into())])
769            )
770        }
771
772        #[test]
773        fn test_insert_delta_only() {
774            let cache = create_cache!();
775            let delta = create_delta!((1, Insert), (2, Insert), (3, Insert));
776            let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
777            let delta_first_key = delta.first_key_value().unwrap().0;
778            let delta_last_key = delta.last_key_value().unwrap().0;
779
780            let bounds = RowsFrameBounds {
781                start: Preceding(2),
782                end: CurrentRow,
783            };
784
785            let first_curr_key =
786                find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
787            let last_curr_key =
788                find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
789            assert_cache_key_eq(first_curr_key, 1);
790            assert_cache_key_eq(last_curr_key, 3);
791
792            let first_frame_start =
793                find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
794            let last_frame_end =
795                find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
796            assert_cache_key_eq(first_frame_start, 1);
797            assert_cache_key_eq(last_frame_end, 3);
798        }
799
800        #[test]
801        fn test_simple() {
802            let cache = create_cache!(1, 2, 3, 4, 5, 6);
803            let delta = create_delta!((2, Insert), (3, Delete));
804            let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
805            let delta_first_key = delta.first_key_value().unwrap().0;
806            let delta_last_key = delta.last_key_value().unwrap().0;
807
808            {
809                let bounds = RowsFrameBounds {
810                    start: Preceding(2),
811                    end: CurrentRow,
812                };
813                let first_curr_key =
814                    find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
815                let last_curr_key =
816                    find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
817                assert_cache_key_eq(first_curr_key, 2);
818                assert_cache_key_eq(last_curr_key, 5);
819
820                let first_frame_start =
821                    find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
822                let last_frame_end =
823                    find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
824                assert_cache_key_eq(first_frame_start, 1);
825                assert_cache_key_eq(last_frame_end, 5);
826            }
827
828            {
829                let bounds = RowsFrameBounds {
830                    start: Preceding(1),
831                    end: Following(2),
832                };
833                let first_curr_key =
834                    find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
835                let last_curr_key =
836                    find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
837                assert_cache_key_eq(first_curr_key, 1);
838                assert_cache_key_eq(last_curr_key, 4);
839
840                let first_frame_start =
841                    find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
842                let last_frame_end =
843                    find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
844                assert_cache_key_eq(first_frame_start, 1);
845                assert_cache_key_eq(last_frame_end, 6);
846            }
847
848            {
849                let bounds = RowsFrameBounds {
850                    start: CurrentRow,
851                    end: Following(2),
852                };
853                let first_curr_key =
854                    find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
855                let last_curr_key =
856                    find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
857                assert_cache_key_eq(first_curr_key, 1);
858                assert_cache_key_eq(last_curr_key, 2);
859
860                let first_frame_start =
861                    find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
862                let last_frame_end =
863                    find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
864                assert_cache_key_eq(first_frame_start, 1);
865                assert_cache_key_eq(last_frame_end, 5);
866            }
867        }
868
869        #[test]
870        fn test_lag_corner_case() {
871            let cache = create_cache!(1, 2, 3, 4, 5, 6);
872            let delta = create_delta!((1, Delete), (2, Delete), (3, Delete));
873            let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
874            let delta_first_key = delta.first_key_value().unwrap().0;
875            let delta_last_key = delta.last_key_value().unwrap().0;
876
877            let bounds = RowsFrameBounds {
878                start: Preceding(1),
879                end: CurrentRow,
880            };
881
882            let first_curr_key =
883                find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
884            let last_curr_key =
885                find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
886            assert_cache_key_eq(first_curr_key, 4);
887            assert_cache_key_eq(last_curr_key, 4);
888
889            let first_frame_start =
890                find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
891            let last_frame_end =
892                find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
893            assert_cache_key_eq(first_frame_start, 4);
894            assert_cache_key_eq(last_frame_end, 4);
895        }
896
897        #[test]
898        fn test_lead_corner_case() {
899            let cache = create_cache!(1, 2, 3, 4, 5, 6);
900            let delta = create_delta!((4, Delete), (5, Delete), (6, Delete));
901            let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
902            let delta_first_key = delta.first_key_value().unwrap().0;
903            let delta_last_key = delta.last_key_value().unwrap().0;
904
905            let bounds = RowsFrameBounds {
906                start: CurrentRow,
907                end: Following(1),
908            };
909
910            let first_curr_key =
911                find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
912            let last_curr_key =
913                find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
914            assert_cache_key_eq(first_curr_key, 3);
915            assert_cache_key_eq(last_curr_key, 3);
916
917            let first_frame_start =
918                find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
919            let last_frame_end =
920                find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
921            assert_cache_key_eq(first_frame_start, 3);
922            assert_cache_key_eq(last_frame_end, 3);
923        }
924
925        #[test]
926        fn test_lag_lead_offset_0_corner_case_1() {
927            let cache = create_cache!(1, 2, 3, 4);
928            let delta = create_delta!((2, Delete), (3, Delete));
929            let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
930            let delta_first_key = delta.first_key_value().unwrap().0;
931            let delta_last_key = delta.last_key_value().unwrap().0;
932
933            let bounds = RowsFrameBounds {
934                start: CurrentRow,
935                end: CurrentRow,
936            };
937
938            let first_curr_key =
939                find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
940            let last_curr_key =
941                find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
942            assert_cache_key_eq(first_curr_key, 4);
943            assert_cache_key_eq(last_curr_key, 1);
944
945            // first_curr_key > last_curr_key, should not continue to find frame start/end
946        }
947
948        #[test]
949        fn test_lag_lead_offset_0_corner_case_2() {
950            // Note this case is false-positive, but it does very little harm.
951
952            let cache = create_cache!(1, 2, 3, 4);
953            let delta = create_delta!((2, Delete), (3, Delete), (4, Delete));
954            let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
955            let delta_first_key = delta.first_key_value().unwrap().0;
956            let delta_last_key = delta.last_key_value().unwrap().0;
957
958            let bounds = RowsFrameBounds {
959                start: CurrentRow,
960                end: CurrentRow,
961            };
962
963            let first_curr_key =
964                find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
965            let last_curr_key =
966                find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
967            assert_cache_key_eq(first_curr_key, 1);
968            assert_cache_key_eq(last_curr_key, 1);
969
970            let first_frame_start =
971                find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
972            let last_frame_end =
973                find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
974            assert_cache_key_eq(first_frame_start, 1);
975            assert_cache_key_eq(last_frame_end, 1);
976        }
977
978        #[test]
979        fn test_lag_lead_offset_0_corner_case_3() {
980            let cache = create_cache!(1, 2, 3, 4, 5);
981            let delta = create_delta!((2, Delete), (3, Insert), (4, Delete));
982            let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
983            let delta_first_key = delta.first_key_value().unwrap().0;
984            let delta_last_key = delta.last_key_value().unwrap().0;
985
986            let bounds = RowsFrameBounds {
987                start: CurrentRow,
988                end: CurrentRow,
989            };
990
991            let first_curr_key =
992                find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
993            let last_curr_key =
994                find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
995            assert_cache_key_eq(first_curr_key, 3);
996            assert_cache_key_eq(last_curr_key, 3);
997
998            let first_frame_start =
999                find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
1000            let last_frame_end =
1001                find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
1002            assert_cache_key_eq(first_frame_start, 3);
1003            assert_cache_key_eq(last_frame_end, 3);
1004        }
1005
1006        #[test]
1007        fn test_empty_with_sentinels() {
1008            let cache: BTreeMap<Sentinelled<StateKey>, OwnedRow> = create_cache!(..., , ...);
1009            let delta = create_delta!((1, Insert), (2, Insert));
1010            let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
1011            let delta_first_key = delta.first_key_value().unwrap().0;
1012            let delta_last_key = delta.last_key_value().unwrap().0;
1013
1014            {
1015                let bounds = RowsFrameBounds {
1016                    start: CurrentRow,
1017                    end: CurrentRow,
1018                };
1019                let first_curr_key =
1020                    find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
1021                let last_curr_key =
1022                    find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
1023                assert_cache_key_eq(first_curr_key, 1);
1024                assert_cache_key_eq(last_curr_key, 2);
1025
1026                let first_frame_start =
1027                    find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
1028                let last_frame_end =
1029                    find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
1030                assert_cache_key_eq(first_frame_start, 1);
1031                assert_cache_key_eq(last_frame_end, 2);
1032            }
1033
1034            {
1035                let bounds = RowsFrameBounds {
1036                    start: Preceding(1),
1037                    end: CurrentRow,
1038                };
1039                let first_curr_key =
1040                    find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
1041                let last_curr_key =
1042                    find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
1043                assert_cache_key_eq(first_curr_key, 1);
1044                assert!(last_curr_key.is_largest());
1045
1046                // reaches sentinel, should not continue to find frame start/end
1047            }
1048
1049            {
1050                let bounds = RowsFrameBounds {
1051                    start: CurrentRow,
1052                    end: Following(3),
1053                };
1054
1055                let first_curr_key =
1056                    find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
1057                let last_curr_key =
1058                    find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
1059                assert!(first_curr_key.is_smallest());
1060                assert_cache_key_eq(last_curr_key, 2);
1061
1062                // reaches sentinel, should not continue to find frame start/end
1063            }
1064        }
1065
1066        #[test]
1067        fn test_with_left_sentinel() {
1068            let cache = create_cache!(..., 2, 4, 5, 8);
1069            let delta = create_delta!((3, Insert), (4, Insert), (8, Delete));
1070            let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
1071            let delta_first_key = delta.first_key_value().unwrap().0;
1072            let delta_last_key = delta.last_key_value().unwrap().0;
1073
1074            {
1075                let bounds = RowsFrameBounds {
1076                    start: CurrentRow,
1077                    end: Following(1),
1078                };
1079                let first_curr_key =
1080                    find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
1081                let last_curr_key =
1082                    find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
1083                assert_cache_key_eq(first_curr_key, 2);
1084                assert_cache_key_eq(last_curr_key, 5);
1085
1086                let first_frame_start =
1087                    find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
1088                let last_frame_end =
1089                    find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
1090                assert_cache_key_eq(first_frame_start, 2);
1091                assert_cache_key_eq(last_frame_end, 5);
1092            }
1093
1094            {
1095                let bounds = RowsFrameBounds {
1096                    start: Preceding(1),
1097                    end: Following(1),
1098                };
1099                let first_curr_key =
1100                    find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
1101                let last_curr_key =
1102                    find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
1103                assert_cache_key_eq(first_curr_key, 2);
1104                assert_cache_key_eq(last_curr_key, 5);
1105
1106                let first_frame_start =
1107                    find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
1108                let last_frame_end =
1109                    find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
1110                assert!(first_frame_start.is_smallest());
1111                assert_cache_key_eq(last_frame_end, 5);
1112            }
1113        }
1114
1115        #[test]
1116        fn test_with_right_sentinel() {
1117            let cache = create_cache!(1, 2, 4, 5, 8, ...);
1118            let delta = create_delta!((3, Insert), (4, Insert), (5, Delete));
1119            let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
1120            let delta_first_key = delta.first_key_value().unwrap().0;
1121            let delta_last_key = delta.last_key_value().unwrap().0;
1122
1123            {
1124                let bounds = RowsFrameBounds {
1125                    start: Preceding(1),
1126                    end: CurrentRow,
1127                };
1128                let first_curr_key =
1129                    find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
1130                let last_curr_key =
1131                    find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
1132                assert_cache_key_eq(first_curr_key, 3);
1133                assert_cache_key_eq(last_curr_key, 8);
1134
1135                let first_frame_start =
1136                    find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
1137                let last_frame_end =
1138                    find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
1139                assert_cache_key_eq(first_frame_start, 2);
1140                assert_cache_key_eq(last_frame_end, 8);
1141            }
1142
1143            {
1144                let bounds = RowsFrameBounds {
1145                    start: Preceding(1),
1146                    end: Following(1),
1147                };
1148                let first_curr_key =
1149                    find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
1150                let last_curr_key =
1151                    find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
1152                assert_cache_key_eq(first_curr_key, 2);
1153                assert_cache_key_eq(last_curr_key, 8);
1154
1155                let first_frame_start =
1156                    find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
1157                let last_frame_end =
1158                    find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
1159                assert_cache_key_eq(first_frame_start, 1);
1160                assert!(last_frame_end.is_largest());
1161            }
1162        }
1163    }
1164
1165    mod range_frame_tests {
1166        use risingwave_common::types::{DataType, Interval, data_types};
1167        use risingwave_common::util::sort_util::OrderType;
1168        use risingwave_expr::window_function::RangeFrameOffset;
1169
1170        use super::*;
1171
1172        fn create_range_frame<T>(
1173            order_data_type: DataType,
1174            order_type: OrderType,
1175            start: FrameBound<T>,
1176            end: FrameBound<T>,
1177        ) -> RangeFrameBounds
1178        where
1179            T: Into<ScalarImpl>,
1180        {
1181            let offset_data_type = match &order_data_type {
1182                t @ data_types::range_frame_numeric!() => t.clone(),
1183                data_types::range_frame_datetime!() => DataType::Interval,
1184                _ => unreachable!(),
1185            };
1186
1187            let map_fn = |x: T| {
1188                RangeFrameOffset::new_for_test(x.into(), &order_data_type, &offset_data_type)
1189            };
1190            let start = start.map(map_fn);
1191            let end = end.map(map_fn);
1192
1193            RangeFrameBounds {
1194                order_data_type,
1195                order_type,
1196                offset_data_type,
1197                start,
1198                end,
1199            }
1200        }
1201
1202        #[test]
1203        fn test_calc_logical_for_int64_asc() {
1204            let order_data_type = DataType::Int64;
1205            let order_type = OrderType::ascending();
1206
1207            let range_frames = [
1208                create_range_frame(
1209                    order_data_type.clone(),
1210                    order_type,
1211                    Preceding(3i64),
1212                    Preceding(2i64),
1213                ),
1214                create_range_frame(
1215                    order_data_type.clone(),
1216                    order_type,
1217                    Preceding(1i64),
1218                    Following(2i64),
1219                ),
1220            ];
1221
1222            let ord_key_1 = StateKey {
1223                order_key: memcmp_encoding::encode_value(Some(ScalarImpl::Int64(1)), order_type)
1224                    .unwrap(),
1225                pk: OwnedRow::empty().into(),
1226            };
1227            let ord_key_2 = StateKey {
1228                order_key: memcmp_encoding::encode_value(Some(ScalarImpl::Int64(3)), order_type)
1229                    .unwrap(),
1230                pk: OwnedRow::empty().into(),
1231            };
1232
1233            let (logical_first_curr, logical_last_curr) =
1234                calc_logical_curr_for_range_frames(&range_frames, &ord_key_1, &ord_key_2).unwrap();
1235            assert_eq!(
1236                logical_first_curr.as_normal_expect(),
1237                &Some(ScalarImpl::Int64(-1))
1238            );
1239            assert_eq!(
1240                logical_last_curr.as_normal_expect(),
1241                &Some(ScalarImpl::Int64(6))
1242            );
1243
1244            let (first_start, last_end) =
1245                calc_logical_boundary_for_range_frames(&range_frames, &ord_key_1, &ord_key_2)
1246                    .unwrap();
1247            assert_eq!(first_start.as_normal_expect(), &Some(ScalarImpl::Int64(-2)));
1248            assert_eq!(last_end.as_normal_expect(), &Some(ScalarImpl::Int64(5)));
1249        }
1250
1251        #[test]
1252        fn test_calc_logical_for_timestamp_desc_nulls_first() {
1253            let order_data_type = DataType::Timestamp;
1254            let order_type = OrderType::descending_nulls_first();
1255
1256            let range_frames = [create_range_frame(
1257                order_data_type.clone(),
1258                order_type,
1259                Preceding(Interval::from_month_day_usec(1, 2, 3 * 1000 * 1000)),
1260                Following(Interval::from_month_day_usec(0, 1, 0)),
1261            )];
1262
1263            let ord_key_1 = StateKey {
1264                order_key: memcmp_encoding::encode_value(
1265                    Some(ScalarImpl::Timestamp(
1266                        "2024-01-28 00:30:00".parse().unwrap(),
1267                    )),
1268                    order_type,
1269                )
1270                .unwrap(),
1271                pk: OwnedRow::empty().into(),
1272            };
1273            let ord_key_2 = StateKey {
1274                order_key: memcmp_encoding::encode_value(
1275                    Some(ScalarImpl::Timestamp(
1276                        "2024-01-26 15:47:00".parse().unwrap(),
1277                    )),
1278                    order_type,
1279                )
1280                .unwrap(),
1281                pk: OwnedRow::empty().into(),
1282            };
1283
1284            let (logical_first_curr, logical_last_curr) =
1285                calc_logical_curr_for_range_frames(&range_frames, &ord_key_1, &ord_key_2).unwrap();
1286            assert_eq!(
1287                logical_first_curr.as_normal_expect(),
1288                &Some(ScalarImpl::Timestamp(
1289                    "2024-01-29 00:30:00".parse().unwrap()
1290                ))
1291            );
1292            assert_eq!(
1293                logical_last_curr.as_normal_expect(),
1294                &Some(ScalarImpl::Timestamp(
1295                    "2023-12-24 15:46:57".parse().unwrap()
1296                ))
1297            );
1298
1299            let (first_start, last_end) =
1300                calc_logical_boundary_for_range_frames(&range_frames, &ord_key_1, &ord_key_2)
1301                    .unwrap();
1302            assert_eq!(
1303                first_start.as_normal_expect(),
1304                &Some(ScalarImpl::Timestamp(
1305                    "2024-03-01 00:30:03".parse().unwrap()
1306                ))
1307            );
1308            assert_eq!(
1309                last_end.as_normal_expect(),
1310                &Some(ScalarImpl::Timestamp(
1311                    "2024-01-25 15:47:00".parse().unwrap()
1312                ))
1313            );
1314        }
1315
1316        fn assert_find_left_right_result_eq(
1317            order_data_type: DataType,
1318            order_type: OrderType,
1319            part_with_delta: DeltaBTreeMap<'_, CacheKey, OwnedRow>,
1320            logical_order_value: ScalarImpl,
1321            expected_left: Sentinelled<ScalarImpl>,
1322            expected_right: Sentinelled<ScalarImpl>,
1323        ) {
1324            let range_frames = if matches!(order_data_type, DataType::Int32) {
1325                [create_range_frame(
1326                    order_data_type.clone(),
1327                    order_type,
1328                    Preceding(0), // this doesn't matter here
1329                    Following(0), // this doesn't matter here
1330                )]
1331            } else {
1332                panic!()
1333            };
1334            let logical_order_value = Some(logical_order_value);
1335            let cache_key_pk_len = 1;
1336
1337            let find_left_res = find_left_for_range_frames(
1338                &range_frames,
1339                part_with_delta,
1340                &logical_order_value,
1341                cache_key_pk_len,
1342            )
1343            .clone();
1344            assert_eq!(
1345                find_left_res.map(|x| x.pk.0.into_iter().next().unwrap().unwrap()),
1346                expected_left
1347            );
1348
1349            let find_right_res = find_right_for_range_frames(
1350                &range_frames,
1351                part_with_delta,
1352                &logical_order_value,
1353                cache_key_pk_len,
1354            )
1355            .clone();
1356            assert_eq!(
1357                find_right_res.map(|x| x.pk.0.into_iter().next().unwrap().unwrap()),
1358                expected_right
1359            );
1360        }
1361
1362        #[test]
1363        fn test_insert_delta_only() {
1364            let order_data_type = DataType::Int32;
1365            let order_type = OrderType::ascending();
1366
1367            let cache = create_cache!();
1368            let delta = create_delta!(
1369                order_type,
1370                [
1371                    (1, 1, Insert),
1372                    (1, 11, Insert),
1373                    (3, 3, Insert),
1374                    (5, 5, Insert)
1375                ]
1376            );
1377            let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
1378
1379            assert_find_left_right_result_eq(
1380                order_data_type.clone(),
1381                order_type,
1382                part_with_delta,
1383                ScalarImpl::from(2),
1384                ScalarImpl::from(3).into(),
1385                ScalarImpl::from(11).into(),
1386            );
1387
1388            assert_find_left_right_result_eq(
1389                order_data_type.clone(),
1390                order_type,
1391                part_with_delta,
1392                ScalarImpl::from(5),
1393                ScalarImpl::from(5).into(),
1394                ScalarImpl::from(5).into(),
1395            );
1396
1397            assert_find_left_right_result_eq(
1398                order_data_type.clone(),
1399                order_type,
1400                part_with_delta,
1401                ScalarImpl::from(6),
1402                ScalarImpl::from(5).into(),
1403                ScalarImpl::from(5).into(),
1404            );
1405        }
1406
1407        #[test]
1408        fn test_simple() {
1409            let order_data_type = DataType::Int32;
1410            let order_type = OrderType::ascending();
1411
1412            let cache = create_cache!(order_type, [(2, 2), (3, 3), (4, 4), (5, 5), (6, 6)]);
1413            let delta = create_delta!(
1414                order_type,
1415                [(2, 2, Insert), (2, 22, Insert), (3, 3, Delete)]
1416            );
1417            let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
1418
1419            assert_find_left_right_result_eq(
1420                order_data_type.clone(),
1421                order_type,
1422                part_with_delta,
1423                ScalarImpl::from(0),
1424                ScalarImpl::from(2).into(),
1425                ScalarImpl::from(2).into(),
1426            );
1427
1428            assert_find_left_right_result_eq(
1429                order_data_type.clone(),
1430                order_type,
1431                part_with_delta,
1432                ScalarImpl::from(2),
1433                ScalarImpl::from(2).into(),
1434                ScalarImpl::from(22).into(),
1435            );
1436
1437            assert_find_left_right_result_eq(
1438                order_data_type.clone(),
1439                order_type,
1440                part_with_delta,
1441                ScalarImpl::from(3),
1442                ScalarImpl::from(4).into(),
1443                ScalarImpl::from(22).into(),
1444            );
1445        }
1446
1447        #[test]
1448        fn test_empty_with_sentinels() {
1449            let order_data_type = DataType::Int32;
1450            let order_type = OrderType::ascending();
1451
1452            let cache = create_cache!(order_type, [..., , ...]);
1453            let delta = create_delta!(order_type, [(1, 1, Insert), (2, 2, Insert)]);
1454            let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
1455
1456            assert_find_left_right_result_eq(
1457                order_data_type.clone(),
1458                order_type,
1459                part_with_delta,
1460                ScalarImpl::from(0),
1461                Sentinelled::Smallest,
1462                Sentinelled::Smallest,
1463            );
1464
1465            assert_find_left_right_result_eq(
1466                order_data_type.clone(),
1467                order_type,
1468                part_with_delta,
1469                ScalarImpl::from(1),
1470                Sentinelled::Smallest,
1471                ScalarImpl::from(1).into(),
1472            );
1473
1474            assert_find_left_right_result_eq(
1475                order_data_type.clone(),
1476                order_type,
1477                part_with_delta,
1478                ScalarImpl::from(2),
1479                ScalarImpl::from(2).into(),
1480                Sentinelled::Largest,
1481            );
1482
1483            assert_find_left_right_result_eq(
1484                order_data_type.clone(),
1485                order_type,
1486                part_with_delta,
1487                ScalarImpl::from(3),
1488                Sentinelled::Largest,
1489                Sentinelled::Largest,
1490            );
1491        }
1492
1493        #[test]
1494        fn test_with_left_sentinels() {
1495            let order_data_type = DataType::Int32;
1496            let order_type = OrderType::ascending();
1497
1498            let cache = create_cache!(order_type, [..., (2, 2), (4, 4), (5, 5)]);
1499            let delta = create_delta!(
1500                order_type,
1501                [
1502                    (1, 1, Insert),
1503                    (2, 2, Insert),
1504                    (4, 44, Insert),
1505                    (5, 5, Delete)
1506                ]
1507            );
1508            let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
1509
1510            assert_find_left_right_result_eq(
1511                order_data_type.clone(),
1512                order_type,
1513                part_with_delta,
1514                ScalarImpl::from(1),
1515                Sentinelled::Smallest,
1516                ScalarImpl::from(1).into(),
1517            );
1518
1519            assert_find_left_right_result_eq(
1520                order_data_type.clone(),
1521                order_type,
1522                part_with_delta,
1523                ScalarImpl::from(4),
1524                ScalarImpl::from(4).into(),
1525                ScalarImpl::from(44).into(),
1526            );
1527
1528            assert_find_left_right_result_eq(
1529                order_data_type.clone(),
1530                order_type,
1531                part_with_delta,
1532                ScalarImpl::from(5),
1533                ScalarImpl::from(44).into(),
1534                ScalarImpl::from(44).into(),
1535            );
1536        }
1537
1538        #[test]
1539        fn test_with_right_sentinel() {
1540            let order_data_type = DataType::Int32;
1541            let order_type = OrderType::ascending();
1542
1543            let cache = create_cache!(order_type, [(2, 2), (4, 4), (5, 5), ...]);
1544            let delta = create_delta!(
1545                order_type,
1546                [
1547                    (1, 1, Insert),
1548                    (2, 2, Insert),
1549                    (4, 44, Insert),
1550                    (5, 5, Delete)
1551                ]
1552            );
1553            let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
1554
1555            assert_find_left_right_result_eq(
1556                order_data_type.clone(),
1557                order_type,
1558                part_with_delta,
1559                ScalarImpl::from(1),
1560                ScalarImpl::from(1).into(),
1561                ScalarImpl::from(1).into(),
1562            );
1563
1564            assert_find_left_right_result_eq(
1565                order_data_type.clone(),
1566                order_type,
1567                part_with_delta,
1568                ScalarImpl::from(4),
1569                ScalarImpl::from(4).into(),
1570                Sentinelled::Largest,
1571            );
1572
1573            assert_find_left_right_result_eq(
1574                order_data_type.clone(),
1575                order_type,
1576                part_with_delta,
1577                ScalarImpl::from(5),
1578                Sentinelled::Largest,
1579                Sentinelled::Largest,
1580            );
1581        }
1582    }
1583}