1use std::ops::Bound;
19
20use delta_btree_map::{CursorWithDelta, DeltaBTreeMap};
21use itertools::Itertools;
22use risingwave_common::row::OwnedRow;
23use risingwave_common::types::{Datum, Sentinelled, ToDatumRef};
24use risingwave_common::util::memcmp_encoding;
25use risingwave_common::util::sort_util::cmp_datum;
26use risingwave_expr::window_function::{FrameBound, RangeFrameBounds, RowsFrameBounds, StateKey};
27
28use super::over_partition::CacheKey;
29
30pub(super) fn merge_rows_frames(rows_frames: &[&RowsFrameBounds]) -> RowsFrameBounds {
36 if rows_frames.is_empty() {
37 return RowsFrameBounds {
55 start: FrameBound::CurrentRow,
56 end: FrameBound::CurrentRow,
57 };
58 }
59
60 let none_as_max_cmp = |x: &Option<usize>, y: &Option<usize>| match (x, y) {
61 (None, None) => std::cmp::Ordering::Equal,
63 (None, Some(_)) => std::cmp::Ordering::Greater,
64 (Some(_), None) => std::cmp::Ordering::Less,
65 (Some(x), Some(y)) => x.cmp(y),
66 };
67
68 let start = rows_frames
72 .iter()
73 .map(|bounds| bounds.n_preceding_rows())
74 .max_by(none_as_max_cmp)
75 .unwrap();
76 let end = rows_frames
77 .iter()
78 .map(|bounds| bounds.n_following_rows())
79 .max_by(none_as_max_cmp)
80 .unwrap();
81
82 RowsFrameBounds {
83 start: start
84 .map(FrameBound::Preceding) .unwrap_or(FrameBound::UnboundedPreceding),
86 end: end
87 .map(FrameBound::Following) .unwrap_or(FrameBound::UnboundedFollowing),
89 }
90}
91
92pub(super) fn find_first_curr_for_rows_frame<'cache>(
106 frame_bounds: &RowsFrameBounds,
107 part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
108 delta_key: &'cache CacheKey,
109) -> &'cache CacheKey {
110 find_curr_for_rows_frame::<true >(frame_bounds, part_with_delta, delta_key)
111}
112
113pub(super) fn find_last_curr_for_rows_frame<'cache>(
119 frame_bounds: &RowsFrameBounds,
120 part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
121 delta_key: &'cache CacheKey,
122) -> &'cache CacheKey {
123 find_curr_for_rows_frame::<false >(frame_bounds, part_with_delta, delta_key)
124}
125
126pub(super) fn find_frame_start_for_rows_frame<'cache>(
130 frame_bounds: &RowsFrameBounds,
131 part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
132 curr_key: &'cache CacheKey,
133) -> &'cache CacheKey {
134 find_boundary_for_rows_frame::<true >(frame_bounds, part_with_delta, curr_key)
135}
136
137pub(super) fn find_frame_end_for_rows_frame<'cache>(
143 frame_bounds: &RowsFrameBounds,
144 part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
145 curr_key: &'cache CacheKey,
146) -> &'cache CacheKey {
147 find_boundary_for_rows_frame::<false >(frame_bounds, part_with_delta, curr_key)
148}
149
150pub(super) fn calc_logical_curr_for_range_frames(
153 range_frames: &[RangeFrameBounds],
154 delta_first_key: &StateKey,
155 delta_last_key: &StateKey,
156) -> Option<(Sentinelled<Datum>, Sentinelled<Datum>)> {
157 calc_logical_ord_for_range_frames(
158 range_frames,
159 delta_first_key,
160 delta_last_key,
161 |bounds, v| bounds.first_curr_of(v),
162 |bounds, v| bounds.last_curr_of(v),
163 )
164}
165
166pub(super) fn calc_logical_boundary_for_range_frames(
170 range_frames: &[RangeFrameBounds],
171 first_curr_key: &StateKey,
172 last_curr_key: &StateKey,
173) -> Option<(Sentinelled<Datum>, Sentinelled<Datum>)> {
174 calc_logical_ord_for_range_frames(
175 range_frames,
176 first_curr_key,
177 last_curr_key,
178 |bounds, v| bounds.frame_start_of(v),
179 |bounds, v| bounds.frame_end_of(v),
180 )
181}
182
183pub(super) fn find_left_for_range_frames<'cache>(
187 range_frames: &[RangeFrameBounds],
188 part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
189 logical_order_value: impl ToDatumRef,
190 cache_key_pk_len: usize, ) -> &'cache CacheKey {
192 find_for_range_frames::<true >(
193 range_frames,
194 part_with_delta,
195 logical_order_value,
196 cache_key_pk_len,
197 )
198}
199
200pub(super) fn find_right_for_range_frames<'cache>(
204 range_frames: &[RangeFrameBounds],
205 part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
206 logical_order_value: impl ToDatumRef,
207 cache_key_pk_len: usize, ) -> &'cache CacheKey {
209 find_for_range_frames::<false >(
210 range_frames,
211 part_with_delta,
212 logical_order_value,
213 cache_key_pk_len,
214 )
215}
216
217fn find_curr_for_rows_frame<'cache, const LEFT: bool>(
220 frame_bounds: &RowsFrameBounds,
221 part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
222 delta_key: &'cache CacheKey,
223) -> &'cache CacheKey {
224 debug_assert!(frame_bounds.is_canonical());
225 if LEFT {
226 debug_assert!(
227 !frame_bounds.end.is_unbounded_following(),
228 "no need to call this function whenever any frame end is unbounded"
229 );
230 } else {
231 debug_assert!(
232 !frame_bounds.start.is_unbounded_preceding(),
233 "no need to call this function whenever any frame start is unbounded"
234 );
235 }
236 debug_assert!(
237 part_with_delta.first_key().is_some(),
238 "must have something in the range cache after applying delta"
239 );
240
241 let mut cursor = if LEFT {
283 part_with_delta.lower_bound(Bound::Included(delta_key))
284 } else {
285 part_with_delta.upper_bound(Bound::Included(delta_key))
286 };
287 let pointed_key = |cursor: CursorWithDelta<'cache, CacheKey, OwnedRow>| {
288 if LEFT {
289 cursor.peek_next().map(|(k, _)| k)
290 } else {
291 cursor.peek_prev().map(|(k, _)| k)
292 }
293 };
294
295 let n_rows_to_move = if LEFT {
296 frame_bounds.n_following_rows().unwrap()
297 } else {
298 frame_bounds.n_preceding_rows().unwrap()
299 };
300
301 if n_rows_to_move == 0 {
302 return pointed_key(cursor)
303 .or_else(|| {
304 if LEFT {
305 part_with_delta.last_key()
306 } else {
307 part_with_delta.first_key()
308 }
309 })
310 .unwrap();
311 }
312
313 for _ in 0..n_rows_to_move {
314 let res = if LEFT { cursor.prev() } else { cursor.next() };
315 if res.is_none() {
316 break;
318 }
319 }
320
321 pointed_key(cursor).unwrap()
324}
325
326fn find_boundary_for_rows_frame<'cache, const LEFT: bool>(
327 frame_bounds: &RowsFrameBounds,
328 part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
329 curr_key: &'cache CacheKey,
330) -> &'cache CacheKey {
331 debug_assert!(frame_bounds.is_canonical());
332 if LEFT {
333 debug_assert!(
334 !frame_bounds.start.is_unbounded_preceding(),
335 "no need to call this function whenever any frame start is unbounded"
336 );
337 } else {
338 debug_assert!(
339 !frame_bounds.end.is_unbounded_following(),
340 "no need to call this function whenever any frame end is unbounded"
341 );
342 }
343
344 let mut cursor = if LEFT {
349 part_with_delta.before(curr_key).unwrap()
350 } else {
351 part_with_delta.after(curr_key).unwrap()
352 };
353 let pointed_key = |cursor: CursorWithDelta<'cache, CacheKey, OwnedRow>| {
354 if LEFT {
355 cursor.peek_next().map(|(k, _)| k)
356 } else {
357 cursor.peek_prev().map(|(k, _)| k)
358 }
359 };
360
361 let n_rows_to_move = if LEFT {
362 frame_bounds.n_preceding_rows().unwrap()
363 } else {
364 frame_bounds.n_following_rows().unwrap()
365 };
366
367 for _ in 0..n_rows_to_move {
368 let res = if LEFT { cursor.prev() } else { cursor.next() };
369 if res.is_none() {
370 break;
372 }
373 }
374
375 pointed_key(cursor).unwrap()
378}
379
380fn calc_logical_ord_for_range_frames(
390 range_frames: &[RangeFrameBounds],
391 left_key: &StateKey,
392 right_key: &StateKey,
393 left_offset_fn: impl Fn(&RangeFrameBounds, &Datum) -> Sentinelled<Datum>,
394 right_offset_fn: impl Fn(&RangeFrameBounds, &Datum) -> Sentinelled<Datum>,
395) -> Option<(Sentinelled<Datum>, Sentinelled<Datum>)> {
396 if range_frames.is_empty() {
397 return None;
398 }
399
400 let (data_type, order_type) = range_frames
401 .iter()
402 .map(|bounds| (&bounds.order_data_type, bounds.order_type))
403 .all_equal_value()
404 .unwrap();
405
406 let datum_cmp = |a: &Datum, b: &Datum| cmp_datum(a, b, order_type);
407
408 let left_given_ord = memcmp_encoding::decode_value(data_type, &left_key.order_key, order_type)
409 .expect("no reason to fail because we just encoded it in memory");
410 let right_given_ord =
411 memcmp_encoding::decode_value(data_type, &right_key.order_key, order_type)
412 .expect("no reason to fail because we just encoded it in memory");
413
414 let logical_left_offset_ord = {
415 let mut order_value = None;
416 for bounds in range_frames {
417 let new_order_value = left_offset_fn(bounds, &left_given_ord);
418 order_value = match (order_value, new_order_value) {
419 (None, any_new) => Some(any_new),
420 (Some(old), new) => Some(std::cmp::min_by(old, new, |x, y| x.cmp_by(y, datum_cmp))),
421 };
422 if !order_value.as_ref().unwrap().is_normal() {
423 assert!(
425 order_value.as_ref().unwrap().is_smallest(),
426 "left order value should never be `Largest`"
427 );
428 break;
429 }
430 }
431 order_value.expect("# of range frames > 0")
432 };
433
434 let logical_right_offset_ord = {
435 let mut order_value = None;
436 for bounds in range_frames {
437 let new_order_value = right_offset_fn(bounds, &right_given_ord);
438 order_value = match (order_value, new_order_value) {
439 (None, any_new) => Some(any_new),
440 (Some(old), new) => Some(std::cmp::max_by(old, new, |x, y| x.cmp_by(y, datum_cmp))),
441 };
442 if !order_value.as_ref().unwrap().is_normal() {
443 assert!(
445 order_value.as_ref().unwrap().is_largest(),
446 "right order value should never be `Smallest`"
447 );
448 break;
449 }
450 }
451 order_value.expect("# of range frames > 0")
452 };
453
454 Some((logical_left_offset_ord, logical_right_offset_ord))
455}
456
457fn find_for_range_frames<'cache, const LEFT: bool>(
458 range_frames: &[RangeFrameBounds],
459 part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
460 logical_order_value: impl ToDatumRef,
461 cache_key_pk_len: usize,
462) -> &'cache CacheKey {
463 debug_assert!(
464 part_with_delta.first_key().is_some(),
465 "must have something in the range cache after applying delta"
466 );
467
468 let order_type = range_frames
469 .iter()
470 .map(|bounds| bounds.order_type)
471 .all_equal_value()
472 .unwrap();
473
474 let search_key = Sentinelled::Normal(StateKey {
475 order_key: memcmp_encoding::encode_value(logical_order_value, order_type)
476 .expect("the data type is simple, should succeed"),
477 pk: if LEFT {
478 OwnedRow::empty() } else {
480 OwnedRow::new(vec![None; cache_key_pk_len]) }
482 .into(),
483 });
484
485 if LEFT {
486 let cursor = part_with_delta.lower_bound(Bound::Included(&search_key));
487 if let Some((prev_key, _)) = cursor.peek_prev()
488 && prev_key.is_smallest()
489 {
490 prev_key
495 } else {
496 cursor
501 .peek_next()
502 .map(|(k, _)| k)
503 .or_else(|| part_with_delta.last_key())
504 .unwrap()
505 }
506 } else {
507 let cursor = part_with_delta.upper_bound(Bound::Included(&search_key));
508 if let Some((next_key, _)) = cursor.peek_next()
509 && next_key.is_largest()
510 {
511 next_key
512 } else {
513 cursor
514 .peek_prev()
515 .map(|(k, _)| k)
516 .or_else(|| part_with_delta.first_key())
517 .unwrap()
518 }
519 }
520}
521
522#[cfg(test)]
523mod tests {
524 use std::collections::BTreeMap;
525
526 use delta_btree_map::Change;
527 use risingwave_common::types::{ScalarImpl, Sentinelled};
528 use risingwave_expr::window_function::FrameBound::*;
529 use risingwave_expr::window_function::{RowsFrameBounds, StateKey};
530
531 use super::*;
532
/// Checks `merge_rows_frames` for the empty case, for bounded frames, and for
/// frames that are unbounded on either side.
#[test]
fn test_merge_rows_frame() {
    /// Merges `inputs` and compares both bounds with `expected` by offset, so
    /// that e.g. `Preceding(0)` and `CurrentRow` are treated alike.
    fn check_merged(inputs: &[&RowsFrameBounds], expected: RowsFrameBounds) {
        let merged = merge_rows_frames(inputs);
        assert_eq!(merged.start.to_offset(), expected.start.to_offset());
        assert_eq!(merged.end.to_offset(), expected.end.to_offset());
    }

    // No frames at all.
    check_merged(
        &[],
        RowsFrameBounds {
            start: CurrentRow,
            end: CurrentRow,
        },
    );

    // Both frames lie entirely before the current row.
    let only_preceding = [
        &RowsFrameBounds {
            start: Preceding(3),
            end: Preceding(2),
        },
        &RowsFrameBounds {
            start: Preceding(1),
            end: Preceding(4),
        },
    ];
    check_merged(
        &only_preceding,
        RowsFrameBounds {
            start: Preceding(4),
            end: CurrentRow,
        },
    );

    // Frames straddling the current row.
    let straddling = [
        &RowsFrameBounds {
            start: Preceding(3),
            end: Following(2),
        },
        &RowsFrameBounds {
            start: Preceding(2),
            end: Following(3),
        },
    ];
    check_merged(
        &straddling,
        RowsFrameBounds {
            start: Preceding(3),
            end: Following(3),
        },
    );

    // One frame unbounded on each side.
    let unbounded_both = [
        &RowsFrameBounds {
            start: UnboundedPreceding,
            end: Following(2),
        },
        &RowsFrameBounds {
            start: Preceding(2),
            end: UnboundedFollowing,
        },
    ];
    check_merged(
        &unbounded_both,
        RowsFrameBounds {
            start: UnboundedPreceding,
            end: UnboundedFollowing,
        },
    );

    // Unbounded start combined with a frame whose start is a `Following` bound.
    let unbounded_start = [
        &RowsFrameBounds {
            start: UnboundedPreceding,
            end: Following(2),
        },
        &RowsFrameBounds {
            start: Following(5),
            end: Following(2),
        },
    ];
    check_merged(
        &unbounded_start,
        RowsFrameBounds {
            start: UnboundedPreceding,
            end: Following(5),
        },
    );
}
620
/// Builds a `BTreeMap` keyed by `CacheKey` to serve as a range cache in tests.
///
/// Two input shapes are supported:
/// * a comma-separated list of pk literals — every key gets an empty `order_key`;
/// * `$ord_type, [(ord, pk), ...]` — `ord` is memcmp-encoded with `$ord_type`.
///
/// In both shapes a leading and/or trailing `...` additionally inserts the
/// `CacheKey::Smallest` / `CacheKey::Largest` sentinel. All values are empty rows.
///
/// The arms are tried in order, so the sentinel-carrying patterns must stay
/// ahead of the plain ones.
macro_rules! create_cache {
    // pk list with both sentinels
    (..., $( $pk:literal ),* , ...) => {
        {
            let mut cache = create_cache!( $( $pk ),* );
            cache.insert(CacheKey::Smallest, OwnedRow::empty().into());
            cache.insert(CacheKey::Largest, OwnedRow::empty().into());
            cache
        }
    };
    // pk list with the `Smallest` sentinel only
    (..., $( $pk:literal ),*) => {
        {
            let mut cache = create_cache!( $( $pk ),* );
            cache.insert(CacheKey::Smallest, OwnedRow::empty().into());
            cache
        }
    };
    // pk list with the `Largest` sentinel only
    ($( $pk:literal ),* , ...) => {
        {
            let mut cache = create_cache!( $( $pk ),* );
            cache.insert(CacheKey::Largest, OwnedRow::empty().into());
            cache
        }
    };
    // plain pk list (possibly empty, hence the `unused_mut` allowance)
    ($( $pk:literal ),*) => {
        {
            #[allow(unused_mut)]
            let mut cache = BTreeMap::new();
            $(
                cache.insert(
                    CacheKey::Normal(
                        StateKey {
                            order_key: vec![].into(),
                            pk: OwnedRow::new(vec![Some(ScalarImpl::from($pk))]).into(),
                        },
                    ),
                    OwnedRow::empty(),
                );
            )*
            cache
        }
    };
    // (ord, pk) list with both sentinels
    ($ord_type:expr, [..., $( ( $ord:literal, $pk:literal ) ),* , ...]) => {
        {
            let mut cache = create_cache!($ord_type, [$( ( $ord, $pk ) ),*]);
            cache.insert(CacheKey::Smallest, OwnedRow::empty().into());
            cache.insert(CacheKey::Largest, OwnedRow::empty().into());
            cache
        }
    };
    // (ord, pk) list with the `Smallest` sentinel only
    ($ord_type:expr, [..., $( ( $ord:literal, $pk:literal ) ),*]) => {
        {
            let mut cache = create_cache!($ord_type, [$( ( $ord, $pk ) ),*]);
            cache.insert(CacheKey::Smallest, OwnedRow::empty().into());
            cache
        }
    };
    // (ord, pk) list with the `Largest` sentinel only
    ($ord_type:expr, [$( ( $ord:literal, $pk:literal ) ),* , ...]) => {
        {
            let mut cache = create_cache!($ord_type, [$( ( $ord, $pk ) ),*]);
            cache.insert(CacheKey::Largest, OwnedRow::empty().into());
            cache
        }
    };
    // plain (ord, pk) list; the order value is memcmp-encoded with `$ord_type`
    ($ord_type:expr, [$( ( $ord:literal, $pk:literal ) ),*]) => {
        {
            #[allow(unused_mut)]
            let mut cache = BTreeMap::new();
            $(
                cache.insert(
                    CacheKey::Normal(
                        StateKey {
                            order_key: memcmp_encoding::encode_value(
                                Some(ScalarImpl::from($ord)),
                                $ord_type,
                            ).unwrap(),
                            pk: OwnedRow::new(vec![Some(ScalarImpl::from($pk))]).into(),
                        },
                    ),
                    OwnedRow::empty(),
                );
            )*
            cache
        }
    }
}
709
/// Maps the identifiers `Delete` / `Insert` to the corresponding `Change`
/// value; inserts carry an empty row.
macro_rules! create_change {
    (Delete) => {
        Change::Delete
    };
    (Insert) => {
        Change::Insert(OwnedRow::empty())
    };
}
718
/// Builds a `BTreeMap` from `CacheKey` to `Change` to serve as a delta in tests.
///
/// Two input shapes are supported (at least one entry, trailing comma allowed):
/// * `(pk, Insert|Delete), ...` — every key gets an empty `order_key`;
/// * `$ord_type, [(ord, pk, Insert|Delete), ...]` — `ord` is memcmp-encoded
///   with `$ord_type`.
macro_rules! create_delta {
    // keys identified by pk only
    ($( ( $pk:literal, $change:ident ) ),+ $(,)?) => {
        {
            let mut delta = BTreeMap::new();
            $(
                delta.insert(
                    CacheKey::Normal(
                        StateKey {
                            order_key: vec![].into(),
                            pk: OwnedRow::new(vec![Some(ScalarImpl::from($pk))]).into(),
                        },
                    ),
                    create_change!( $change ),
                );
            )*
            delta
        }
    };
    // keys identified by (order value, pk)
    ($ord_type:expr, [ $( ( $ord:literal, $pk:literal, $change:ident ) ),+ $(,)? ]) => {
        {
            let mut delta = BTreeMap::new();
            $(
                delta.insert(
                    CacheKey::Normal(
                        StateKey {
                            order_key: memcmp_encoding::encode_value(
                                Some(ScalarImpl::from($ord)),
                                $ord_type,
                            ).unwrap(),
                            pk: OwnedRow::new(vec![Some(ScalarImpl::from($pk))]).into(),
                        },
                    ),
                    create_change!( $change ),
                );
            )*
            delta
        }
    };
}
761
762 mod rows_frame_tests {
763 use super::*;
764
765 fn assert_cache_key_eq(given: &CacheKey, expected: impl Into<ScalarImpl>) {
766 assert_eq!(
767 given.as_normal_expect().pk.0,
768 OwnedRow::new(vec![Some(expected.into())])
769 )
770 }
771
772 #[test]
773 fn test_insert_delta_only() {
774 let cache = create_cache!();
775 let delta = create_delta!((1, Insert), (2, Insert), (3, Insert));
776 let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
777 let delta_first_key = delta.first_key_value().unwrap().0;
778 let delta_last_key = delta.last_key_value().unwrap().0;
779
780 let bounds = RowsFrameBounds {
781 start: Preceding(2),
782 end: CurrentRow,
783 };
784
785 let first_curr_key =
786 find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
787 let last_curr_key =
788 find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
789 assert_cache_key_eq(first_curr_key, 1);
790 assert_cache_key_eq(last_curr_key, 3);
791
792 let first_frame_start =
793 find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
794 let last_frame_end =
795 find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
796 assert_cache_key_eq(first_frame_start, 1);
797 assert_cache_key_eq(last_frame_end, 3);
798 }
799
800 #[test]
801 fn test_simple() {
802 let cache = create_cache!(1, 2, 3, 4, 5, 6);
803 let delta = create_delta!((2, Insert), (3, Delete));
804 let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
805 let delta_first_key = delta.first_key_value().unwrap().0;
806 let delta_last_key = delta.last_key_value().unwrap().0;
807
808 {
809 let bounds = RowsFrameBounds {
810 start: Preceding(2),
811 end: CurrentRow,
812 };
813 let first_curr_key =
814 find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
815 let last_curr_key =
816 find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
817 assert_cache_key_eq(first_curr_key, 2);
818 assert_cache_key_eq(last_curr_key, 5);
819
820 let first_frame_start =
821 find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
822 let last_frame_end =
823 find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
824 assert_cache_key_eq(first_frame_start, 1);
825 assert_cache_key_eq(last_frame_end, 5);
826 }
827
828 {
829 let bounds = RowsFrameBounds {
830 start: Preceding(1),
831 end: Following(2),
832 };
833 let first_curr_key =
834 find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
835 let last_curr_key =
836 find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
837 assert_cache_key_eq(first_curr_key, 1);
838 assert_cache_key_eq(last_curr_key, 4);
839
840 let first_frame_start =
841 find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
842 let last_frame_end =
843 find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
844 assert_cache_key_eq(first_frame_start, 1);
845 assert_cache_key_eq(last_frame_end, 6);
846 }
847
848 {
849 let bounds = RowsFrameBounds {
850 start: CurrentRow,
851 end: Following(2),
852 };
853 let first_curr_key =
854 find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
855 let last_curr_key =
856 find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
857 assert_cache_key_eq(first_curr_key, 1);
858 assert_cache_key_eq(last_curr_key, 2);
859
860 let first_frame_start =
861 find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
862 let last_frame_end =
863 find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
864 assert_cache_key_eq(first_frame_start, 1);
865 assert_cache_key_eq(last_frame_end, 5);
866 }
867 }
868
869 #[test]
870 fn test_lag_corner_case() {
871 let cache = create_cache!(1, 2, 3, 4, 5, 6);
872 let delta = create_delta!((1, Delete), (2, Delete), (3, Delete));
873 let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
874 let delta_first_key = delta.first_key_value().unwrap().0;
875 let delta_last_key = delta.last_key_value().unwrap().0;
876
877 let bounds = RowsFrameBounds {
878 start: Preceding(1),
879 end: CurrentRow,
880 };
881
882 let first_curr_key =
883 find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
884 let last_curr_key =
885 find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
886 assert_cache_key_eq(first_curr_key, 4);
887 assert_cache_key_eq(last_curr_key, 4);
888
889 let first_frame_start =
890 find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
891 let last_frame_end =
892 find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
893 assert_cache_key_eq(first_frame_start, 4);
894 assert_cache_key_eq(last_frame_end, 4);
895 }
896
897 #[test]
898 fn test_lead_corner_case() {
899 let cache = create_cache!(1, 2, 3, 4, 5, 6);
900 let delta = create_delta!((4, Delete), (5, Delete), (6, Delete));
901 let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
902 let delta_first_key = delta.first_key_value().unwrap().0;
903 let delta_last_key = delta.last_key_value().unwrap().0;
904
905 let bounds = RowsFrameBounds {
906 start: CurrentRow,
907 end: Following(1),
908 };
909
910 let first_curr_key =
911 find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
912 let last_curr_key =
913 find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
914 assert_cache_key_eq(first_curr_key, 3);
915 assert_cache_key_eq(last_curr_key, 3);
916
917 let first_frame_start =
918 find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
919 let last_frame_end =
920 find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
921 assert_cache_key_eq(first_frame_start, 3);
922 assert_cache_key_eq(last_frame_end, 3);
923 }
924
925 #[test]
926 fn test_lag_lead_offset_0_corner_case_1() {
927 let cache = create_cache!(1, 2, 3, 4);
928 let delta = create_delta!((2, Delete), (3, Delete));
929 let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
930 let delta_first_key = delta.first_key_value().unwrap().0;
931 let delta_last_key = delta.last_key_value().unwrap().0;
932
933 let bounds = RowsFrameBounds {
934 start: CurrentRow,
935 end: CurrentRow,
936 };
937
938 let first_curr_key =
939 find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
940 let last_curr_key =
941 find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
942 assert_cache_key_eq(first_curr_key, 4);
943 assert_cache_key_eq(last_curr_key, 1);
944
945 }
947
948 #[test]
949 fn test_lag_lead_offset_0_corner_case_2() {
950 let cache = create_cache!(1, 2, 3, 4);
953 let delta = create_delta!((2, Delete), (3, Delete), (4, Delete));
954 let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
955 let delta_first_key = delta.first_key_value().unwrap().0;
956 let delta_last_key = delta.last_key_value().unwrap().0;
957
958 let bounds = RowsFrameBounds {
959 start: CurrentRow,
960 end: CurrentRow,
961 };
962
963 let first_curr_key =
964 find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
965 let last_curr_key =
966 find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
967 assert_cache_key_eq(first_curr_key, 1);
968 assert_cache_key_eq(last_curr_key, 1);
969
970 let first_frame_start =
971 find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
972 let last_frame_end =
973 find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
974 assert_cache_key_eq(first_frame_start, 1);
975 assert_cache_key_eq(last_frame_end, 1);
976 }
977
978 #[test]
979 fn test_lag_lead_offset_0_corner_case_3() {
980 let cache = create_cache!(1, 2, 3, 4, 5);
981 let delta = create_delta!((2, Delete), (3, Insert), (4, Delete));
982 let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
983 let delta_first_key = delta.first_key_value().unwrap().0;
984 let delta_last_key = delta.last_key_value().unwrap().0;
985
986 let bounds = RowsFrameBounds {
987 start: CurrentRow,
988 end: CurrentRow,
989 };
990
991 let first_curr_key =
992 find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
993 let last_curr_key =
994 find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
995 assert_cache_key_eq(first_curr_key, 3);
996 assert_cache_key_eq(last_curr_key, 3);
997
998 let first_frame_start =
999 find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
1000 let last_frame_end =
1001 find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
1002 assert_cache_key_eq(first_frame_start, 3);
1003 assert_cache_key_eq(last_frame_end, 3);
1004 }
1005
1006 #[test]
1007 fn test_empty_with_sentinels() {
1008 let cache: BTreeMap<Sentinelled<StateKey>, OwnedRow> = create_cache!(..., , ...);
1009 let delta = create_delta!((1, Insert), (2, Insert));
1010 let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
1011 let delta_first_key = delta.first_key_value().unwrap().0;
1012 let delta_last_key = delta.last_key_value().unwrap().0;
1013
1014 {
1015 let bounds = RowsFrameBounds {
1016 start: CurrentRow,
1017 end: CurrentRow,
1018 };
1019 let first_curr_key =
1020 find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
1021 let last_curr_key =
1022 find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
1023 assert_cache_key_eq(first_curr_key, 1);
1024 assert_cache_key_eq(last_curr_key, 2);
1025
1026 let first_frame_start =
1027 find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
1028 let last_frame_end =
1029 find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
1030 assert_cache_key_eq(first_frame_start, 1);
1031 assert_cache_key_eq(last_frame_end, 2);
1032 }
1033
1034 {
1035 let bounds = RowsFrameBounds {
1036 start: Preceding(1),
1037 end: CurrentRow,
1038 };
1039 let first_curr_key =
1040 find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
1041 let last_curr_key =
1042 find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
1043 assert_cache_key_eq(first_curr_key, 1);
1044 assert!(last_curr_key.is_largest());
1045
1046 }
1048
1049 {
1050 let bounds = RowsFrameBounds {
1051 start: CurrentRow,
1052 end: Following(3),
1053 };
1054
1055 let first_curr_key =
1056 find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
1057 let last_curr_key =
1058 find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
1059 assert!(first_curr_key.is_smallest());
1060 assert_cache_key_eq(last_curr_key, 2);
1061
1062 }
1064 }
1065
1066 #[test]
1067 fn test_with_left_sentinel() {
1068 let cache = create_cache!(..., 2, 4, 5, 8);
1069 let delta = create_delta!((3, Insert), (4, Insert), (8, Delete));
1070 let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
1071 let delta_first_key = delta.first_key_value().unwrap().0;
1072 let delta_last_key = delta.last_key_value().unwrap().0;
1073
1074 {
1075 let bounds = RowsFrameBounds {
1076 start: CurrentRow,
1077 end: Following(1),
1078 };
1079 let first_curr_key =
1080 find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
1081 let last_curr_key =
1082 find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
1083 assert_cache_key_eq(first_curr_key, 2);
1084 assert_cache_key_eq(last_curr_key, 5);
1085
1086 let first_frame_start =
1087 find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
1088 let last_frame_end =
1089 find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
1090 assert_cache_key_eq(first_frame_start, 2);
1091 assert_cache_key_eq(last_frame_end, 5);
1092 }
1093
1094 {
1095 let bounds = RowsFrameBounds {
1096 start: Preceding(1),
1097 end: Following(1),
1098 };
1099 let first_curr_key =
1100 find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
1101 let last_curr_key =
1102 find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
1103 assert_cache_key_eq(first_curr_key, 2);
1104 assert_cache_key_eq(last_curr_key, 5);
1105
1106 let first_frame_start =
1107 find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
1108 let last_frame_end =
1109 find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
1110 assert!(first_frame_start.is_smallest());
1111 assert_cache_key_eq(last_frame_end, 5);
1112 }
1113 }
1114
1115 #[test]
1116 fn test_with_right_sentinel() {
1117 let cache = create_cache!(1, 2, 4, 5, 8, ...);
1118 let delta = create_delta!((3, Insert), (4, Insert), (5, Delete));
1119 let part_with_delta = DeltaBTreeMap::new(&cache, &delta);
1120 let delta_first_key = delta.first_key_value().unwrap().0;
1121 let delta_last_key = delta.last_key_value().unwrap().0;
1122
1123 {
1124 let bounds = RowsFrameBounds {
1125 start: Preceding(1),
1126 end: CurrentRow,
1127 };
1128 let first_curr_key =
1129 find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
1130 let last_curr_key =
1131 find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
1132 assert_cache_key_eq(first_curr_key, 3);
1133 assert_cache_key_eq(last_curr_key, 8);
1134
1135 let first_frame_start =
1136 find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
1137 let last_frame_end =
1138 find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
1139 assert_cache_key_eq(first_frame_start, 2);
1140 assert_cache_key_eq(last_frame_end, 8);
1141 }
1142
1143 {
1144 let bounds = RowsFrameBounds {
1145 start: Preceding(1),
1146 end: Following(1),
1147 };
1148 let first_curr_key =
1149 find_first_curr_for_rows_frame(&bounds, part_with_delta, delta_first_key);
1150 let last_curr_key =
1151 find_last_curr_for_rows_frame(&bounds, part_with_delta, delta_last_key);
1152 assert_cache_key_eq(first_curr_key, 2);
1153 assert_cache_key_eq(last_curr_key, 8);
1154
1155 let first_frame_start =
1156 find_frame_start_for_rows_frame(&bounds, part_with_delta, first_curr_key);
1157 let last_frame_end =
1158 find_frame_end_for_rows_frame(&bounds, part_with_delta, last_curr_key);
1159 assert_cache_key_eq(first_frame_start, 1);
1160 assert!(last_frame_end.is_largest());
1161 }
1162 }
1163 }
1164
1165 mod range_frame_tests {
1166 use risingwave_common::types::{DataType, Interval, data_types};
1167 use risingwave_common::util::sort_util::OrderType;
1168 use risingwave_expr::window_function::RangeFrameOffset;
1169
1170 use super::*;
1171
1172 fn create_range_frame<T>(
1173 order_data_type: DataType,
1174 order_type: OrderType,
1175 start: FrameBound<T>,
1176 end: FrameBound<T>,
1177 ) -> RangeFrameBounds
1178 where
1179 T: Into<ScalarImpl>,
1180 {
1181 let offset_data_type = match &order_data_type {
1182 t @ data_types::range_frame_numeric!() => t.clone(),
1183 data_types::range_frame_datetime!() => DataType::Interval,
1184 _ => unreachable!(),
1185 };
1186
1187 let map_fn = |x: T| {
1188 RangeFrameOffset::new_for_test(x.into(), &order_data_type, &offset_data_type)
1189 };
1190 let start = start.map(map_fn);
1191 let end = end.map(map_fn);
1192
1193 RangeFrameBounds {
1194 order_data_type,
1195 order_type,
1196 offset_data_type,
1197 start,
1198 end,
1199 }
1200 }
1201
/// Checks the logical curr / boundary calculation for two `RANGE` frames over
/// an ascending `Int64` order column, with given order values 1 and 3.
#[test]
fn test_calc_logical_for_int64_asc() {
    let order_data_type = DataType::Int64;
    let order_type = OrderType::ascending();

    // Frame 1: 3 PRECEDING .. 2 PRECEDING, frame 2: 1 PRECEDING .. 2 FOLLOWING.
    let range_frames = [
        create_range_frame(
            order_data_type.clone(),
            order_type,
            Preceding(3i64),
            Preceding(2i64),
        ),
        create_range_frame(
            order_data_type.clone(),
            order_type,
            Preceding(1i64),
            Following(2i64),
        ),
    ];

    // A state key with the given order value and an empty pk.
    let state_key_of = |ord: i64| StateKey {
        order_key: memcmp_encoding::encode_value(Some(ScalarImpl::Int64(ord)), order_type)
            .unwrap(),
        pk: OwnedRow::empty().into(),
    };
    let ord_key_1 = state_key_of(1);
    let ord_key_2 = state_key_of(3);

    let (logical_first_curr, logical_last_curr) =
        calc_logical_curr_for_range_frames(&range_frames, &ord_key_1, &ord_key_2).unwrap();
    assert_eq!(
        logical_first_curr.as_normal_expect(),
        &Some(ScalarImpl::Int64(-1))
    );
    assert_eq!(
        logical_last_curr.as_normal_expect(),
        &Some(ScalarImpl::Int64(6))
    );

    let (first_start, last_end) =
        calc_logical_boundary_for_range_frames(&range_frames, &ord_key_1, &ord_key_2).unwrap();
    assert_eq!(first_start.as_normal_expect(), &Some(ScalarImpl::Int64(-2)));
    assert_eq!(last_end.as_normal_expect(), &Some(ScalarImpl::Int64(5)));
}
1250
/// Checks `calc_logical_curr_for_range_frames` and `calc_logical_boundary_for_range_frames`
/// for a single `Timestamp` range frame whose start offset is
/// `Interval::from_month_day_usec(1, 2, 3_000_000)` preceding and whose end offset is
/// `Interval::from_month_day_usec(0, 1, 0)` following, ordered `DESC NULLS FIRST`.
#[test]
fn test_calc_logical_for_timestamp_desc_nulls_first() {
    let order_type = OrderType::descending_nulls_first();

    let range_frames = [create_range_frame(
        DataType::Timestamp,
        order_type,
        Preceding(Interval::from_month_day_usec(1, 2, 3 * 1000 * 1000)),
        Following(Interval::from_month_day_usec(0, 1, 0)),
    )];

    // Parse a timestamp literal into a datum, and build a key around such a datum with an
    // empty pk row.
    let timestamp = |text: &str| Some(ScalarImpl::Timestamp(text.parse().unwrap()));
    let key_at = |text: &str| StateKey {
        order_key: memcmp_encoding::encode_value(timestamp(text), order_type).unwrap(),
        pk: OwnedRow::empty().into(),
    };
    let first_key = key_at("2024-01-28 00:30:00");
    let second_key = key_at("2024-01-26 15:47:00");

    let (curr_first, curr_last) =
        calc_logical_curr_for_range_frames(&range_frames, &first_key, &second_key).unwrap();
    assert_eq!(
        curr_first.as_normal_expect(),
        &timestamp("2024-01-29 00:30:00")
    );
    assert_eq!(
        curr_last.as_normal_expect(),
        &timestamp("2023-12-24 15:46:57")
    );

    let (boundary_start, boundary_end) =
        calc_logical_boundary_for_range_frames(&range_frames, &first_key, &second_key)
            .unwrap();
    assert_eq!(
        boundary_start.as_normal_expect(),
        &timestamp("2024-03-01 00:30:03")
    );
    assert_eq!(
        boundary_end.as_normal_expect(),
        &timestamp("2024-01-25 15:47:00")
    );
}
1315
1316 fn assert_find_left_right_result_eq(
1317 order_data_type: DataType,
1318 order_type: OrderType,
1319 part_with_delta: DeltaBTreeMap<'_, CacheKey, OwnedRow>,
1320 logical_order_value: ScalarImpl,
1321 expected_left: Sentinelled<ScalarImpl>,
1322 expected_right: Sentinelled<ScalarImpl>,
1323 ) {
1324 let range_frames = if matches!(order_data_type, DataType::Int32) {
1325 [create_range_frame(
1326 order_data_type.clone(),
1327 order_type,
1328 Preceding(0), Following(0), )]
1331 } else {
1332 panic!()
1333 };
1334 let logical_order_value = Some(logical_order_value);
1335 let cache_key_pk_len = 1;
1336
1337 let find_left_res = find_left_for_range_frames(
1338 &range_frames,
1339 part_with_delta,
1340 &logical_order_value,
1341 cache_key_pk_len,
1342 )
1343 .clone();
1344 assert_eq!(
1345 find_left_res.map(|x| x.pk.0.into_iter().next().unwrap().unwrap()),
1346 expected_left
1347 );
1348
1349 let find_right_res = find_right_for_range_frames(
1350 &range_frames,
1351 part_with_delta,
1352 &logical_order_value,
1353 cache_key_pk_len,
1354 )
1355 .clone();
1356 assert_eq!(
1357 find_right_res.map(|x| x.pk.0.into_iter().next().unwrap().unwrap()),
1358 expected_right
1359 );
1360 }
1361
/// Left/right lookups when the cache is created empty (`create_cache!()`) and every entry
/// comes from `Insert` changes in the delta.
#[test]
fn test_insert_delta_only() {
    let order_type = OrderType::ascending();

    let cache = create_cache!();
    let delta = create_delta!(
        order_type,
        [
            (1, 1, Insert),
            (1, 11, Insert),
            (3, 3, Insert),
            (5, 5, Insert)
        ]
    );
    let part_with_delta = DeltaBTreeMap::new(&cache, &delta);

    let pk = |value: i32| -> Sentinelled<ScalarImpl> { ScalarImpl::from(value).into() };

    // (logical order value, expected left pk, expected right pk)
    let cases: [(i32, Sentinelled<ScalarImpl>, Sentinelled<ScalarImpl>); 3] = [
        (2, pk(3), pk(11)),
        (5, pk(5), pk(5)),
        (6, pk(5), pk(5)),
    ];
    for (order_value, expected_left, expected_right) in cases {
        assert_find_left_right_result_eq(
            DataType::Int32,
            order_type,
            part_with_delta,
            ScalarImpl::from(order_value),
            expected_left,
            expected_right,
        );
    }
}
1406
/// Left/right lookups on a cache holding order values 2 through 6, with a delta that
/// inserts `(2, 2)` and `(2, 22)` and deletes `(3, 3)`.
#[test]
fn test_simple() {
    let order_type = OrderType::ascending();

    let cache = create_cache!(order_type, [(2, 2), (3, 3), (4, 4), (5, 5), (6, 6)]);
    let delta = create_delta!(
        order_type,
        [(2, 2, Insert), (2, 22, Insert), (3, 3, Delete)]
    );
    let part_with_delta = DeltaBTreeMap::new(&cache, &delta);

    let pk = |value: i32| -> Sentinelled<ScalarImpl> { ScalarImpl::from(value).into() };

    // (logical order value, expected left pk, expected right pk)
    let cases: [(i32, Sentinelled<ScalarImpl>, Sentinelled<ScalarImpl>); 3] = [
        (0, pk(2), pk(2)),
        (2, pk(2), pk(22)),
        (3, pk(4), pk(22)),
    ];
    for (order_value, expected_left, expected_right) in cases {
        assert_find_left_right_result_eq(
            DataType::Int32,
            order_type,
            part_with_delta,
            ScalarImpl::from(order_value),
            expected_left,
            expected_right,
        );
    }
}
1446
/// Left/right lookups when the cache is built from `[..., , ...]` (presumably a sentinel
/// on each side with no rows in between; confirm against `create_cache!`) and the delta
/// inserts `(1, 1)` and `(2, 2)`.
#[test]
fn test_empty_with_sentinels() {
    let order_type = OrderType::ascending();

    let cache = create_cache!(order_type, [..., , ...]);
    let delta = create_delta!(order_type, [(1, 1, Insert), (2, 2, Insert)]);
    let part_with_delta = DeltaBTreeMap::new(&cache, &delta);

    let pk = |value: i32| -> Sentinelled<ScalarImpl> { ScalarImpl::from(value).into() };

    // (logical order value, expected left, expected right)
    let cases: [(i32, Sentinelled<ScalarImpl>, Sentinelled<ScalarImpl>); 4] = [
        (0, Sentinelled::Smallest, Sentinelled::Smallest),
        (1, Sentinelled::Smallest, pk(1)),
        (2, pk(2), Sentinelled::Largest),
        (3, Sentinelled::Largest, Sentinelled::Largest),
    ];
    for (order_value, expected_left, expected_right) in cases {
        assert_find_left_right_result_eq(
            DataType::Int32,
            order_type,
            part_with_delta,
            ScalarImpl::from(order_value),
            expected_left,
            expected_right,
        );
    }
}
1492
/// Left/right lookups when the cache is built from `[..., (2, 2), (4, 4), (5, 5)]`
/// (the leading `...` presumably denotes a smallest sentinel; confirm against
/// `create_cache!`) and the delta inserts `(1, 1)`, `(2, 2)`, `(4, 44)` and deletes
/// `(5, 5)`.
#[test]
fn test_with_left_sentinels() {
    let order_type = OrderType::ascending();

    let cache = create_cache!(order_type, [..., (2, 2), (4, 4), (5, 5)]);
    let delta = create_delta!(
        order_type,
        [
            (1, 1, Insert),
            (2, 2, Insert),
            (4, 44, Insert),
            (5, 5, Delete)
        ]
    );
    let part_with_delta = DeltaBTreeMap::new(&cache, &delta);

    let pk = |value: i32| -> Sentinelled<ScalarImpl> { ScalarImpl::from(value).into() };

    // (logical order value, expected left, expected right)
    let cases: [(i32, Sentinelled<ScalarImpl>, Sentinelled<ScalarImpl>); 3] = [
        (1, Sentinelled::Smallest, pk(1)),
        (4, pk(4), pk(44)),
        (5, pk(44), pk(44)),
    ];
    for (order_value, expected_left, expected_right) in cases {
        assert_find_left_right_result_eq(
            DataType::Int32,
            order_type,
            part_with_delta,
            ScalarImpl::from(order_value),
            expected_left,
            expected_right,
        );
    }
}
1537
/// Left/right lookups when the cache is built from `[(2, 2), (4, 4), (5, 5), ...]`
/// (the trailing `...` presumably denotes a largest sentinel; confirm against
/// `create_cache!`) and the delta inserts `(1, 1)`, `(2, 2)`, `(4, 44)` and deletes
/// `(5, 5)`.
#[test]
fn test_with_right_sentinel() {
    let order_type = OrderType::ascending();

    let cache = create_cache!(order_type, [(2, 2), (4, 4), (5, 5), ...]);
    let delta = create_delta!(
        order_type,
        [
            (1, 1, Insert),
            (2, 2, Insert),
            (4, 44, Insert),
            (5, 5, Delete)
        ]
    );
    let part_with_delta = DeltaBTreeMap::new(&cache, &delta);

    let pk = |value: i32| -> Sentinelled<ScalarImpl> { ScalarImpl::from(value).into() };

    // (logical order value, expected left, expected right)
    let cases: [(i32, Sentinelled<ScalarImpl>, Sentinelled<ScalarImpl>); 3] = [
        (1, pk(1), pk(1)),
        (4, pk(4), Sentinelled::Largest),
        (5, Sentinelled::Largest, Sentinelled::Largest),
    ];
    for (order_value, expected_left, expected_right) in cases {
        assert_find_left_right_result_eq(
            DataType::Int32,
            order_type,
            part_with_delta,
            ScalarImpl::from(order_value),
            expected_left,
            expected_right,
        );
    }
}
1582 }
1583}