risingwave_expr_impl/window_function/
buffer.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::ops::Range;
17
18use educe::Educe;
19use risingwave_common::array::Op;
20use risingwave_common::types::Sentinelled;
21use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded};
22use risingwave_expr::window_function::{
23    FrameExclusion, RangeFrameBounds, RowsFrameBounds, SessionFrameBounds, StateKey,
24};
25
26use super::range_utils::{range_diff, range_except};
27
/// A common sliding window buffer.
///
/// Entries are pushed to the back of `buffer` on append and drained from the front on slide.
/// Positioning of the window (`left_idx` / `right_excl_idx`) is delegated to `window_impl`.
pub(super) struct WindowBuffer<W: WindowImpl> {
    /// The frame-specific implementation that recalculates the window indices.
    window_impl: W,
    /// Which rows (if any) are excluded from the window when iterating its values.
    frame_exclusion: FrameExclusion,
    /// The key-value entries currently kept, in append order.
    buffer: VecDeque<Entry<W::Key, W::Value>>,
    /// Index of the current row in `buffer`; there is no current row when this is out of bounds.
    curr_idx: usize,
    left_idx: usize,       // inclusive, note this can be > `curr_idx`
    right_excl_idx: usize, // exclusive, note this can be <= `curr_idx` and even <= `left_idx`
    /// Accumulated `(Op, value)` changes of the window content; `None` when delta mode is off.
    curr_delta: Option<Vec<(Op, W::Value)>>,
}
38
/// A key-value pair in the buffer.
struct Entry<K: Ord, V> {
    /// The key of the buffered row.
    key: K,
    /// The value of the buffered row, yielded when iterating over the current window.
    value: V,
}
44
/// Information about the current window, as returned by `WindowBuffer::curr_window`.
///
/// Note: A window frame can be pure preceding, pure following, or span across the _current row_.
pub(super) struct CurrWindow<'a, K> {
    /// Key of the current row, or `None` if the current row is not in the buffer.
    pub key: Option<&'a K>,

    // XXX(rc): Maybe will be used in the future, let's keep it for now.
    #[cfg_attr(not(test), expect(dead_code))]
    /// The preceding half of the current window is saturated.
    pub preceding_saturated: bool,
    /// The following half of the current window is saturated.
    pub following_saturated: bool,
}
56
57impl<W: WindowImpl> WindowBuffer<W> {
58    pub fn new(window_impl: W, frame_exclusion: FrameExclusion, enable_delta: bool) -> Self {
59        if enable_delta {
60            // TODO(rc): currently only support `FrameExclusion::NoOthers` for delta
61            assert!(frame_exclusion.is_no_others());
62        }
63
64        Self {
65            window_impl,
66            frame_exclusion,
67            buffer: Default::default(),
68            curr_idx: 0,
69            left_idx: 0,
70            right_excl_idx: 0,
71            curr_delta: if enable_delta {
72                Some(Default::default())
73            } else {
74                None
75            },
76        }
77    }
78
79    /// Get the smallest key that is still kept in the buffer.
80    /// Returns `None` if there's nothing yet.
81    pub fn smallest_key(&self) -> Option<&W::Key> {
82        self.buffer.front().map(|Entry { key, .. }| key)
83    }
84
85    /// Get the key part of the current row.
86    pub fn curr_key(&self) -> Option<&W::Key> {
87        self.buffer.get(self.curr_idx).map(|Entry { key, .. }| key)
88    }
89
90    /// Get the current window info.
91    pub fn curr_window(&self) -> CurrWindow<'_, W::Key> {
92        let window = BufferRef {
93            buffer: &self.buffer,
94            curr_idx: self.curr_idx,
95            left_idx: self.left_idx,
96            right_excl_idx: self.right_excl_idx,
97        };
98        CurrWindow {
99            key: self.curr_key(),
100            preceding_saturated: self.window_impl.preceding_saturated(window),
101            following_saturated: self.window_impl.following_saturated(window),
102        }
103    }
104
105    fn curr_window_outer(&self) -> Range<usize> {
106        self.left_idx..std::cmp::max(self.right_excl_idx, self.left_idx)
107    }
108
109    fn curr_window_exclusion(&self) -> Range<usize> {
110        // TODO(rc): should intersect with `curr_window_outer` to be more accurate
111        match self.frame_exclusion {
112            FrameExclusion::CurrentRow => self.curr_idx..self.curr_idx + 1,
113            FrameExclusion::NoOthers => self.curr_idx..self.curr_idx,
114        }
115    }
116
117    fn curr_window_ranges(&self) -> (Range<usize>, Range<usize>) {
118        let selection = self.curr_window_outer();
119        let exclusion = self.curr_window_exclusion();
120        range_except(selection, exclusion)
121    }
122
123    /// Iterate over values in the current window.
124    pub fn curr_window_values(&self) -> impl DoubleEndedIterator<Item = &W::Value> {
125        assert!(self.left_idx <= self.buffer.len()); // `left_idx` can be the same as `buffer.len()` when the buffer is empty
126        assert!(self.right_excl_idx <= self.buffer.len());
127
128        let (left, right) = self.curr_window_ranges();
129        self.buffer
130            .range(left)
131            .chain(self.buffer.range(right))
132            .map(|Entry { value, .. }| value)
133    }
134
135    /// Consume the delta of values comparing the current window to the previous window.
136    /// The delta is not guaranteed to be sorted, especially when frame exclusion is not `NoOthers`.
137    pub fn consume_curr_window_values_delta(
138        &mut self,
139    ) -> impl Iterator<Item = (Op, W::Value)> + '_ {
140        self.curr_delta
141            .as_mut()
142            .expect("delta mode should be enabled")
143            .drain(..)
144    }
145
146    /// Append a key-value pair to the buffer.
147    pub fn append(&mut self, key: W::Key, value: W::Value) {
148        let old_outer = self.curr_window_outer();
149
150        self.buffer.push_back(Entry { key, value });
151        self.recalculate_left_right(RecalculateHint::Append);
152
153        if self.curr_delta.is_some() {
154            self.maintain_delta(old_outer, self.curr_window_outer());
155        }
156    }
157
158    /// Slide the current window forward.
159    /// Returns the keys that are removed from the buffer.
160    pub fn slide(&mut self) -> impl Iterator<Item = (W::Key, W::Value)> + '_ {
161        let old_outer = self.curr_window_outer();
162
163        self.curr_idx += 1;
164        self.recalculate_left_right(RecalculateHint::Slide);
165
166        if self.curr_delta.is_some() {
167            self.maintain_delta(old_outer, self.curr_window_outer());
168        }
169
170        let min_needed_idx = [self.left_idx, self.curr_idx, self.right_excl_idx]
171            .iter()
172            .min()
173            .copied()
174            .unwrap();
175        self.curr_idx -= min_needed_idx;
176        self.left_idx -= min_needed_idx;
177        self.right_excl_idx -= min_needed_idx;
178
179        self.window_impl.shift_indices(min_needed_idx);
180
181        self.buffer
182            .drain(0..min_needed_idx)
183            .map(|Entry { key, value }| (key, value))
184    }
185
186    fn maintain_delta(&mut self, old_outer: Range<usize>, new_outer: Range<usize>) {
187        debug_assert!(self.frame_exclusion.is_no_others());
188
189        let (outer_removed, outer_added) = range_diff(old_outer.clone(), new_outer.clone());
190        let delta = self.curr_delta.as_mut().unwrap();
191        for idx in outer_removed.iter().cloned().flatten() {
192            delta.push((Op::Delete, self.buffer[idx].value.clone()));
193        }
194        for idx in outer_added.iter().cloned().flatten() {
195            delta.push((Op::Insert, self.buffer[idx].value.clone()));
196        }
197    }
198
199    fn recalculate_left_right(&mut self, hint: RecalculateHint) {
200        let window = BufferRefMut {
201            buffer: &self.buffer,
202            curr_idx: &mut self.curr_idx,
203            left_idx: &mut self.left_idx,
204            right_excl_idx: &mut self.right_excl_idx,
205        };
206        self.window_impl.recalculate_left_right(window, hint);
207    }
208}
209
/// Wraps a reference to the buffer and some indices, to be used by [`WindowImpl`]s.
///
/// This is a read-only, copyable view of the buffer state.
#[derive(Educe)]
#[educe(Clone, Copy)]
pub(super) struct BufferRef<'a, K: Ord, V: Clone> {
    /// The entries currently kept in the buffer.
    buffer: &'a VecDeque<Entry<K, V>>,
    /// Index of the current row in `buffer`.
    curr_idx: usize,
    /// Inclusive start index of the current window.
    left_idx: usize,
    /// Exclusive end index of the current window.
    right_excl_idx: usize,
}
219
/// Wraps a reference to the buffer and some mutable indices, to be used by [`WindowImpl`]s.
///
/// The buffer itself is read-only; only the indices can be updated through this view.
pub(super) struct BufferRefMut<'a, K: Ord, V: Clone> {
    /// The entries currently kept in the buffer.
    buffer: &'a VecDeque<Entry<K, V>>,
    /// Index of the current row in `buffer`.
    curr_idx: &'a mut usize,
    /// Inclusive start index of the current window.
    left_idx: &'a mut usize,
    /// Exclusive end index of the current window.
    right_excl_idx: &'a mut usize,
}
227
/// Tells a [`WindowImpl`] which buffer operation triggered the index recalculation.
#[derive(Clone, Copy, PartialEq, Eq)]
pub(super) enum RecalculateHint {
    /// A new entry has just been pushed to the back of the buffer.
    Append,
    /// The current row index has just been advanced by one.
    Slide,
}
233
/// A trait for sliding window implementations. This trait is used by [`WindowBuffer`] to
/// determine the status of current window and how to slide the window.
pub(super) trait WindowImpl {
    /// The key type of buffered entries.
    type Key: Ord;
    /// The value type of buffered entries; cloned when window deltas are recorded.
    type Value: Clone;

    /// Whether the preceding half of the current window is saturated.
    /// By "saturated" we mean that every row that is possible to be in the preceding half of the
    /// current window is already in the buffer.
    fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool;

    /// Whether the following half of the current window is saturated.
    fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool;

    /// Recalculate the left and right indices of the current window, according to the latest
    /// `curr_idx` and the hint.
    ///
    /// The new indices are written through the mutable references held by `window`.
    fn recalculate_left_right(
        &mut self,
        window: BufferRefMut<'_, Self::Key, Self::Value>,
        hint: RecalculateHint,
    );

    /// Shift the indices stored by the [`WindowImpl`] by `n` positions. This should be called
    /// after evicting rows from the buffer.
    fn shift_indices(&mut self, n: usize);
}
260
/// The sliding window implementation for `ROWS` frames.
///
/// The frame is positioned purely by row offsets relative to the current row, so the key and
/// value types are only carried as type parameters.
pub(super) struct RowsWindow<K: Ord, V: Clone> {
    /// Start and end bounds of the frame.
    frame_bounds: RowsFrameBounds,
    /// Marker for the key type `K`, which is not stored.
    _phantom: std::marker::PhantomData<K>,
    /// Marker for the value type `V`, which is not stored.
    _phantom2: std::marker::PhantomData<V>,
}
267
268impl<K: Ord, V: Clone> RowsWindow<K, V> {
269    pub fn new(frame_bounds: RowsFrameBounds) -> Self {
270        Self {
271            frame_bounds,
272            _phantom: std::marker::PhantomData,
273            _phantom2: std::marker::PhantomData,
274        }
275    }
276}
277
278impl<K: Ord, V: Clone> WindowImpl for RowsWindow<K, V> {
279    type Key = K;
280    type Value = V;
281
282    fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
283        window.curr_idx < window.buffer.len() && {
284            let start_off = self.frame_bounds.start.to_offset();
285            if let Some(start_off) = start_off {
286                if start_off >= 0 {
287                    true // pure following frame, always preceding-saturated
288                } else {
289                    // FIXME(rc): Clippy rule `clippy::nonminimal_bool` is misreporting that
290                    // the following can be simplified.
291                    #[allow(clippy::nonminimal_bool)]
292                    {
293                        assert!(window.curr_idx >= window.left_idx);
294                    }
295                    window.curr_idx - window.left_idx >= start_off.unsigned_abs()
296                }
297            } else {
298                false // unbounded frame start, never preceding-saturated
299            }
300        }
301    }
302
303    fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
304        window.curr_idx < window.buffer.len() && {
305            let end_off = self.frame_bounds.end.to_offset();
306            if let Some(end_off) = end_off {
307                if end_off <= 0 {
308                    true // pure preceding frame, always following-saturated
309                } else {
310                    // FIXME(rc): Ditto.
311                    #[allow(clippy::nonminimal_bool)]
312                    {
313                        assert!(window.right_excl_idx > 0);
314                        assert!(window.right_excl_idx > window.curr_idx);
315                        assert!(window.right_excl_idx <= window.buffer.len());
316                    }
317                    window.right_excl_idx - 1 - window.curr_idx >= end_off as usize
318                }
319            } else {
320                false // unbounded frame end, never following-saturated
321            }
322        }
323    }
324
325    fn recalculate_left_right(
326        &mut self,
327        window: BufferRefMut<'_, Self::Key, Self::Value>,
328        _hint: RecalculateHint,
329    ) {
330        if window.buffer.is_empty() {
331            *window.left_idx = 0;
332            *window.right_excl_idx = 0;
333        }
334
335        let start_off = self.frame_bounds.start.to_offset();
336        let end_off = self.frame_bounds.end.to_offset();
337        if let Some(start_off) = start_off {
338            let logical_left_idx = *window.curr_idx as isize + start_off;
339            if logical_left_idx >= 0 {
340                *window.left_idx = std::cmp::min(logical_left_idx as usize, window.buffer.len());
341            } else {
342                *window.left_idx = 0;
343            }
344        } else {
345            // unbounded start
346            *window.left_idx = 0;
347        }
348        if let Some(end_off) = end_off {
349            let logical_right_excl_idx = *window.curr_idx as isize + end_off + 1;
350            if logical_right_excl_idx >= 0 {
351                *window.right_excl_idx =
352                    std::cmp::min(logical_right_excl_idx as usize, window.buffer.len());
353            } else {
354                *window.right_excl_idx = 0;
355            }
356        } else {
357            // unbounded end
358            *window.right_excl_idx = window.buffer.len();
359        }
360    }
361
362    fn shift_indices(&mut self, _n: usize) {}
363}
364
/// The sliding window implementation for `RANGE` frames.
///
/// The frame is positioned by comparing the encoded order keys of buffered rows against the
/// frame start/end values derived from the current row.
pub(super) struct RangeWindow<V: Clone> {
    /// Frame bounds together with the order column's data type and order type.
    frame_bounds: RangeFrameBounds,
    /// Marker for the value type `V`, which is not stored.
    _phantom: std::marker::PhantomData<V>,
}
370
371impl<V: Clone> RangeWindow<V> {
372    pub fn new(frame_bounds: RangeFrameBounds) -> Self {
373        Self {
374            frame_bounds,
375            _phantom: std::marker::PhantomData,
376        }
377    }
378}
379
380impl<V: Clone> WindowImpl for RangeWindow<V> {
381    type Key = StateKey;
382    type Value = V;
383
384    fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
385        window.curr_idx < window.buffer.len() && {
386            // XXX(rc): It seems that preceding saturation is not important, may remove later.
387            true
388        }
389    }
390
391    fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
392        window.curr_idx < window.buffer.len()
393            && {
394                // Left OK? (note that `left_idx` can be greater than `right_idx`)
395                // The following line checks whether the left value is the last one in the buffer.
396                // Here we adopt a conservative approach, which means we assume the next future value
397                // is likely to be the same as the last value in the current window, in which case
398                // we can't say the current window is saturated.
399                window.left_idx < window.buffer.len() /* non-zero */ - 1
400            }
401            && {
402                // Right OK? Ditto.
403                window.right_excl_idx < window.buffer.len()
404            }
405    }
406
407    fn recalculate_left_right(
408        &mut self,
409        window: BufferRefMut<'_, Self::Key, Self::Value>,
410        _hint: RecalculateHint,
411    ) {
412        if window.buffer.is_empty() {
413            *window.left_idx = 0;
414            *window.right_excl_idx = 0;
415        }
416
417        let Some(entry) = window.buffer.get(*window.curr_idx) else {
418            // If the current index has been moved to a future position, we can't touch anything
419            // because the next coming key may equal to the previous one which means the left and
420            // right indices will be the same.
421            return;
422        };
423        let curr_key = &entry.key;
424
425        let curr_order_value = memcmp_encoding::decode_value(
426            &self.frame_bounds.order_data_type,
427            &curr_key.order_key,
428            self.frame_bounds.order_type,
429        )
430        .expect("no reason to fail here because we just encoded it in memory");
431
432        match self.frame_bounds.frame_start_of(&curr_order_value) {
433            Sentinelled::Smallest => {
434                // unbounded frame start
435                assert_eq!(
436                    *window.left_idx, 0,
437                    "for unbounded start, left index should always be 0"
438                );
439            }
440            Sentinelled::Normal(value) => {
441                // bounded, find the start position
442                let value_enc = memcmp_encoding::encode_value(value, self.frame_bounds.order_type)
443                    .expect("no reason to fail here");
444                *window.left_idx = window
445                    .buffer
446                    .partition_point(|elem| elem.key.order_key < value_enc);
447            }
448            Sentinelled::Largest => unreachable!("frame start never be UNBOUNDED FOLLOWING"),
449        }
450
451        match self.frame_bounds.frame_end_of(curr_order_value) {
452            Sentinelled::Largest => {
453                // unbounded frame end
454                *window.right_excl_idx = window.buffer.len();
455            }
456            Sentinelled::Normal(value) => {
457                // bounded, find the end position
458                let value_enc = memcmp_encoding::encode_value(value, self.frame_bounds.order_type)
459                    .expect("no reason to fail here");
460                *window.right_excl_idx = window
461                    .buffer
462                    .partition_point(|elem| elem.key.order_key <= value_enc);
463            }
464            Sentinelled::Smallest => unreachable!("frame end never be UNBOUNDED PRECEDING"),
465        }
466    }
467
468    fn shift_indices(&mut self, _n: usize) {}
469}
470
/// The sliding window implementation for session frames (see [`SessionFrameBounds`]).
///
/// Sessions are recognized while rows are appended; the current window moves from one session
/// to the next while sliding.
pub(super) struct SessionWindow<V: Clone> {
    /// Frame bounds, used to compute the minimal order value that starts the next session.
    frame_bounds: SessionFrameBounds,
    /// The latest session is the rightmost session in the buffer, which is updated during appending.
    latest_session: Option<LatestSession>,
    /// The sizes of recognized but not consumed sessions in the buffer. It's updated during appending.
    /// The first element, if any, should be the size of the "current session window". When sliding,
    /// the front should be popped.
    recognized_session_sizes: VecDeque<usize>,
    /// Marker for the value type `V`, which is not stored.
    _phantom: std::marker::PhantomData<V>,
}
481
/// Bookkeeping for the rightmost session in the buffer, i.e. the one that may still grow.
#[derive(Debug)]
struct LatestSession {
    /// The starting index of the latest session.
    start_idx: usize,

    /// Minimal next start means the minimal order value that can start a new session.
    /// If a row has an order value less than this, it should be in the current session.
    /// Stored in memcomparable encoding so that it can be compared with order keys directly.
    minimal_next_start: MemcmpEncoded,
}
491
492impl<V: Clone> SessionWindow<V> {
493    pub fn new(frame_bounds: SessionFrameBounds) -> Self {
494        Self {
495            frame_bounds,
496            latest_session: None,
497            recognized_session_sizes: Default::default(),
498            _phantom: std::marker::PhantomData,
499        }
500    }
501}
502
impl<V: Clone> WindowImpl for SessionWindow<V> {
    type Key = StateKey;
    type Value = V;

    /// Reports saturation as soon as the current row is in the buffer.
    fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
        window.curr_idx < window.buffer.len() && {
            // XXX(rc): It seems that preceding saturation is not important, may remove later.
            true
        }
    }

    /// Reports saturation only if the current window is not the latest session, i.e. a newer
    /// session has already started after it.
    ///
    /// # Panics
    ///
    /// Panics if the current row is in the buffer but lies outside of the current window.
    fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
        window.curr_idx < window.buffer.len() && {
            // For session window, `left_idx` is always smaller than `right_excl_idx`.
            assert!(window.left_idx <= window.curr_idx);
            assert!(window.curr_idx < window.right_excl_idx);

            // The following expression checks whether the current window is the latest session.
            // If it is, we don't think it's saturated because the next row may be still in the
            // same session. Otherwise, we can safely say it's saturated.
            self.latest_session
                .as_ref()
                .is_some_and(|LatestSession { start_idx, .. }| window.left_idx < *start_idx)
        }
    }

    /// Update session bookkeeping and the window indices.
    ///
    /// On `Append`, the newly appended row either extends the latest session or starts a new
    /// one. On `Slide`, the window moves to the next session once the current row has left the
    /// current one.
    fn recalculate_left_right(
        &mut self,
        window: BufferRefMut<'_, Self::Key, Self::Value>,
        hint: RecalculateHint,
    ) {
        // Terms:
        // - Session: A continuous range of rows among any two of which the difference of order values
        //   is less than the session gap. This is a concept on the whole stream. Sessions are recognized
        //   during appending.
        // - Current window: The range of rows that are represented by the indices in `window`. It is a
        //   status of the `WindowBuffer`. If the current window happens to be the last session in the
        //   buffer, it will be updated during appending. Otherwise it will only be updated during sliding.

        match hint {
            RecalculateHint::Append => {
                assert!(!window.buffer.is_empty()); // because we just appended a row
                let appended_idx = window.buffer.len() - 1;
                let appended_key = &window.buffer[appended_idx].key;

                // Compute, in encoded form, the minimal order value that would start a new
                // session after the appended row.
                let minimal_next_start_of_appended = self.frame_bounds.minimal_next_start_of(
                    memcmp_encoding::decode_value(
                        &self.frame_bounds.order_data_type,
                        &appended_key.order_key,
                        self.frame_bounds.order_type,
                    )
                    .expect("no reason to fail here because we just encoded it in memory"),
                );
                let minimal_next_start_enc_of_appended = memcmp_encoding::encode_value(
                    minimal_next_start_of_appended,
                    self.frame_bounds.order_type,
                )
                .expect("no reason to fail here");

                if let Some(&mut LatestSession {
                    ref start_idx,
                    ref mut minimal_next_start,
                }) = self.latest_session.as_mut()
                {
                    if &appended_key.order_key >= minimal_next_start {
                        // the appended row starts a new session
                        // (the session that just ended spans `start_idx..appended_idx`)
                        self.recognized_session_sizes
                            .push_back(appended_idx - start_idx);
                        self.latest_session = Some(LatestSession {
                            start_idx: appended_idx,
                            minimal_next_start: minimal_next_start_enc_of_appended,
                        });
                        // no need to update the current window because it's now corresponding
                        // to some previous session
                    } else {
                        // the appended row belongs to the latest session
                        *minimal_next_start = minimal_next_start_enc_of_appended;

                        if *start_idx == *window.left_idx {
                            // the current window is the latest session, we should extend it
                            *window.right_excl_idx = appended_idx + 1;
                        }
                    }
                } else {
                    // no session yet, the current window should be empty
                    let left_idx = *window.left_idx;
                    let curr_idx = *window.curr_idx;
                    let old_right_excl_idx = *window.right_excl_idx;
                    assert_eq!(left_idx, curr_idx);
                    assert_eq!(left_idx, old_right_excl_idx);
                    assert_eq!(old_right_excl_idx, window.buffer.len() - 1);

                    // now we put the first row into the current window
                    *window.right_excl_idx = window.buffer.len();

                    // and start to recognize the latest session
                    self.latest_session = Some(LatestSession {
                        start_idx: left_idx,
                        minimal_next_start: minimal_next_start_enc_of_appended,
                    });
                }
            }
            RecalculateHint::Slide => {
                let old_left_idx = *window.left_idx;
                let new_curr_idx = *window.curr_idx;
                let old_right_excl_idx = *window.right_excl_idx;

                if new_curr_idx < old_right_excl_idx {
                    // the current row is still in the current session window, no need to slide
                } else {
                    // The front of the queue is the size of the session we are leaving; the
                    // element behind it, if any, is the size of the session we are entering.
                    let old_session_size = self.recognized_session_sizes.pop_front();
                    let next_session_size = self.recognized_session_sizes.front().copied();

                    if let Some(old_session_size) = old_session_size {
                        assert_eq!(old_session_size, old_right_excl_idx - old_left_idx);

                        // slide the window to the next session
                        if let Some(next_session_size) = next_session_size {
                            // the next session is fully recognized, so we know the ending index
                            *window.left_idx = old_right_excl_idx;
                            *window.right_excl_idx = old_right_excl_idx + next_session_size;
                        } else {
                            // the next session is still in recognition, so we end the window at the end of buffer
                            *window.left_idx = old_right_excl_idx;
                            *window.right_excl_idx = window.buffer.len();
                        }
                    } else {
                        // no recognized session yet, meaning the current window is the last session in the buffer
                        assert_eq!(old_right_excl_idx, window.buffer.len());
                        *window.left_idx = old_right_excl_idx;
                        *window.right_excl_idx = old_right_excl_idx;
                        self.latest_session = None;
                    }
                }
            }
        }
    }

    /// Keep the start index of the latest session in sync after `n` rows were evicted from the
    /// front of the buffer.
    fn shift_indices(&mut self, n: usize) {
        if let Some(LatestSession { start_idx, .. }) = self.latest_session.as_mut() {
            *start_idx -= n;
        }
    }
}
647
648#[cfg(test)]
649mod tests {
650    use itertools::Itertools;
651    use risingwave_common::row::OwnedRow;
652    use risingwave_common::types::{DataType, ScalarImpl};
653    use risingwave_common::util::sort_util::OrderType;
654    use risingwave_expr::window_function::FrameBound::{
655        CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding,
656    };
657    use risingwave_expr::window_function::SessionFrameGap;
658
659    use super::*;
660
    /// `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`: the preceding half is never
    /// saturated and nothing is evicted when sliding.
    #[test]
    fn test_rows_frame_unbounded_preceding_to_current_row() {
        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
            RowsWindow::new(RowsFrameBounds {
                start: UnboundedPreceding,
                end: CurrentRow,
            }),
            FrameExclusion::NoOthers,
            true,
        );

        // Empty buffer: there is no current row, so nothing is saturated.
        let window = buffer.curr_window();
        assert!(window.key.is_none());
        assert!(!window.preceding_saturated);
        assert!(!window.following_saturated);
        buffer.append(1, "hello");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        assert!(!window.preceding_saturated); // unbounded preceding is never saturated
        assert!(window.following_saturated);
        // Appending another row does not move the current row.
        buffer.append(2, "world");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        assert!(!window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["hello"]
        );
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert!(removed_keys.is_empty()); // unbounded preceding, nothing can ever be removed
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&2));
        assert!(!window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(buffer.smallest_key(), Some(&1));
    }
698
    /// `ROWS BETWEEN 1 PRECEDING AND CURRENT ROW`: a row is evicted once it is more than one
    /// row behind the current row.
    #[test]
    fn test_rows_frame_preceding_to_current_row() {
        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
            RowsWindow::new(RowsFrameBounds {
                start: Preceding(1),
                end: CurrentRow,
            }),
            FrameExclusion::NoOthers,
            true,
        );

        // Empty buffer: there is no current row, so nothing is saturated.
        let window = buffer.curr_window();
        assert!(window.key.is_none());
        assert!(!window.preceding_saturated);
        assert!(!window.following_saturated);
        buffer.append(1, "hello");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        assert!(!window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["hello"]
        );
        buffer.append(2, "world");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        assert!(!window.preceding_saturated);
        assert!(window.following_saturated);
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert!(removed_keys.is_empty());
        // The current row now has one preceding row, which saturates the preceding half.
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&2));
        assert!(window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(buffer.smallest_key(), Some(&1));
        buffer.append(3, "!");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&2));
        // Sliding to key 3 leaves key 1 outside of the frame, so it is evicted.
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert_eq!(removed_keys, vec![1]);
    }
741
    /// `ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING`: a pure preceding frame, which never
    /// contains the current row.
    #[test]
    fn test_rows_frame_preceding_to_preceding() {
        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
            RowsWindow::new(RowsFrameBounds {
                start: Preceding(2),
                end: Preceding(1),
            }),
            FrameExclusion::NoOthers,
            true,
        );

        buffer.append(1, "RisingWave");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&1));
        assert!(!window.preceding_saturated);
        assert!(window.following_saturated);
        // The first row has no preceding rows, so the window is empty.
        assert!(buffer.curr_window_values().collect_vec().is_empty());
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert!(removed_keys.is_empty());
        assert_eq!(buffer.smallest_key(), Some(&1));
        buffer.append(2, "is the best");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&2));
        assert!(!window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["RisingWave"]
        );
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert!(removed_keys.is_empty());
        assert_eq!(buffer.smallest_key(), Some(&1));
        buffer.append(3, "streaming platform");
        let window = buffer.curr_window();
        assert_eq!(window.key, Some(&3));
        assert!(window.preceding_saturated);
        assert!(window.following_saturated);
        assert_eq!(
            buffer.curr_window_values().cloned().collect_vec(),
            vec!["RisingWave", "is the best"]
        );
        // Key 1 is no longer needed once the current row moves past key 3.
        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
        assert_eq!(removed_keys, vec![1]);
        assert_eq!(buffer.smallest_key(), Some(&2));
    }
787
788    #[test]
789    fn test_rows_frame_current_row_to_unbounded_following() {
790        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
791            RowsWindow::new(RowsFrameBounds {
792                start: CurrentRow,
793                end: UnboundedFollowing,
794            }),
795            FrameExclusion::NoOthers,
796            true,
797        );
798
799        buffer.append(1, "RisingWave");
800        let window = buffer.curr_window();
801        assert_eq!(window.key, Some(&1));
802        assert!(window.preceding_saturated);
803        assert!(!window.following_saturated);
804        assert_eq!(
805            buffer.curr_window_values().cloned().collect_vec(),
806            vec!["RisingWave"]
807        );
808        buffer.append(2, "is the best");
809        let window = buffer.curr_window();
810        assert_eq!(window.key, Some(&1));
811        assert!(window.preceding_saturated);
812        assert!(!window.following_saturated);
813        assert_eq!(
814            buffer.curr_window_values().cloned().collect_vec(),
815            vec!["RisingWave", "is the best"]
816        );
817        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
818        assert_eq!(removed_keys, vec![1]);
819        assert_eq!(buffer.smallest_key(), Some(&2));
820        let window = buffer.curr_window();
821        assert_eq!(window.key, Some(&2));
822        assert!(window.preceding_saturated);
823        assert!(!window.following_saturated);
824        assert_eq!(
825            buffer.curr_window_values().cloned().collect_vec(),
826            vec!["is the best"]
827        );
828    }
829
830    #[test]
831    fn test_rows_frame_current_row_to_following() {
832        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
833            RowsWindow::new(RowsFrameBounds {
834                start: CurrentRow,
835                end: Following(1),
836            }),
837            FrameExclusion::NoOthers,
838            true,
839        );
840
841        buffer.append(1, "RisingWave");
842        let window = buffer.curr_window();
843        assert_eq!(window.key, Some(&1));
844        assert!(window.preceding_saturated);
845        assert!(!window.following_saturated);
846        assert_eq!(
847            buffer.curr_window_values().cloned().collect_vec(),
848            vec!["RisingWave"]
849        );
850        buffer.append(2, "is the best");
851        let window = buffer.curr_window();
852        assert_eq!(window.key, Some(&1));
853        assert!(window.preceding_saturated);
854        assert!(window.following_saturated);
855        assert_eq!(
856            buffer.curr_window_values().cloned().collect_vec(),
857            vec!["RisingWave", "is the best"]
858        );
859        buffer.append(3, "streaming platform");
860        let window = buffer.curr_window();
861        assert_eq!(window.key, Some(&1));
862        assert!(window.preceding_saturated);
863        assert!(window.following_saturated);
864        assert_eq!(
865            buffer.curr_window_values().cloned().collect_vec(),
866            vec!["RisingWave", "is the best"]
867        );
868        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
869        assert_eq!(removed_keys, vec![1]);
870        let window = buffer.curr_window();
871        assert_eq!(window.key, Some(&2));
872        assert!(window.preceding_saturated);
873        assert!(window.following_saturated);
874        assert_eq!(
875            buffer.curr_window_values().cloned().collect_vec(),
876            vec!["is the best", "streaming platform"]
877        );
878    }
879
880    #[test]
881    fn test_rows_frame_following_to_following() {
882        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
883            RowsWindow::new(RowsFrameBounds {
884                start: Following(1),
885                end: Following(2),
886            }),
887            FrameExclusion::NoOthers,
888            true,
889        );
890
891        buffer.append(1, "RisingWave");
892        let window = buffer.curr_window();
893        assert_eq!(window.key, Some(&1));
894        assert!(window.preceding_saturated);
895        assert!(!window.following_saturated);
896        assert!(buffer.curr_window_values().collect_vec().is_empty());
897        buffer.append(2, "is the best");
898        let window = buffer.curr_window();
899        assert_eq!(window.key, Some(&1));
900        assert!(window.preceding_saturated);
901        assert!(!window.following_saturated);
902        assert_eq!(
903            buffer.curr_window_values().cloned().collect_vec(),
904            vec!["is the best"]
905        );
906        buffer.append(3, "streaming platform");
907        let window = buffer.curr_window();
908        assert_eq!(window.key, Some(&1));
909        assert!(window.preceding_saturated);
910        assert!(window.following_saturated);
911        assert_eq!(
912            buffer.curr_window_values().cloned().collect_vec(),
913            vec!["is the best", "streaming platform"]
914        );
915        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
916        assert_eq!(removed_keys, vec![1]);
917        let window = buffer.curr_window();
918        assert_eq!(window.key, Some(&2));
919        assert!(window.preceding_saturated);
920        assert!(!window.following_saturated);
921        assert_eq!(
922            buffer.curr_window_values().cloned().collect_vec(),
923            vec!["streaming platform"]
924        );
925    }
926
927    #[test]
928    fn test_rows_frame_exclude_current_row() {
929        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
930            RowsWindow::new(RowsFrameBounds {
931                start: UnboundedPreceding,
932                end: CurrentRow,
933            }),
934            FrameExclusion::CurrentRow,
935            false,
936        );
937
938        buffer.append(1, "hello");
939        assert!(
940            buffer
941                .curr_window_values()
942                .cloned()
943                .collect_vec()
944                .is_empty()
945        );
946        buffer.append(2, "world");
947        let _ = buffer.slide();
948        assert_eq!(
949            buffer.curr_window_values().cloned().collect_vec(),
950            vec!["hello"]
951        );
952    }
953
954    #[test]
955    fn test_session_frame() {
956        let order_data_type = DataType::Int64;
957        let order_type = OrderType::ascending();
958        let gap_data_type = DataType::Int64;
959
960        let mut buffer = WindowBuffer::<SessionWindow<_>>::new(
961            SessionWindow::new(SessionFrameBounds {
962                order_data_type: order_data_type.clone(),
963                order_type,
964                gap_data_type: gap_data_type.clone(),
965                gap: SessionFrameGap::new_for_test(
966                    ScalarImpl::Int64(5),
967                    &order_data_type,
968                    &gap_data_type,
969                ),
970            }),
971            FrameExclusion::NoOthers,
972            true,
973        );
974
975        let key = |key: i64| -> StateKey {
976            StateKey {
977                order_key: memcmp_encoding::encode_value(Some(ScalarImpl::from(key)), order_type)
978                    .unwrap(),
979                pk: OwnedRow::empty().into(),
980            }
981        };
982
983        assert!(buffer.curr_key().is_none());
984
985        buffer.append(key(1), "hello");
986        buffer.append(key(3), "session");
987        let window = buffer.curr_window();
988        assert_eq!(window.key, Some(&key(1)));
989        assert!(window.preceding_saturated);
990        assert!(!window.following_saturated);
991        assert_eq!(
992            buffer.curr_window_values().cloned().collect_vec(),
993            vec!["hello", "session"]
994        );
995
996        buffer.append(key(8), "window"); // start a new session
997        let window = buffer.curr_window();
998        assert!(window.following_saturated);
999        assert_eq!(
1000            buffer.curr_window_values().cloned().collect_vec(),
1001            vec!["hello", "session"]
1002        );
1003
1004        buffer.append(key(15), "and");
1005        buffer.append(key(16), "world");
1006        assert_eq!(
1007            buffer.curr_window_values().cloned().collect_vec(),
1008            vec!["hello", "session"]
1009        );
1010
1011        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
1012        assert!(removed_keys.is_empty());
1013        let window = buffer.curr_window();
1014        assert_eq!(window.key, Some(&key(3)));
1015        assert!(window.preceding_saturated);
1016        assert!(window.following_saturated);
1017        assert_eq!(
1018            buffer.curr_window_values().cloned().collect_vec(),
1019            vec!["hello", "session"]
1020        );
1021
1022        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
1023        assert_eq!(removed_keys, vec![key(1), key(3)]);
1024        assert_eq!(buffer.smallest_key(), Some(&key(8)));
1025        let window = buffer.curr_window();
1026        assert_eq!(window.key, Some(&key(8)));
1027        assert!(window.preceding_saturated);
1028        assert!(window.following_saturated);
1029        assert_eq!(
1030            buffer.curr_window_values().cloned().collect_vec(),
1031            vec!["window"]
1032        );
1033
1034        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
1035        assert_eq!(removed_keys, vec![key(8)]);
1036        assert_eq!(buffer.smallest_key(), Some(&key(15)));
1037        let window = buffer.curr_window();
1038        assert_eq!(window.key, Some(&key(15)));
1039        assert!(window.preceding_saturated);
1040        assert!(!window.following_saturated);
1041        assert_eq!(
1042            buffer.curr_window_values().cloned().collect_vec(),
1043            vec!["and", "world"]
1044        );
1045
1046        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
1047        assert!(removed_keys.is_empty());
1048        assert_eq!(buffer.curr_key(), Some(&key(16)));
1049        assert_eq!(
1050            buffer.curr_window_values().cloned().collect_vec(),
1051            vec!["and", "world"]
1052        );
1053
1054        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
1055        assert_eq!(removed_keys, vec![key(15), key(16)]);
1056        assert!(buffer.curr_key().is_none());
1057        assert!(
1058            buffer
1059                .curr_window_values()
1060                .cloned()
1061                .collect_vec()
1062                .is_empty()
1063        );
1064    }
1065}