1use std::collections::VecDeque;
16use std::ops::Range;
17
18use educe::Educe;
19use risingwave_common::array::Op;
20use risingwave_common::types::Sentinelled;
21use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded};
22use risingwave_expr::window_function::{
23 FrameExclusion, RangeFrameBounds, RowsFrameBounds, SessionFrameBounds, StateKey,
24};
25
26use super::range_utils::{range_diff, range_except};
27
28pub(super) struct WindowBuffer<W: WindowImpl> {
30 window_impl: W,
31 frame_exclusion: FrameExclusion,
32 buffer: VecDeque<Entry<W::Key, W::Value>>,
33 curr_idx: usize,
34 left_idx: usize, right_excl_idx: usize, curr_delta: Option<Vec<(Op, W::Value)>>,
37}
38
39struct Entry<K: Ord, V> {
41 key: K,
42 value: V,
43}
44
45pub(super) struct CurrWindow<'a, K> {
47 pub key: Option<&'a K>,
48
49 #[cfg_attr(not(test), expect(dead_code))]
51 pub preceding_saturated: bool,
53 pub following_saturated: bool,
55}
56
57impl<W: WindowImpl> WindowBuffer<W> {
58 pub fn new(window_impl: W, frame_exclusion: FrameExclusion, enable_delta: bool) -> Self {
59 if enable_delta {
60 assert!(frame_exclusion.is_no_others());
62 }
63
64 Self {
65 window_impl,
66 frame_exclusion,
67 buffer: Default::default(),
68 curr_idx: 0,
69 left_idx: 0,
70 right_excl_idx: 0,
71 curr_delta: if enable_delta {
72 Some(Default::default())
73 } else {
74 None
75 },
76 }
77 }
78
79 pub fn smallest_key(&self) -> Option<&W::Key> {
82 self.buffer.front().map(|Entry { key, .. }| key)
83 }
84
85 pub fn curr_key(&self) -> Option<&W::Key> {
87 self.buffer.get(self.curr_idx).map(|Entry { key, .. }| key)
88 }
89
90 pub fn curr_window(&self) -> CurrWindow<'_, W::Key> {
92 let window = BufferRef {
93 buffer: &self.buffer,
94 curr_idx: self.curr_idx,
95 left_idx: self.left_idx,
96 right_excl_idx: self.right_excl_idx,
97 };
98 CurrWindow {
99 key: self.curr_key(),
100 preceding_saturated: self.window_impl.preceding_saturated(window),
101 following_saturated: self.window_impl.following_saturated(window),
102 }
103 }
104
105 fn curr_window_outer(&self) -> Range<usize> {
106 self.left_idx..self.right_excl_idx
107 }
108
109 fn curr_window_exclusion(&self) -> Range<usize> {
110 match self.frame_exclusion {
112 FrameExclusion::CurrentRow => self.curr_idx..self.curr_idx + 1,
113 FrameExclusion::NoOthers => self.curr_idx..self.curr_idx,
114 }
115 }
116
117 fn curr_window_ranges(&self) -> (Range<usize>, Range<usize>) {
118 let selection = self.curr_window_outer();
119 let exclusion = self.curr_window_exclusion();
120 range_except(selection, exclusion)
121 }
122
123 pub fn curr_window_values(&self) -> impl DoubleEndedIterator<Item = &W::Value> {
125 assert!(self.left_idx <= self.right_excl_idx);
126 assert!(self.right_excl_idx <= self.buffer.len());
127
128 let (left, right) = self.curr_window_ranges();
129 self.buffer
130 .range(left)
131 .chain(self.buffer.range(right))
132 .map(|Entry { value, .. }| value)
133 }
134
135 pub fn consume_curr_window_values_delta(
138 &mut self,
139 ) -> impl Iterator<Item = (Op, W::Value)> + '_ {
140 self.curr_delta
141 .as_mut()
142 .expect("delta mode should be enabled")
143 .drain(..)
144 }
145
146 pub fn append(&mut self, key: W::Key, value: W::Value) {
148 let old_outer = self.curr_window_outer();
149
150 self.buffer.push_back(Entry { key, value });
151 self.recalculate_left_right(RecalculateHint::Append);
152
153 if self.curr_delta.is_some() {
154 self.maintain_delta(old_outer, self.curr_window_outer());
155 }
156 }
157
158 pub fn slide(&mut self) -> impl Iterator<Item = (W::Key, W::Value)> + '_ {
161 let old_outer = self.curr_window_outer();
162
163 self.curr_idx += 1;
164 self.recalculate_left_right(RecalculateHint::Slide);
165
166 if self.curr_delta.is_some() {
167 self.maintain_delta(old_outer, self.curr_window_outer());
168 }
169
170 let min_needed_idx = std::cmp::min(self.left_idx, self.curr_idx);
171 self.curr_idx -= min_needed_idx;
172 self.left_idx -= min_needed_idx;
173 self.right_excl_idx -= min_needed_idx;
174
175 self.window_impl.shift_indices(min_needed_idx);
176
177 self.buffer
178 .drain(0..min_needed_idx)
179 .map(|Entry { key, value }| (key, value))
180 }
181
182 fn maintain_delta(&mut self, old_outer: Range<usize>, new_outer: Range<usize>) {
183 debug_assert!(self.frame_exclusion.is_no_others());
184
185 let (outer_removed, outer_added) = range_diff(old_outer.clone(), new_outer.clone());
186 let delta = self.curr_delta.as_mut().unwrap();
187 for idx in outer_removed.iter().cloned().flatten() {
188 delta.push((Op::Delete, self.buffer[idx].value.clone()));
189 }
190 for idx in outer_added.iter().cloned().flatten() {
191 delta.push((Op::Insert, self.buffer[idx].value.clone()));
192 }
193 }
194
195 fn recalculate_left_right(&mut self, hint: RecalculateHint) {
196 let window = BufferRefMut {
197 buffer: &self.buffer,
198 curr_idx: &mut self.curr_idx,
199 left_idx: &mut self.left_idx,
200 right_excl_idx: &mut self.right_excl_idx,
201 };
202 self.window_impl.recalculate_left_right(window, hint);
203 }
204}
205
206#[derive(Educe)]
208#[educe(Clone, Copy)]
209pub(super) struct BufferRef<'a, K: Ord, V: Clone> {
210 buffer: &'a VecDeque<Entry<K, V>>,
211 curr_idx: usize,
212 left_idx: usize,
213 right_excl_idx: usize,
214}
215
216pub(super) struct BufferRefMut<'a, K: Ord, V: Clone> {
218 buffer: &'a VecDeque<Entry<K, V>>,
219 curr_idx: &'a mut usize,
220 left_idx: &'a mut usize,
221 right_excl_idx: &'a mut usize,
222}
223
224#[derive(Clone, Copy, PartialEq, Eq)]
225pub(super) enum RecalculateHint {
226 Append,
227 Slide,
228}
229
230pub(super) trait WindowImpl {
233 type Key: Ord;
234 type Value: Clone;
235
236 fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool;
240
241 fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool;
243
244 fn recalculate_left_right(
247 &mut self,
248 window: BufferRefMut<'_, Self::Key, Self::Value>,
249 hint: RecalculateHint,
250 );
251
252 fn shift_indices(&mut self, n: usize);
255}
256
257pub(super) struct RowsWindow<K: Ord, V: Clone> {
259 frame_bounds: RowsFrameBounds,
260 _phantom: std::marker::PhantomData<K>,
261 _phantom2: std::marker::PhantomData<V>,
262}
263
264impl<K: Ord, V: Clone> RowsWindow<K, V> {
265 pub fn new(frame_bounds: RowsFrameBounds) -> Self {
266 Self {
267 frame_bounds,
268 _phantom: std::marker::PhantomData,
269 _phantom2: std::marker::PhantomData,
270 }
271 }
272}
273
274impl<K: Ord, V: Clone> WindowImpl for RowsWindow<K, V> {
275 type Key = K;
276 type Value = V;
277
278 fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
279 window.curr_idx < window.buffer.len() && {
280 let start_off = self.frame_bounds.start.to_offset();
281 if let Some(start_off) = start_off {
282 if start_off >= 0 {
283 true } else {
285 #[allow(clippy::nonminimal_bool)]
288 {
289 assert!(window.curr_idx >= window.left_idx);
290 }
291 window.curr_idx - window.left_idx >= start_off.unsigned_abs()
292 }
293 } else {
294 false }
296 }
297 }
298
299 fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
300 window.curr_idx < window.buffer.len() && {
301 let end_off = self.frame_bounds.end.to_offset();
302 if let Some(end_off) = end_off {
303 if end_off <= 0 {
304 true } else {
306 #[allow(clippy::nonminimal_bool)]
308 {
309 assert!(window.right_excl_idx > 0);
310 assert!(window.right_excl_idx > window.curr_idx);
311 assert!(window.right_excl_idx <= window.buffer.len());
312 }
313 window.right_excl_idx - 1 - window.curr_idx >= end_off as usize
314 }
315 } else {
316 false }
318 }
319 }
320
321 fn recalculate_left_right(
322 &mut self,
323 window: BufferRefMut<'_, Self::Key, Self::Value>,
324 _hint: RecalculateHint,
325 ) {
326 if window.buffer.is_empty() {
327 *window.left_idx = 0;
328 *window.right_excl_idx = 0;
329 }
330
331 let start_off = self.frame_bounds.start.to_offset();
332 let end_off = self.frame_bounds.end.to_offset();
333 if let Some(start_off) = start_off {
334 let logical_left_idx = *window.curr_idx as isize + start_off;
335 if logical_left_idx >= 0 {
336 *window.left_idx = std::cmp::min(logical_left_idx as usize, window.buffer.len());
337 } else {
338 *window.left_idx = 0;
339 }
340 } else {
341 *window.left_idx = 0;
343 }
344 if let Some(end_off) = end_off {
345 let logical_right_excl_idx = *window.curr_idx as isize + end_off + 1;
346 if logical_right_excl_idx >= 0 {
347 *window.right_excl_idx =
348 std::cmp::min(logical_right_excl_idx as usize, window.buffer.len());
349 } else {
350 *window.right_excl_idx = 0;
351 }
352 } else {
353 *window.right_excl_idx = window.buffer.len();
355 }
356 }
357
358 fn shift_indices(&mut self, _n: usize) {}
359}
360
361pub(super) struct RangeWindow<V: Clone> {
363 frame_bounds: RangeFrameBounds,
364 _phantom: std::marker::PhantomData<V>,
365}
366
367impl<V: Clone> RangeWindow<V> {
368 pub fn new(frame_bounds: RangeFrameBounds) -> Self {
369 Self {
370 frame_bounds,
371 _phantom: std::marker::PhantomData,
372 }
373 }
374}
375
376impl<V: Clone> WindowImpl for RangeWindow<V> {
377 type Key = StateKey;
378 type Value = V;
379
380 fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
381 window.curr_idx < window.buffer.len() && {
382 true
384 }
385 }
386
387 fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
388 window.curr_idx < window.buffer.len()
389 && {
390 window.left_idx < window.buffer.len() - 1
396 }
397 && {
398 window.right_excl_idx < window.buffer.len()
400 }
401 }
402
403 fn recalculate_left_right(
404 &mut self,
405 window: BufferRefMut<'_, Self::Key, Self::Value>,
406 _hint: RecalculateHint,
407 ) {
408 if window.buffer.is_empty() {
409 *window.left_idx = 0;
410 *window.right_excl_idx = 0;
411 }
412
413 let Some(entry) = window.buffer.get(*window.curr_idx) else {
414 return;
418 };
419 let curr_key = &entry.key;
420
421 let curr_order_value = memcmp_encoding::decode_value(
422 &self.frame_bounds.order_data_type,
423 &curr_key.order_key,
424 self.frame_bounds.order_type,
425 )
426 .expect("no reason to fail here because we just encoded it in memory");
427
428 match self.frame_bounds.frame_start_of(&curr_order_value) {
429 Sentinelled::Smallest => {
430 assert_eq!(
432 *window.left_idx, 0,
433 "for unbounded start, left index should always be 0"
434 );
435 }
436 Sentinelled::Normal(value) => {
437 let value_enc = memcmp_encoding::encode_value(value, self.frame_bounds.order_type)
439 .expect("no reason to fail here");
440 *window.left_idx = window
441 .buffer
442 .partition_point(|elem| elem.key.order_key < value_enc);
443 }
444 Sentinelled::Largest => unreachable!("frame start never be UNBOUNDED FOLLOWING"),
445 }
446
447 match self.frame_bounds.frame_end_of(curr_order_value) {
448 Sentinelled::Largest => {
449 *window.right_excl_idx = window.buffer.len();
451 }
452 Sentinelled::Normal(value) => {
453 let value_enc = memcmp_encoding::encode_value(value, self.frame_bounds.order_type)
455 .expect("no reason to fail here");
456 *window.right_excl_idx = window
457 .buffer
458 .partition_point(|elem| elem.key.order_key <= value_enc);
459 }
460 Sentinelled::Smallest => unreachable!("frame end never be UNBOUNDED PRECEDING"),
461 }
462 }
463
464 fn shift_indices(&mut self, _n: usize) {}
465}
466
467pub(super) struct SessionWindow<V: Clone> {
468 frame_bounds: SessionFrameBounds,
469 latest_session: Option<LatestSession>,
471 recognized_session_sizes: VecDeque<usize>,
475 _phantom: std::marker::PhantomData<V>,
476}
477
478#[derive(Debug)]
479struct LatestSession {
480 start_idx: usize,
482
483 minimal_next_start: MemcmpEncoded,
486}
487
488impl<V: Clone> SessionWindow<V> {
489 pub fn new(frame_bounds: SessionFrameBounds) -> Self {
490 Self {
491 frame_bounds,
492 latest_session: None,
493 recognized_session_sizes: Default::default(),
494 _phantom: std::marker::PhantomData,
495 }
496 }
497}
498
499impl<V: Clone> WindowImpl for SessionWindow<V> {
500 type Key = StateKey;
501 type Value = V;
502
503 fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
504 window.curr_idx < window.buffer.len() && {
505 true
507 }
508 }
509
510 fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
511 window.curr_idx < window.buffer.len() && {
512 assert!(window.left_idx <= window.curr_idx);
514 assert!(window.curr_idx < window.right_excl_idx);
515
516 self.latest_session
520 .as_ref()
521 .is_some_and(|LatestSession { start_idx, .. }| window.left_idx < *start_idx)
522 }
523 }
524
525 fn recalculate_left_right(
526 &mut self,
527 window: BufferRefMut<'_, Self::Key, Self::Value>,
528 hint: RecalculateHint,
529 ) {
530 match hint {
539 RecalculateHint::Append => {
540 assert!(!window.buffer.is_empty()); let appended_idx = window.buffer.len() - 1;
542 let appended_key = &window.buffer[appended_idx].key;
543
544 let minimal_next_start_of_appended = self.frame_bounds.minimal_next_start_of(
545 memcmp_encoding::decode_value(
546 &self.frame_bounds.order_data_type,
547 &appended_key.order_key,
548 self.frame_bounds.order_type,
549 )
550 .expect("no reason to fail here because we just encoded it in memory"),
551 );
552 let minimal_next_start_enc_of_appended = memcmp_encoding::encode_value(
553 minimal_next_start_of_appended,
554 self.frame_bounds.order_type,
555 )
556 .expect("no reason to fail here");
557
558 if let Some(&mut LatestSession {
559 ref start_idx,
560 ref mut minimal_next_start,
561 }) = self.latest_session.as_mut()
562 {
563 if &appended_key.order_key >= minimal_next_start {
564 self.recognized_session_sizes
566 .push_back(appended_idx - start_idx);
567 self.latest_session = Some(LatestSession {
568 start_idx: appended_idx,
569 minimal_next_start: minimal_next_start_enc_of_appended,
570 });
571 } else {
574 *minimal_next_start = minimal_next_start_enc_of_appended;
576
577 if *start_idx == *window.left_idx {
578 *window.right_excl_idx = appended_idx + 1;
580 }
581 }
582 } else {
583 let left_idx = *window.left_idx;
585 let curr_idx = *window.curr_idx;
586 let old_right_excl_idx = *window.right_excl_idx;
587 assert_eq!(left_idx, curr_idx);
588 assert_eq!(left_idx, old_right_excl_idx);
589 assert_eq!(old_right_excl_idx, window.buffer.len() - 1);
590
591 *window.right_excl_idx = window.buffer.len();
593
594 self.latest_session = Some(LatestSession {
596 start_idx: left_idx,
597 minimal_next_start: minimal_next_start_enc_of_appended,
598 });
599 }
600 }
601 RecalculateHint::Slide => {
602 let old_left_idx = *window.left_idx;
603 let new_curr_idx = *window.curr_idx;
604 let old_right_excl_idx = *window.right_excl_idx;
605
606 if new_curr_idx < old_right_excl_idx {
607 } else {
609 let old_session_size = self.recognized_session_sizes.pop_front();
610 let next_session_size = self.recognized_session_sizes.front().copied();
611
612 if let Some(old_session_size) = old_session_size {
613 assert_eq!(old_session_size, old_right_excl_idx - old_left_idx);
614
615 if let Some(next_session_size) = next_session_size {
617 *window.left_idx = old_right_excl_idx;
619 *window.right_excl_idx = old_right_excl_idx + next_session_size;
620 } else {
621 *window.left_idx = old_right_excl_idx;
623 *window.right_excl_idx = window.buffer.len();
624 }
625 } else {
626 assert_eq!(old_right_excl_idx, window.buffer.len());
628 *window.left_idx = old_right_excl_idx;
629 *window.right_excl_idx = old_right_excl_idx;
630 self.latest_session = None;
631 }
632 }
633 }
634 }
635 }
636
637 fn shift_indices(&mut self, n: usize) {
638 if let Some(LatestSession { start_idx, .. }) = self.latest_session.as_mut() {
639 *start_idx -= n;
640 }
641 }
642}
643
644#[cfg(test)]
645mod tests {
646 use itertools::Itertools;
647 use risingwave_common::row::OwnedRow;
648 use risingwave_common::types::{DataType, ScalarImpl};
649 use risingwave_common::util::sort_util::OrderType;
650 use risingwave_expr::window_function::FrameBound::{
651 CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding,
652 };
653 use risingwave_expr::window_function::SessionFrameGap;
654
655 use super::*;
656
657 #[test]
658 fn test_rows_frame_unbounded_preceding_to_current_row() {
659 let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
660 RowsWindow::new(RowsFrameBounds {
661 start: UnboundedPreceding,
662 end: CurrentRow,
663 }),
664 FrameExclusion::NoOthers,
665 true,
666 );
667
668 let window = buffer.curr_window();
669 assert!(window.key.is_none());
670 assert!(!window.preceding_saturated);
671 assert!(!window.following_saturated);
672 buffer.append(1, "hello");
673 let window = buffer.curr_window();
674 assert_eq!(window.key, Some(&1));
675 assert!(!window.preceding_saturated); assert!(window.following_saturated);
677 buffer.append(2, "world");
678 let window = buffer.curr_window();
679 assert_eq!(window.key, Some(&1));
680 assert!(!window.preceding_saturated);
681 assert!(window.following_saturated);
682 assert_eq!(
683 buffer.curr_window_values().cloned().collect_vec(),
684 vec!["hello"]
685 );
686 let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
687 assert!(removed_keys.is_empty()); let window = buffer.curr_window();
689 assert_eq!(window.key, Some(&2));
690 assert!(!window.preceding_saturated);
691 assert!(window.following_saturated);
692 assert_eq!(buffer.smallest_key(), Some(&1));
693 }
694
695 #[test]
696 fn test_rows_frame_preceding_to_current_row() {
697 let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
698 RowsWindow::new(RowsFrameBounds {
699 start: Preceding(1),
700 end: CurrentRow,
701 }),
702 FrameExclusion::NoOthers,
703 true,
704 );
705
706 let window = buffer.curr_window();
707 assert!(window.key.is_none());
708 assert!(!window.preceding_saturated);
709 assert!(!window.following_saturated);
710 buffer.append(1, "hello");
711 let window = buffer.curr_window();
712 assert_eq!(window.key, Some(&1));
713 assert!(!window.preceding_saturated);
714 assert!(window.following_saturated);
715 assert_eq!(
716 buffer.curr_window_values().cloned().collect_vec(),
717 vec!["hello"]
718 );
719 buffer.append(2, "world");
720 let window = buffer.curr_window();
721 assert_eq!(window.key, Some(&1));
722 assert!(!window.preceding_saturated);
723 assert!(window.following_saturated);
724 let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
725 assert!(removed_keys.is_empty());
726 let window = buffer.curr_window();
727 assert_eq!(window.key, Some(&2));
728 assert!(window.preceding_saturated);
729 assert!(window.following_saturated);
730 assert_eq!(buffer.smallest_key(), Some(&1));
731 buffer.append(3, "!");
732 let window = buffer.curr_window();
733 assert_eq!(window.key, Some(&2));
734 let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
735 assert_eq!(removed_keys, vec![1]);
736 }
737
738 #[test]
739 fn test_rows_frame_preceding_to_preceding() {
740 let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
741 RowsWindow::new(RowsFrameBounds {
742 start: Preceding(2),
743 end: Preceding(1),
744 }),
745 FrameExclusion::NoOthers,
746 true,
747 );
748
749 buffer.append(1, "RisingWave");
750 let window = buffer.curr_window();
751 assert_eq!(window.key, Some(&1));
752 assert!(!window.preceding_saturated);
753 assert!(window.following_saturated);
754 assert!(buffer.curr_window_values().collect_vec().is_empty());
755 let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
756 assert!(removed_keys.is_empty());
757 assert_eq!(buffer.smallest_key(), Some(&1));
758 buffer.append(2, "is the best");
759 let window = buffer.curr_window();
760 assert_eq!(window.key, Some(&2));
761 assert!(!window.preceding_saturated);
762 assert!(window.following_saturated);
763 assert_eq!(
764 buffer.curr_window_values().cloned().collect_vec(),
765 vec!["RisingWave"]
766 );
767 let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
768 assert!(removed_keys.is_empty());
769 assert_eq!(buffer.smallest_key(), Some(&1));
770 buffer.append(3, "streaming platform");
771 let window = buffer.curr_window();
772 assert_eq!(window.key, Some(&3));
773 assert!(window.preceding_saturated);
774 assert!(window.following_saturated);
775 assert_eq!(
776 buffer.curr_window_values().cloned().collect_vec(),
777 vec!["RisingWave", "is the best"]
778 );
779 let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
780 assert_eq!(removed_keys, vec![1]);
781 assert_eq!(buffer.smallest_key(), Some(&2));
782 }
783
784 #[test]
785 fn test_rows_frame_current_row_to_unbounded_following() {
786 let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
787 RowsWindow::new(RowsFrameBounds {
788 start: CurrentRow,
789 end: UnboundedFollowing,
790 }),
791 FrameExclusion::NoOthers,
792 true,
793 );
794
795 buffer.append(1, "RisingWave");
796 let window = buffer.curr_window();
797 assert_eq!(window.key, Some(&1));
798 assert!(window.preceding_saturated);
799 assert!(!window.following_saturated);
800 assert_eq!(
801 buffer.curr_window_values().cloned().collect_vec(),
802 vec!["RisingWave"]
803 );
804 buffer.append(2, "is the best");
805 let window = buffer.curr_window();
806 assert_eq!(window.key, Some(&1));
807 assert!(window.preceding_saturated);
808 assert!(!window.following_saturated);
809 assert_eq!(
810 buffer.curr_window_values().cloned().collect_vec(),
811 vec!["RisingWave", "is the best"]
812 );
813 let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
814 assert_eq!(removed_keys, vec![1]);
815 assert_eq!(buffer.smallest_key(), Some(&2));
816 let window = buffer.curr_window();
817 assert_eq!(window.key, Some(&2));
818 assert!(window.preceding_saturated);
819 assert!(!window.following_saturated);
820 assert_eq!(
821 buffer.curr_window_values().cloned().collect_vec(),
822 vec!["is the best"]
823 );
824 }
825
826 #[test]
827 fn test_rows_frame_current_row_to_following() {
828 let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
829 RowsWindow::new(RowsFrameBounds {
830 start: CurrentRow,
831 end: Following(1),
832 }),
833 FrameExclusion::NoOthers,
834 true,
835 );
836
837 buffer.append(1, "RisingWave");
838 let window = buffer.curr_window();
839 assert_eq!(window.key, Some(&1));
840 assert!(window.preceding_saturated);
841 assert!(!window.following_saturated);
842 assert_eq!(
843 buffer.curr_window_values().cloned().collect_vec(),
844 vec!["RisingWave"]
845 );
846 buffer.append(2, "is the best");
847 let window = buffer.curr_window();
848 assert_eq!(window.key, Some(&1));
849 assert!(window.preceding_saturated);
850 assert!(window.following_saturated);
851 assert_eq!(
852 buffer.curr_window_values().cloned().collect_vec(),
853 vec!["RisingWave", "is the best"]
854 );
855 buffer.append(3, "streaming platform");
856 let window = buffer.curr_window();
857 assert_eq!(window.key, Some(&1));
858 assert!(window.preceding_saturated);
859 assert!(window.following_saturated);
860 assert_eq!(
861 buffer.curr_window_values().cloned().collect_vec(),
862 vec!["RisingWave", "is the best"]
863 );
864 let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
865 assert_eq!(removed_keys, vec![1]);
866 let window = buffer.curr_window();
867 assert_eq!(window.key, Some(&2));
868 assert!(window.preceding_saturated);
869 assert!(window.following_saturated);
870 assert_eq!(
871 buffer.curr_window_values().cloned().collect_vec(),
872 vec!["is the best", "streaming platform"]
873 );
874 }
875
876 #[test]
877 fn test_rows_frame_following_to_following() {
878 let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
879 RowsWindow::new(RowsFrameBounds {
880 start: Following(1),
881 end: Following(2),
882 }),
883 FrameExclusion::NoOthers,
884 true,
885 );
886
887 buffer.append(1, "RisingWave");
888 let window = buffer.curr_window();
889 assert_eq!(window.key, Some(&1));
890 assert!(window.preceding_saturated);
891 assert!(!window.following_saturated);
892 assert!(buffer.curr_window_values().collect_vec().is_empty());
893 buffer.append(2, "is the best");
894 let window = buffer.curr_window();
895 assert_eq!(window.key, Some(&1));
896 assert!(window.preceding_saturated);
897 assert!(!window.following_saturated);
898 assert_eq!(
899 buffer.curr_window_values().cloned().collect_vec(),
900 vec!["is the best"]
901 );
902 buffer.append(3, "streaming platform");
903 let window = buffer.curr_window();
904 assert_eq!(window.key, Some(&1));
905 assert!(window.preceding_saturated);
906 assert!(window.following_saturated);
907 assert_eq!(
908 buffer.curr_window_values().cloned().collect_vec(),
909 vec!["is the best", "streaming platform"]
910 );
911 let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
912 assert_eq!(removed_keys, vec![1]);
913 let window = buffer.curr_window();
914 assert_eq!(window.key, Some(&2));
915 assert!(window.preceding_saturated);
916 assert!(!window.following_saturated);
917 assert_eq!(
918 buffer.curr_window_values().cloned().collect_vec(),
919 vec!["streaming platform"]
920 );
921 }
922
923 #[test]
924 fn test_rows_frame_exclude_current_row() {
925 let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
926 RowsWindow::new(RowsFrameBounds {
927 start: UnboundedPreceding,
928 end: CurrentRow,
929 }),
930 FrameExclusion::CurrentRow,
931 false,
932 );
933
934 buffer.append(1, "hello");
935 assert!(
936 buffer
937 .curr_window_values()
938 .cloned()
939 .collect_vec()
940 .is_empty()
941 );
942 buffer.append(2, "world");
943 let _ = buffer.slide();
944 assert_eq!(
945 buffer.curr_window_values().cloned().collect_vec(),
946 vec!["hello"]
947 );
948 }
949
950 #[test]
951 fn test_session_frame() {
952 let order_data_type = DataType::Int64;
953 let order_type = OrderType::ascending();
954 let gap_data_type = DataType::Int64;
955
956 let mut buffer = WindowBuffer::<SessionWindow<_>>::new(
957 SessionWindow::new(SessionFrameBounds {
958 order_data_type: order_data_type.clone(),
959 order_type,
960 gap_data_type: gap_data_type.clone(),
961 gap: SessionFrameGap::new_for_test(
962 ScalarImpl::Int64(5),
963 &order_data_type,
964 &gap_data_type,
965 ),
966 }),
967 FrameExclusion::NoOthers,
968 true,
969 );
970
971 let key = |key: i64| -> StateKey {
972 StateKey {
973 order_key: memcmp_encoding::encode_value(Some(ScalarImpl::from(key)), order_type)
974 .unwrap(),
975 pk: OwnedRow::empty().into(),
976 }
977 };
978
979 assert!(buffer.curr_key().is_none());
980
981 buffer.append(key(1), "hello");
982 buffer.append(key(3), "session");
983 let window = buffer.curr_window();
984 assert_eq!(window.key, Some(&key(1)));
985 assert!(window.preceding_saturated);
986 assert!(!window.following_saturated);
987 assert_eq!(
988 buffer.curr_window_values().cloned().collect_vec(),
989 vec!["hello", "session"]
990 );
991
992 buffer.append(key(8), "window"); let window = buffer.curr_window();
994 assert!(window.following_saturated);
995 assert_eq!(
996 buffer.curr_window_values().cloned().collect_vec(),
997 vec!["hello", "session"]
998 );
999
1000 buffer.append(key(15), "and");
1001 buffer.append(key(16), "world");
1002 assert_eq!(
1003 buffer.curr_window_values().cloned().collect_vec(),
1004 vec!["hello", "session"]
1005 );
1006
1007 let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
1008 assert!(removed_keys.is_empty());
1009 let window = buffer.curr_window();
1010 assert_eq!(window.key, Some(&key(3)));
1011 assert!(window.preceding_saturated);
1012 assert!(window.following_saturated);
1013 assert_eq!(
1014 buffer.curr_window_values().cloned().collect_vec(),
1015 vec!["hello", "session"]
1016 );
1017
1018 let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
1019 assert_eq!(removed_keys, vec![key(1), key(3)]);
1020 assert_eq!(buffer.smallest_key(), Some(&key(8)));
1021 let window = buffer.curr_window();
1022 assert_eq!(window.key, Some(&key(8)));
1023 assert!(window.preceding_saturated);
1024 assert!(window.following_saturated);
1025 assert_eq!(
1026 buffer.curr_window_values().cloned().collect_vec(),
1027 vec!["window"]
1028 );
1029
1030 let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
1031 assert_eq!(removed_keys, vec![key(8)]);
1032 assert_eq!(buffer.smallest_key(), Some(&key(15)));
1033 let window = buffer.curr_window();
1034 assert_eq!(window.key, Some(&key(15)));
1035 assert!(window.preceding_saturated);
1036 assert!(!window.following_saturated);
1037 assert_eq!(
1038 buffer.curr_window_values().cloned().collect_vec(),
1039 vec!["and", "world"]
1040 );
1041
1042 let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
1043 assert!(removed_keys.is_empty());
1044 assert_eq!(buffer.curr_key(), Some(&key(16)));
1045 assert_eq!(
1046 buffer.curr_window_values().cloned().collect_vec(),
1047 vec!["and", "world"]
1048 );
1049
1050 let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
1051 assert_eq!(removed_keys, vec![key(15), key(16)]);
1052 assert!(buffer.curr_key().is_none());
1053 assert!(
1054 buffer
1055 .curr_window_values()
1056 .cloned()
1057 .collect_vec()
1058 .is_empty()
1059 );
1060 }
1061}