1use std::collections::VecDeque;
16use std::ops::Range;
17
18use educe::Educe;
19use risingwave_common::array::Op;
20use risingwave_common::types::Sentinelled;
21use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded};
22use risingwave_expr::window_function::{
23 FrameExclusion, RangeFrameBounds, RowsFrameBounds, SessionFrameBounds, StateKey,
24};
25
26use super::range_utils::{range_diff, range_except};
27
/// A sliding window buffer whose frame boundaries are maintained by a pluggable
/// [`WindowImpl`].
///
/// Entries are appended at the back with `append`, and the current row is advanced with
/// `slide`, which also evicts entries that are no longer needed from the front.
pub(super) struct WindowBuffer<W: WindowImpl> {
    /// Strategy that recomputes `left_idx` / `right_excl_idx` after each append or slide.
    window_impl: W,
    /// Which rows are excluded from the frame when reading window values.
    frame_exclusion: FrameExclusion,
    /// Buffered key-value entries, in insertion order.
    buffer: VecDeque<Entry<W::Key, W::Value>>,
    /// Index of the current row in `buffer`; may equal `buffer.len()` when the current row has
    /// not been appended yet.
    curr_idx: usize,
    /// Inclusive start index of the current window.
    left_idx: usize,
    /// Exclusive end index of the current window. It may be smaller than `left_idx`, in which
    /// case the window is treated as empty (see `curr_window_outer`).
    right_excl_idx: usize,
    /// Pending window changes since the last `consume_curr_window_values_delta` call.
    /// `None` when delta mode is disabled.
    curr_delta: Option<Vec<(Op, W::Value)>>,
}
38
/// A key-value pair stored in the window buffer.
struct Entry<K: Ord, V> {
    /// Key of the row.
    key: K,
    /// Payload of the row, handed out when reading window values.
    value: V,
}
44
/// A snapshot describing the current window of a [`WindowBuffer`].
pub(super) struct CurrWindow<'a, K> {
    /// Key of the current row, or `None` if the current row is not in the buffer yet.
    pub key: Option<&'a K>,

    /// Whether the window implementation reports the preceding side of the frame as
    /// saturated. Outside of tests this field is expected to be unused, hence the lint
    /// expectation below.
    #[cfg_attr(not(test), expect(dead_code))]
    pub preceding_saturated: bool,
    /// Whether the window implementation reports the following side of the frame as
    /// saturated.
    pub following_saturated: bool,
}
56
57impl<W: WindowImpl> WindowBuffer<W> {
58 pub fn new(window_impl: W, frame_exclusion: FrameExclusion, enable_delta: bool) -> Self {
59 if enable_delta {
60 assert!(frame_exclusion.is_no_others());
62 }
63
64 Self {
65 window_impl,
66 frame_exclusion,
67 buffer: Default::default(),
68 curr_idx: 0,
69 left_idx: 0,
70 right_excl_idx: 0,
71 curr_delta: if enable_delta {
72 Some(Default::default())
73 } else {
74 None
75 },
76 }
77 }
78
79 pub fn smallest_key(&self) -> Option<&W::Key> {
82 self.buffer.front().map(|Entry { key, .. }| key)
83 }
84
85 pub fn curr_key(&self) -> Option<&W::Key> {
87 self.buffer.get(self.curr_idx).map(|Entry { key, .. }| key)
88 }
89
90 pub fn curr_window(&self) -> CurrWindow<'_, W::Key> {
92 let window = BufferRef {
93 buffer: &self.buffer,
94 curr_idx: self.curr_idx,
95 left_idx: self.left_idx,
96 right_excl_idx: self.right_excl_idx,
97 };
98 CurrWindow {
99 key: self.curr_key(),
100 preceding_saturated: self.window_impl.preceding_saturated(window),
101 following_saturated: self.window_impl.following_saturated(window),
102 }
103 }
104
105 fn curr_window_outer(&self) -> Range<usize> {
106 self.left_idx..std::cmp::max(self.right_excl_idx, self.left_idx)
107 }
108
109 fn curr_window_exclusion(&self) -> Range<usize> {
110 match self.frame_exclusion {
112 FrameExclusion::CurrentRow => self.curr_idx..self.curr_idx + 1,
113 FrameExclusion::NoOthers => self.curr_idx..self.curr_idx,
114 }
115 }
116
117 fn curr_window_ranges(&self) -> (Range<usize>, Range<usize>) {
118 let selection = self.curr_window_outer();
119 let exclusion = self.curr_window_exclusion();
120 range_except(selection, exclusion)
121 }
122
123 pub fn curr_window_values(&self) -> impl DoubleEndedIterator<Item = &W::Value> {
125 assert!(self.left_idx <= self.buffer.len()); assert!(self.right_excl_idx <= self.buffer.len());
127
128 let (left, right) = self.curr_window_ranges();
129 self.buffer
130 .range(left)
131 .chain(self.buffer.range(right))
132 .map(|Entry { value, .. }| value)
133 }
134
135 pub fn consume_curr_window_values_delta(
138 &mut self,
139 ) -> impl Iterator<Item = (Op, W::Value)> + '_ {
140 self.curr_delta
141 .as_mut()
142 .expect("delta mode should be enabled")
143 .drain(..)
144 }
145
146 pub fn append(&mut self, key: W::Key, value: W::Value) {
148 let old_outer = self.curr_window_outer();
149
150 self.buffer.push_back(Entry { key, value });
151 self.recalculate_left_right(RecalculateHint::Append);
152
153 if self.curr_delta.is_some() {
154 self.maintain_delta(old_outer, self.curr_window_outer());
155 }
156 }
157
158 pub fn slide(&mut self) -> impl Iterator<Item = (W::Key, W::Value)> + '_ {
161 let old_outer = self.curr_window_outer();
162
163 self.curr_idx += 1;
164 self.recalculate_left_right(RecalculateHint::Slide);
165
166 if self.curr_delta.is_some() {
167 self.maintain_delta(old_outer, self.curr_window_outer());
168 }
169
170 let min_needed_idx = [self.left_idx, self.curr_idx, self.right_excl_idx]
171 .iter()
172 .min()
173 .copied()
174 .unwrap();
175 self.curr_idx -= min_needed_idx;
176 self.left_idx -= min_needed_idx;
177 self.right_excl_idx -= min_needed_idx;
178
179 self.window_impl.shift_indices(min_needed_idx);
180
181 self.buffer
182 .drain(0..min_needed_idx)
183 .map(|Entry { key, value }| (key, value))
184 }
185
186 fn maintain_delta(&mut self, old_outer: Range<usize>, new_outer: Range<usize>) {
187 debug_assert!(self.frame_exclusion.is_no_others());
188
189 let (outer_removed, outer_added) = range_diff(old_outer.clone(), new_outer.clone());
190 let delta = self.curr_delta.as_mut().unwrap();
191 for idx in outer_removed.iter().cloned().flatten() {
192 delta.push((Op::Delete, self.buffer[idx].value.clone()));
193 }
194 for idx in outer_added.iter().cloned().flatten() {
195 delta.push((Op::Insert, self.buffer[idx].value.clone()));
196 }
197 }
198
199 fn recalculate_left_right(&mut self, hint: RecalculateHint) {
200 let window = BufferRefMut {
201 buffer: &self.buffer,
202 curr_idx: &mut self.curr_idx,
203 left_idx: &mut self.left_idx,
204 right_excl_idx: &mut self.right_excl_idx,
205 };
206 self.window_impl.recalculate_left_right(window, hint);
207 }
208}
209
/// A read-only, copyable view of a [`WindowBuffer`]'s entries and indices, passed to
/// [`WindowImpl`] when querying saturation.
// NOTE(review): `Educe` is presumably used so that `Clone`/`Copy` do not require
// `K: Copy` / `V: Copy` — confirm.
#[derive(Educe)]
#[educe(Clone, Copy)]
pub(super) struct BufferRef<'a, K: Ord, V: Clone> {
    /// The buffered entries.
    buffer: &'a VecDeque<Entry<K, V>>,
    /// Index of the current row.
    curr_idx: usize,
    /// Inclusive start index of the window.
    left_idx: usize,
    /// Exclusive end index of the window.
    right_excl_idx: usize,
}
219
/// A view of a [`WindowBuffer`] with read-only entries and mutable index slots, passed to
/// [`WindowImpl::recalculate_left_right`].
pub(super) struct BufferRefMut<'a, K: Ord, V: Clone> {
    /// The buffered entries (read-only).
    buffer: &'a VecDeque<Entry<K, V>>,
    /// Index of the current row.
    curr_idx: &'a mut usize,
    /// Inclusive start index of the window, to be updated by the implementation.
    left_idx: &'a mut usize,
    /// Exclusive end index of the window, to be updated by the implementation.
    right_excl_idx: &'a mut usize,
}
227
/// Tells a [`WindowImpl`] which buffer operation triggered the recalculation.
#[derive(Clone, Copy, PartialEq, Eq)]
pub(super) enum RecalculateHint {
    /// A new entry was just pushed to the back of the buffer.
    Append,
    /// The current row index was just advanced by one.
    Slide,
}
233
/// Strategy for maintaining the frame boundaries of a [`WindowBuffer`].
pub(super) trait WindowImpl {
    /// Key type of buffered entries.
    type Key: Ord;
    /// Value type of buffered entries.
    type Value: Clone;

    /// Whether the preceding side of the current window is saturated.
    /// The result is exposed as `CurrWindow::preceding_saturated`.
    fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool;

    /// Whether the following side of the current window is saturated.
    /// The result is exposed as `CurrWindow::following_saturated`.
    fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool;

    /// Recomputes `left_idx` and `right_excl_idx` through `window`. Called by the buffer
    /// after every append and after every advance of the current row; `hint` tells which.
    fn recalculate_left_right(
        &mut self,
        window: BufferRefMut<'_, Self::Key, Self::Value>,
        hint: RecalculateHint,
    );

    /// Called when the buffer rebases its indices by `n` while sliding, so that any buffer
    /// index kept inside the implementation can be shifted by the same amount.
    fn shift_indices(&mut self, n: usize);
}
260
/// A [`WindowImpl`] for `ROWS` frames, whose boundaries are row offsets relative to the
/// current row.
pub(super) struct RowsWindow<K: Ord, V: Clone> {
    /// Start and end bounds of the frame.
    frame_bounds: RowsFrameBounds,
    /// Ties the key type parameter to the struct without storing a key.
    _phantom: std::marker::PhantomData<K>,
    /// Ties the value type parameter to the struct without storing a value.
    _phantom2: std::marker::PhantomData<V>,
}
267
268impl<K: Ord, V: Clone> RowsWindow<K, V> {
269 pub fn new(frame_bounds: RowsFrameBounds) -> Self {
270 Self {
271 frame_bounds,
272 _phantom: std::marker::PhantomData,
273 _phantom2: std::marker::PhantomData,
274 }
275 }
276}
277
278impl<K: Ord, V: Clone> WindowImpl for RowsWindow<K, V> {
279 type Key = K;
280 type Value = V;
281
282 fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
283 window.curr_idx < window.buffer.len() && {
284 let start_off = self.frame_bounds.start.to_offset();
285 if let Some(start_off) = start_off {
286 if start_off >= 0 {
287 true } else {
289 #[allow(clippy::nonminimal_bool)]
292 {
293 assert!(window.curr_idx >= window.left_idx);
294 }
295 window.curr_idx - window.left_idx >= start_off.unsigned_abs()
296 }
297 } else {
298 false }
300 }
301 }
302
303 fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
304 window.curr_idx < window.buffer.len() && {
305 let end_off = self.frame_bounds.end.to_offset();
306 if let Some(end_off) = end_off {
307 if end_off <= 0 {
308 true } else {
310 #[allow(clippy::nonminimal_bool)]
312 {
313 assert!(window.right_excl_idx > 0);
314 assert!(window.right_excl_idx > window.curr_idx);
315 assert!(window.right_excl_idx <= window.buffer.len());
316 }
317 window.right_excl_idx - 1 - window.curr_idx >= end_off as usize
318 }
319 } else {
320 false }
322 }
323 }
324
325 fn recalculate_left_right(
326 &mut self,
327 window: BufferRefMut<'_, Self::Key, Self::Value>,
328 _hint: RecalculateHint,
329 ) {
330 if window.buffer.is_empty() {
331 *window.left_idx = 0;
332 *window.right_excl_idx = 0;
333 }
334
335 let start_off = self.frame_bounds.start.to_offset();
336 let end_off = self.frame_bounds.end.to_offset();
337 if let Some(start_off) = start_off {
338 let logical_left_idx = *window.curr_idx as isize + start_off;
339 if logical_left_idx >= 0 {
340 *window.left_idx = std::cmp::min(logical_left_idx as usize, window.buffer.len());
341 } else {
342 *window.left_idx = 0;
343 }
344 } else {
345 *window.left_idx = 0;
347 }
348 if let Some(end_off) = end_off {
349 let logical_right_excl_idx = *window.curr_idx as isize + end_off + 1;
350 if logical_right_excl_idx >= 0 {
351 *window.right_excl_idx =
352 std::cmp::min(logical_right_excl_idx as usize, window.buffer.len());
353 } else {
354 *window.right_excl_idx = 0;
355 }
356 } else {
357 *window.right_excl_idx = window.buffer.len();
359 }
360 }
361
362 fn shift_indices(&mut self, _n: usize) {}
363}
364
/// A [`WindowImpl`] for `RANGE` frames, whose boundaries are found by comparing encoded
/// order keys against the frame start/end computed from the current row's order value.
pub(super) struct RangeWindow<V: Clone> {
    /// Frame bounds together with the order column's data type and order type.
    frame_bounds: RangeFrameBounds,
    /// Ties the value type parameter to the struct without storing a value.
    _phantom: std::marker::PhantomData<V>,
}
370
371impl<V: Clone> RangeWindow<V> {
372 pub fn new(frame_bounds: RangeFrameBounds) -> Self {
373 Self {
374 frame_bounds,
375 _phantom: std::marker::PhantomData,
376 }
377 }
378}
379
380impl<V: Clone> WindowImpl for RangeWindow<V> {
381 type Key = StateKey;
382 type Value = V;
383
384 fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
385 window.curr_idx < window.buffer.len() && {
386 true
388 }
389 }
390
391 fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
392 window.curr_idx < window.buffer.len()
393 && {
394 window.left_idx < window.buffer.len() - 1
400 }
401 && {
402 window.right_excl_idx < window.buffer.len()
404 }
405 }
406
407 fn recalculate_left_right(
408 &mut self,
409 window: BufferRefMut<'_, Self::Key, Self::Value>,
410 _hint: RecalculateHint,
411 ) {
412 if window.buffer.is_empty() {
413 *window.left_idx = 0;
414 *window.right_excl_idx = 0;
415 }
416
417 let Some(entry) = window.buffer.get(*window.curr_idx) else {
418 return;
422 };
423 let curr_key = &entry.key;
424
425 let curr_order_value = memcmp_encoding::decode_value(
426 &self.frame_bounds.order_data_type,
427 &curr_key.order_key,
428 self.frame_bounds.order_type,
429 )
430 .expect("no reason to fail here because we just encoded it in memory");
431
432 match self.frame_bounds.frame_start_of(&curr_order_value) {
433 Sentinelled::Smallest => {
434 assert_eq!(
436 *window.left_idx, 0,
437 "for unbounded start, left index should always be 0"
438 );
439 }
440 Sentinelled::Normal(value) => {
441 let value_enc = memcmp_encoding::encode_value(value, self.frame_bounds.order_type)
443 .expect("no reason to fail here");
444 *window.left_idx = window
445 .buffer
446 .partition_point(|elem| elem.key.order_key < value_enc);
447 }
448 Sentinelled::Largest => unreachable!("frame start never be UNBOUNDED FOLLOWING"),
449 }
450
451 match self.frame_bounds.frame_end_of(curr_order_value) {
452 Sentinelled::Largest => {
453 *window.right_excl_idx = window.buffer.len();
455 }
456 Sentinelled::Normal(value) => {
457 let value_enc = memcmp_encoding::encode_value(value, self.frame_bounds.order_type)
459 .expect("no reason to fail here");
460 *window.right_excl_idx = window
461 .buffer
462 .partition_point(|elem| elem.key.order_key <= value_enc);
463 }
464 Sentinelled::Smallest => unreachable!("frame end never be UNBOUNDED PRECEDING"),
465 }
466 }
467
468 fn shift_indices(&mut self, _n: usize) {}
469}
470
/// A [`WindowImpl`] for `SESSION` frames, where the window is the session that contains the
/// current row.
pub(super) struct SessionWindow<V: Clone> {
    /// Frame bounds together with the order column's data type and order type.
    frame_bounds: SessionFrameBounds,
    /// The session that the most recently appended entry belongs to, if any entry has been
    /// appended since the last reset.
    latest_session: Option<LatestSession>,
    /// Sizes of sessions that are already closed, oldest first. A size is pushed when an
    /// appended key starts a new session and popped when the window slides past a session.
    recognized_session_sizes: VecDeque<usize>,
    /// Ties the value type parameter to the struct without storing a value.
    _phantom: std::marker::PhantomData<V>,
}
481
/// Bookkeeping for the session that is still open, i.e. may still receive entries.
#[derive(Debug)]
struct LatestSession {
    /// Buffer index of the first entry of the session. Shifted in `shift_indices` when the
    /// buffer evicts entries from the front.
    start_idx: usize,

    /// Encoded order-key threshold: an appended key that is greater than or equal to this
    /// value starts a new session, otherwise it extends the latest one.
    minimal_next_start: MemcmpEncoded,
}
491
492impl<V: Clone> SessionWindow<V> {
493 pub fn new(frame_bounds: SessionFrameBounds) -> Self {
494 Self {
495 frame_bounds,
496 latest_session: None,
497 recognized_session_sizes: Default::default(),
498 _phantom: std::marker::PhantomData,
499 }
500 }
501}
502
impl<V: Clone> WindowImpl for SessionWindow<V> {
    type Key = StateKey;
    type Value = V;

    /// The preceding side counts as saturated whenever the current row is buffered.
    fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
        window.curr_idx < window.buffer.len() && {
            true
        }
    }

    /// The following side is saturated when the current row is buffered and the current
    /// window starts before the latest (still open) session, i.e. the current window is an
    /// already closed session.
    ///
    /// # Panics
    ///
    /// Panics if the current row is buffered but lies outside `left_idx..right_excl_idx`.
    fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
        window.curr_idx < window.buffer.len() && {
            // The current row must lie inside the current window.
            assert!(window.left_idx <= window.curr_idx);
            assert!(window.curr_idx < window.right_excl_idx);

            // If the window *is* the latest session (or there is none), more rows may
            // still join it, so it is not reported as saturated.
            self.latest_session
                .as_ref()
                .is_some_and(|LatestSession { start_idx, .. }| window.left_idx < *start_idx)
        }
    }

    /// Updates session bookkeeping and the window boundaries.
    ///
    /// - On `Append`, decides whether the appended key extends the latest session or starts
    ///   a new one, and extends the window if it currently is the latest session.
    /// - On `Slide`, moves the window to the next session once the current row leaves the
    ///   current one.
    ///
    /// # Panics
    ///
    /// Panics if the buffer is empty on `Append`, if the indices violate the invariants
    /// asserted below, or if decoding or encoding an order value fails.
    fn recalculate_left_right(
        &mut self,
        window: BufferRefMut<'_, Self::Key, Self::Value>,
        hint: RecalculateHint,
    ) {
        match hint {
            RecalculateHint::Append => {
                // The entry that triggered this call is the last one in the buffer.
                assert!(!window.buffer.is_empty());
                let appended_idx = window.buffer.len() - 1;
                let appended_key = &window.buffer[appended_idx].key;

                // Compute the threshold that a future key must reach to start a new
                // session after the appended one, then re-encode it for comparison.
                let minimal_next_start_of_appended = self.frame_bounds.minimal_next_start_of(
                    memcmp_encoding::decode_value(
                        &self.frame_bounds.order_data_type,
                        &appended_key.order_key,
                        self.frame_bounds.order_type,
                    )
                    .expect("no reason to fail here because we just encoded it in memory"),
                );
                let minimal_next_start_enc_of_appended = memcmp_encoding::encode_value(
                    minimal_next_start_of_appended,
                    self.frame_bounds.order_type,
                )
                .expect("no reason to fail here");

                if let Some(&mut LatestSession {
                    ref start_idx,
                    ref mut minimal_next_start,
                }) = self.latest_session.as_mut()
                {
                    // A latest session already exists.
                    if &appended_key.order_key >= minimal_next_start {
                        // The appended key starts a new session: close the previous one by
                        // recording its size. The window itself is left untouched.
                        self.recognized_session_sizes
                            .push_back(appended_idx - start_idx);
                        self.latest_session = Some(LatestSession {
                            start_idx: appended_idx,
                            minimal_next_start: minimal_next_start_enc_of_appended,
                        });
                    } else {
                        // The appended key belongs to the latest session.
                        *minimal_next_start = minimal_next_start_enc_of_appended;

                        if *start_idx == *window.left_idx {
                            // The current window is the latest session, so extend it to
                            // include the appended entry.
                            *window.right_excl_idx = appended_idx + 1;
                        }
                    }
                } else {
                    // No session yet: the window must be empty and sit right at the
                    // appended entry.
                    let left_idx = *window.left_idx;
                    let curr_idx = *window.curr_idx;
                    let old_right_excl_idx = *window.right_excl_idx;
                    assert_eq!(left_idx, curr_idx);
                    assert_eq!(left_idx, old_right_excl_idx);
                    assert_eq!(old_right_excl_idx, window.buffer.len() - 1);

                    // Put the appended entry into the window.
                    *window.right_excl_idx = window.buffer.len();

                    // Start tracking the latest session from here.
                    self.latest_session = Some(LatestSession {
                        start_idx: left_idx,
                        minimal_next_start: minimal_next_start_enc_of_appended,
                    });
                }
            }
            RecalculateHint::Slide => {
                let old_left_idx = *window.left_idx;
                let new_curr_idx = *window.curr_idx;
                let old_right_excl_idx = *window.right_excl_idx;

                if new_curr_idx < old_right_excl_idx {
                    // The current row is still inside the current session; nothing to do.
                } else {
                    let old_session_size = self.recognized_session_sizes.pop_front();
                    let next_session_size = self.recognized_session_sizes.front().copied();

                    if let Some(old_session_size) = old_session_size {
                        // The session being left was a closed one; its recorded size must
                        // match the window.
                        assert_eq!(old_session_size, old_right_excl_idx - old_left_idx);

                        if let Some(next_session_size) = next_session_size {
                            // The next session is closed too, so its end index is known.
                            *window.left_idx = old_right_excl_idx;
                            *window.right_excl_idx = old_right_excl_idx + next_session_size;
                        } else {
                            // The next session is still open; it runs to the buffer end.
                            *window.left_idx = old_right_excl_idx;
                            *window.right_excl_idx = window.buffer.len();
                        }
                    } else {
                        // No closed session was recorded, so the window being left was the
                        // last session in the buffer. Reset to an empty window at the end.
                        assert_eq!(old_right_excl_idx, window.buffer.len());
                        *window.left_idx = old_right_excl_idx;
                        *window.right_excl_idx = old_right_excl_idx;
                        self.latest_session = None;
                    }
                }
            }
        }
    }

    /// Rebases the stored start index of the latest session after the buffer evicted `n`
    /// entries from the front.
    fn shift_indices(&mut self, n: usize) {
        if let Some(LatestSession { start_idx, .. }) = self.latest_session.as_mut() {
            *start_idx -= n;
        }
    }
}
647
/// Unit tests driving `WindowBuffer` through append/slide sequences for rows and session
/// frames.
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_expr::window_function::FrameBound::{
        CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding,
    };
    use risingwave_expr::window_function::SessionFrameGap;

    use super::*;

    /// `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`.
    #[test]
    fn test_rows_frame_unbounded_preceding_to_current_row() {
        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
            RowsWindow::new(RowsFrameBounds {
                start: UnboundedPreceding,
                end: CurrentRow,
            }),
            FrameExclusion::NoOthers,
            true,
        );

        // Empty buffer: no current row, nothing saturated.
        let window = buffer.curr_window();
        assert!(window.key.is_none());
        assert!(!window.preceding_saturated);
        assert!(!window.following_saturated);
        buffer.append(1, "hello");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        // An unbounded start is expected to never report preceding saturation.
        assert!(!window.preceding_saturated);
        assert!(window.following_saturated);
        buffer.append(2, "world");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        assert!(!window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["hello"]
        );
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        // Nothing is evicted because the frame start stays at the first row.
        assert!(removed_keys.is_empty());
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&2));
        assert!(!window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(buffer.smallest_key(), Some(&1));
    }

    /// `ROWS BETWEEN 1 PRECEDING AND CURRENT ROW`.
    #[test]
    fn test_rows_frame_preceding_to_current_row() {
        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
            RowsWindow::new(RowsFrameBounds {
                start: Preceding(1),
                end: CurrentRow,
            }),
            FrameExclusion::NoOthers,
            true,
        );

        let window = buffer.curr_window();
        assert!(window.key.is_none());
        assert!(!window.preceding_saturated);
        assert!(!window.following_saturated);
        buffer.append(1, "hello");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        // The first row has no preceding row yet.
        assert!(!window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["hello"]
        );
        buffer.append(2, "world");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        assert!(!window.preceding_saturated);
        assert!(window.following_saturated);
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert!(removed_keys.is_empty());
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&2));
        // Now one preceding row is available.
        assert!(window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(buffer.smallest_key(), Some(&1));
        buffer.append(3, "!");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&2));
        // Sliding to row 3 makes row 1 fall out of the frame.
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert_eq!(removed_keys, vec![1]);
    }

    /// `ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING`: the frame never contains the current row.
    #[test]
    fn test_rows_frame_preceding_to_preceding() {
        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
            RowsWindow::new(RowsFrameBounds {
                start: Preceding(2),
                end: Preceding(1),
            }),
            FrameExclusion::NoOthers,
            true,
        );

        buffer.append(1, "RisingWave");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        assert!(!window.preceding_saturated);
        assert!(window.following_saturated);
        assert!(buffer.curr_window_values().collect_vec().is_empty());
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert!(removed_keys.is_empty());
        assert_eq!(buffer.smallest_key(), Some(&1));
        buffer.append(2, "is the best");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&2));
        assert!(!window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["RisingWave"]
        );
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert!(removed_keys.is_empty());
        assert_eq!(buffer.smallest_key(), Some(&1));
        buffer.append(3, "streaming platform");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&3));
        assert!(window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["RisingWave", "is the best"]
        );
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert_eq!(removed_keys, vec![1]);
        assert_eq!(buffer.smallest_key(), Some(&2));
    }

    /// `ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING`.
    #[test]
    fn test_rows_frame_current_row_to_unbounded_following() {
        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
            RowsWindow::new(RowsFrameBounds {
                start: CurrentRow,
                end: UnboundedFollowing,
            }),
            FrameExclusion::NoOthers,
            true,
        );

        buffer.append(1, "RisingWave");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        assert!(window.preceding_saturated);
        // An unbounded end is expected to never report following saturation.
        assert!(!window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["RisingWave"]
        );
        buffer.append(2, "is the best");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        assert!(window.preceding_saturated);
        assert!(!window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["RisingWave", "is the best"]
        );
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert_eq!(removed_keys, vec![1]);
        assert_eq!(buffer.smallest_key(), Some(&2));
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&2));
        assert!(window.preceding_saturated);
        assert!(!window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["is the best"]
        );
    }

    /// `ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING`.
    #[test]
    fn test_rows_frame_current_row_to_following() {
        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
            RowsWindow::new(RowsFrameBounds {
                start: CurrentRow,
                end: Following(1),
            }),
            FrameExclusion::NoOthers,
            true,
        );

        buffer.append(1, "RisingWave");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        assert!(window.preceding_saturated);
        // The following row has not arrived yet.
        assert!(!window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["RisingWave"]
        );
        buffer.append(2, "is the best");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        assert!(window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["RisingWave", "is the best"]
        );
        // A third row does not change the window of the first row.
        buffer.append(3, "streaming platform");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        assert!(window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["RisingWave", "is the best"]
        );
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert_eq!(removed_keys, vec![1]);
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&2));
        assert!(window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["is the best", "streaming platform"]
        );
    }

    /// `ROWS BETWEEN 1 FOLLOWING AND 2 FOLLOWING`: the frame never contains the current row.
    #[test]
    fn test_rows_frame_following_to_following() {
        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
            RowsWindow::new(RowsFrameBounds {
                start: Following(1),
                end: Following(2),
            }),
            FrameExclusion::NoOthers,
            true,
        );

        buffer.append(1, "RisingWave");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        assert!(window.preceding_saturated);
        assert!(!window.following_saturated);
        assert!(buffer.curr_window_values().collect_vec().is_empty());
        buffer.append(2, "is the best");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        assert!(window.preceding_saturated);
        assert!(!window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["is the best"]
        );
        buffer.append(3, "streaming platform");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        assert!(window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["is the best", "streaming platform"]
        );
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert_eq!(removed_keys, vec![1]);
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&2));
        assert!(window.preceding_saturated);
        assert!(!window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["streaming platform"]
        );
    }

    /// `EXCLUDE CURRENT ROW` removes the current row from the returned window values.
    /// Delta mode is disabled because it requires `FrameExclusion::NoOthers`.
    #[test]
    fn test_rows_frame_exclude_current_row() {
        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
            RowsWindow::new(RowsFrameBounds {
                start: UnboundedPreceding,
                end: CurrentRow,
            }),
            FrameExclusion::CurrentRow,
            false,
        );

        buffer.append(1, "hello");
        // The only row in the frame is the current row, which is excluded.
        assert!(
            buffer
                .curr_window_values()
                .cloned()
                .collect_vec()
                .is_empty()
        );
        buffer.append(2, "world");
        let _ = buffer.slide();
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["hello"]
        );
    }

    /// Session frame with a gap of 5 over an ascending `Int64` order column.
    #[test]
    fn test_session_frame() {
        let order_data_type = DataType::Int64;
        let order_type = OrderType::ascending();
        let gap_data_type = DataType::Int64;

        let mut buffer = WindowBuffer::<SessionWindow<_>>::new(
            SessionWindow::new(SessionFrameBounds {
                order_data_type: order_data_type.clone(),
                order_type,
                gap_data_type: gap_data_type.clone(),
                gap: SessionFrameGap::new_for_test(
                    ScalarImpl::Int64(5),
                    &order_data_type,
                    &gap_data_type,
                ),
            }),
            FrameExclusion::NoOthers,
            true,
        );

        // Builds a `StateKey` whose order key is the encoded integer and whose pk is empty.
        let key = |key: i64| -> StateKey {
            StateKey {
                order_key: memcmp_encoding::encode_value(Some(ScalarImpl::from(key)), order_type)
                    .unwrap(),
                pk: OwnedRow::empty().into(),
            }
        };

        assert!(buffer.curr_key().is_none());

        // Keys 1 and 3 are expected to share one session, which stays open.
        buffer.append(key(1), "hello");
        buffer.append(key(3), "session");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&key(1)));
        assert!(window.preceding_saturated);
        assert!(!window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["hello", "session"]
        );

        // Key 8 is expected to start a new session, closing the first one.
        buffer.append(key(8), "window");
        let window = buffer.curr_window();
        assert!(window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["hello", "session"]
        );

        // Later sessions do not affect the current window.
        buffer.append(key(15), "and");
        buffer.append(key(16), "world");
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["hello", "session"]
        );

        // Sliding within the first session keeps the window and evicts nothing.
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert!(removed_keys.is_empty());
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&key(3)));
        assert!(window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["hello", "session"]
        );

        // Leaving the first session evicts all of its rows.
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert_eq!(removed_keys, vec![key(1), key(3)]);
        assert_eq!(buffer.smallest_key(), Some(&key(8)));
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&key(8)));
        assert!(window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["window"]
        );

        // The last session is still open, so it is not following-saturated.
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert_eq!(removed_keys, vec![key(8)]);
        assert_eq!(buffer.smallest_key(), Some(&key(15)));
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&key(15)));
        assert!(window.preceding_saturated);
        assert!(!window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["and", "world"]
        );

        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert!(removed_keys.is_empty());
        assert_eq!(buffer.curr_key(), Some(&key(16)));
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["and", "world"]
        );

        // Sliding past the last row empties the buffer.
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert_eq!(removed_keys, vec![key(15), key(16)]);
        assert!(buffer.curr_key().is_none());
        assert!(
            buffer
                .curr_window_values()
                .cloned()
                .collect_vec()
                .is_empty()
        );
    }
}
1065}