// risingwave_expr_impl/window_function/buffer.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::VecDeque;
16use std::ops::Range;
17
18use educe::Educe;
19use risingwave_common::array::Op;
20use risingwave_common::types::Sentinelled;
21use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded};
22use risingwave_expr::window_function::{
23    FrameExclusion, RangeFrameBounds, RowsFrameBounds, SessionFrameBounds, StateKey,
24};
25
26use super::range_utils::{range_diff, range_except};
27
28/// A common sliding window buffer.
29pub(super) struct WindowBuffer<W: WindowImpl> {
30    window_impl: W,
31    frame_exclusion: FrameExclusion,
32    buffer: VecDeque<Entry<W::Key, W::Value>>,
33    curr_idx: usize,
34    left_idx: usize,       // inclusive, note this can be > `curr_idx`
35    right_excl_idx: usize, // exclusive, note this can be <= `curr_idx`
36    curr_delta: Option<Vec<(Op, W::Value)>>,
37}
38
/// One buffered row, stored as its key together with its value.
struct Entry<K: Ord, V> {
    /// Key of the row.
    key: K,
    /// Value of the row.
    value: V,
}

45/// Note: A window frame can be pure preceding, pure following, or acrossing the _current row_.
46pub(super) struct CurrWindow<'a, K> {
47    pub key: Option<&'a K>,
48
49    // XXX(rc): Maybe will be used in the future, let's keep it for now.
50    #[cfg_attr(not(test), expect(dead_code))]
51    /// The preceding half of the current window is saturated.
52    pub preceding_saturated: bool,
53    /// The following half of the current window is saturated.
54    pub following_saturated: bool,
55}
56
57impl<W: WindowImpl> WindowBuffer<W> {
58    pub fn new(window_impl: W, frame_exclusion: FrameExclusion, enable_delta: bool) -> Self {
59        if enable_delta {
60            // TODO(rc): currently only support `FrameExclusion::NoOthers` for delta
61            assert!(frame_exclusion.is_no_others());
62        }
63
64        Self {
65            window_impl,
66            frame_exclusion,
67            buffer: Default::default(),
68            curr_idx: 0,
69            left_idx: 0,
70            right_excl_idx: 0,
71            curr_delta: if enable_delta {
72                Some(Default::default())
73            } else {
74                None
75            },
76        }
77    }
78
79    /// Get the smallest key that is still kept in the buffer.
80    /// Returns `None` if there's nothing yet.
81    pub fn smallest_key(&self) -> Option<&W::Key> {
82        self.buffer.front().map(|Entry { key, .. }| key)
83    }
84
85    /// Get the key part of the current row.
86    pub fn curr_key(&self) -> Option<&W::Key> {
87        self.buffer.get(self.curr_idx).map(|Entry { key, .. }| key)
88    }
89
90    /// Get the current window info.
91    pub fn curr_window(&self) -> CurrWindow<'_, W::Key> {
92        let window = BufferRef {
93            buffer: &self.buffer,
94            curr_idx: self.curr_idx,
95            left_idx: self.left_idx,
96            right_excl_idx: self.right_excl_idx,
97        };
98        CurrWindow {
99            key: self.curr_key(),
100            preceding_saturated: self.window_impl.preceding_saturated(window),
101            following_saturated: self.window_impl.following_saturated(window),
102        }
103    }
104
105    fn curr_window_outer(&self) -> Range<usize> {
106        self.left_idx..self.right_excl_idx
107    }
108
109    fn curr_window_exclusion(&self) -> Range<usize> {
110        // TODO(rc): should intersect with `curr_window_outer` to be more accurate
111        match self.frame_exclusion {
112            FrameExclusion::CurrentRow => self.curr_idx..self.curr_idx + 1,
113            FrameExclusion::NoOthers => self.curr_idx..self.curr_idx,
114        }
115    }
116
117    fn curr_window_ranges(&self) -> (Range<usize>, Range<usize>) {
118        let selection = self.curr_window_outer();
119        let exclusion = self.curr_window_exclusion();
120        range_except(selection, exclusion)
121    }
122
123    /// Iterate over values in the current window.
124    pub fn curr_window_values(&self) -> impl DoubleEndedIterator<Item = &W::Value> {
125        assert!(self.left_idx <= self.right_excl_idx);
126        assert!(self.right_excl_idx <= self.buffer.len());
127
128        let (left, right) = self.curr_window_ranges();
129        self.buffer
130            .range(left)
131            .chain(self.buffer.range(right))
132            .map(|Entry { value, .. }| value)
133    }
134
135    /// Consume the delta of values comparing the current window to the previous window.
136    /// The delta is not guaranteed to be sorted, especially when frame exclusion is not `NoOthers`.
137    pub fn consume_curr_window_values_delta(
138        &mut self,
139    ) -> impl Iterator<Item = (Op, W::Value)> + '_ {
140        self.curr_delta
141            .as_mut()
142            .expect("delta mode should be enabled")
143            .drain(..)
144    }
145
146    /// Append a key-value pair to the buffer.
147    pub fn append(&mut self, key: W::Key, value: W::Value) {
148        let old_outer = self.curr_window_outer();
149
150        self.buffer.push_back(Entry { key, value });
151        self.recalculate_left_right(RecalculateHint::Append);
152
153        if self.curr_delta.is_some() {
154            self.maintain_delta(old_outer, self.curr_window_outer());
155        }
156    }
157
158    /// Slide the current window forward.
159    /// Returns the keys that are removed from the buffer.
160    pub fn slide(&mut self) -> impl Iterator<Item = (W::Key, W::Value)> + '_ {
161        let old_outer = self.curr_window_outer();
162
163        self.curr_idx += 1;
164        self.recalculate_left_right(RecalculateHint::Slide);
165
166        if self.curr_delta.is_some() {
167            self.maintain_delta(old_outer, self.curr_window_outer());
168        }
169
170        let min_needed_idx = std::cmp::min(self.left_idx, self.curr_idx);
171        self.curr_idx -= min_needed_idx;
172        self.left_idx -= min_needed_idx;
173        self.right_excl_idx -= min_needed_idx;
174
175        self.window_impl.shift_indices(min_needed_idx);
176
177        self.buffer
178            .drain(0..min_needed_idx)
179            .map(|Entry { key, value }| (key, value))
180    }
181
182    fn maintain_delta(&mut self, old_outer: Range<usize>, new_outer: Range<usize>) {
183        debug_assert!(self.frame_exclusion.is_no_others());
184
185        let (outer_removed, outer_added) = range_diff(old_outer.clone(), new_outer.clone());
186        let delta = self.curr_delta.as_mut().unwrap();
187        for idx in outer_removed.iter().cloned().flatten() {
188            delta.push((Op::Delete, self.buffer[idx].value.clone()));
189        }
190        for idx in outer_added.iter().cloned().flatten() {
191            delta.push((Op::Insert, self.buffer[idx].value.clone()));
192        }
193    }
194
195    fn recalculate_left_right(&mut self, hint: RecalculateHint) {
196        let window = BufferRefMut {
197            buffer: &self.buffer,
198            curr_idx: &mut self.curr_idx,
199            left_idx: &mut self.left_idx,
200            right_excl_idx: &mut self.right_excl_idx,
201        };
202        self.window_impl.recalculate_left_right(window, hint);
203    }
204}
205
206/// Wraps a reference to the buffer and some indices, to be used by [`WindowImpl`]s.
207#[derive(Educe)]
208#[educe(Clone, Copy)]
209pub(super) struct BufferRef<'a, K: Ord, V: Clone> {
210    buffer: &'a VecDeque<Entry<K, V>>,
211    curr_idx: usize,
212    left_idx: usize,
213    right_excl_idx: usize,
214}
215
216/// Wraps a reference to the buffer and some mutable indices, to be used by [`WindowImpl`]s.
217pub(super) struct BufferRefMut<'a, K: Ord, V: Clone> {
218    buffer: &'a VecDeque<Entry<K, V>>,
219    curr_idx: &'a mut usize,
220    left_idx: &'a mut usize,
221    right_excl_idx: &'a mut usize,
222}
223
224#[derive(Clone, Copy, PartialEq, Eq)]
225pub(super) enum RecalculateHint {
226    Append,
227    Slide,
228}
229
230/// A trait for sliding window implementations. This trait is used by [`WindowBuffer`] to
231/// determine the status of current window and how to slide the window.
232pub(super) trait WindowImpl {
233    type Key: Ord;
234    type Value: Clone;
235
236    /// Whether the preceding half of the current window is saturated.
237    /// By "saturated" we mean that every row that is possible to be in the preceding half of the
238    /// current window is already in the buffer.
239    fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool;
240
241    /// Whether the following half of the current window is saturated.
242    fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool;
243
244    /// Recalculate the left and right indices of the current window, according to the latest
245    /// `curr_idx` and the hint.
246    fn recalculate_left_right(
247        &mut self,
248        window: BufferRefMut<'_, Self::Key, Self::Value>,
249        hint: RecalculateHint,
250    );
251
252    /// Shift the indices stored by the [`WindowImpl`] by `n` positions. This should be called
253    /// after evicting rows from the buffer.
254    fn shift_indices(&mut self, n: usize);
255}
256
257/// The sliding window implementation for `ROWS` frames.
258pub(super) struct RowsWindow<K: Ord, V: Clone> {
259    frame_bounds: RowsFrameBounds,
260    _phantom: std::marker::PhantomData<K>,
261    _phantom2: std::marker::PhantomData<V>,
262}
263
264impl<K: Ord, V: Clone> RowsWindow<K, V> {
265    pub fn new(frame_bounds: RowsFrameBounds) -> Self {
266        Self {
267            frame_bounds,
268            _phantom: std::marker::PhantomData,
269            _phantom2: std::marker::PhantomData,
270        }
271    }
272}
273
274impl<K: Ord, V: Clone> WindowImpl for RowsWindow<K, V> {
275    type Key = K;
276    type Value = V;
277
278    fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
279        window.curr_idx < window.buffer.len() && {
280            let start_off = self.frame_bounds.start.to_offset();
281            if let Some(start_off) = start_off {
282                if start_off >= 0 {
283                    true // pure following frame, always preceding-saturated
284                } else {
285                    // FIXME(rc): Clippy rule `clippy::nonminimal_bool` is misreporting that
286                    // the following can be simplified.
287                    #[allow(clippy::nonminimal_bool)]
288                    {
289                        assert!(window.curr_idx >= window.left_idx);
290                    }
291                    window.curr_idx - window.left_idx >= start_off.unsigned_abs()
292                }
293            } else {
294                false // unbounded frame start, never preceding-saturated
295            }
296        }
297    }
298
299    fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
300        window.curr_idx < window.buffer.len() && {
301            let end_off = self.frame_bounds.end.to_offset();
302            if let Some(end_off) = end_off {
303                if end_off <= 0 {
304                    true // pure preceding frame, always following-saturated
305                } else {
306                    // FIXME(rc): Ditto.
307                    #[allow(clippy::nonminimal_bool)]
308                    {
309                        assert!(window.right_excl_idx > 0);
310                        assert!(window.right_excl_idx > window.curr_idx);
311                        assert!(window.right_excl_idx <= window.buffer.len());
312                    }
313                    window.right_excl_idx - 1 - window.curr_idx >= end_off as usize
314                }
315            } else {
316                false // unbounded frame end, never following-saturated
317            }
318        }
319    }
320
321    fn recalculate_left_right(
322        &mut self,
323        window: BufferRefMut<'_, Self::Key, Self::Value>,
324        _hint: RecalculateHint,
325    ) {
326        if window.buffer.is_empty() {
327            *window.left_idx = 0;
328            *window.right_excl_idx = 0;
329        }
330
331        let start_off = self.frame_bounds.start.to_offset();
332        let end_off = self.frame_bounds.end.to_offset();
333        if let Some(start_off) = start_off {
334            let logical_left_idx = *window.curr_idx as isize + start_off;
335            if logical_left_idx >= 0 {
336                *window.left_idx = std::cmp::min(logical_left_idx as usize, window.buffer.len());
337            } else {
338                *window.left_idx = 0;
339            }
340        } else {
341            // unbounded start
342            *window.left_idx = 0;
343        }
344        if let Some(end_off) = end_off {
345            let logical_right_excl_idx = *window.curr_idx as isize + end_off + 1;
346            if logical_right_excl_idx >= 0 {
347                *window.right_excl_idx =
348                    std::cmp::min(logical_right_excl_idx as usize, window.buffer.len());
349            } else {
350                *window.right_excl_idx = 0;
351            }
352        } else {
353            // unbounded end
354            *window.right_excl_idx = window.buffer.len();
355        }
356    }
357
358    fn shift_indices(&mut self, _n: usize) {}
359}
360
361/// The sliding window implementation for `RANGE` frames.
362pub(super) struct RangeWindow<V: Clone> {
363    frame_bounds: RangeFrameBounds,
364    _phantom: std::marker::PhantomData<V>,
365}
366
367impl<V: Clone> RangeWindow<V> {
368    pub fn new(frame_bounds: RangeFrameBounds) -> Self {
369        Self {
370            frame_bounds,
371            _phantom: std::marker::PhantomData,
372        }
373    }
374}
375
376impl<V: Clone> WindowImpl for RangeWindow<V> {
377    type Key = StateKey;
378    type Value = V;
379
380    fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
381        window.curr_idx < window.buffer.len() && {
382            // XXX(rc): It seems that preceding saturation is not important, may remove later.
383            true
384        }
385    }
386
387    fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
388        window.curr_idx < window.buffer.len()
389            && {
390                // Left OK? (note that `left_idx` can be greater than `right_idx`)
391                // The following line checks whether the left value is the last one in the buffer.
392                // Here we adopt a conservative approach, which means we assume the next future value
393                // is likely to be the same as the last value in the current window, in which case
394                // we can't say the current window is saturated.
395                window.left_idx < window.buffer.len() /* non-zero */ - 1
396            }
397            && {
398                // Right OK? Ditto.
399                window.right_excl_idx < window.buffer.len()
400            }
401    }
402
403    fn recalculate_left_right(
404        &mut self,
405        window: BufferRefMut<'_, Self::Key, Self::Value>,
406        _hint: RecalculateHint,
407    ) {
408        if window.buffer.is_empty() {
409            *window.left_idx = 0;
410            *window.right_excl_idx = 0;
411        }
412
413        let Some(entry) = window.buffer.get(*window.curr_idx) else {
414            // If the current index has been moved to a future position, we can't touch anything
415            // because the next coming key may equal to the previous one which means the left and
416            // right indices will be the same.
417            return;
418        };
419        let curr_key = &entry.key;
420
421        let curr_order_value = memcmp_encoding::decode_value(
422            &self.frame_bounds.order_data_type,
423            &curr_key.order_key,
424            self.frame_bounds.order_type,
425        )
426        .expect("no reason to fail here because we just encoded it in memory");
427
428        match self.frame_bounds.frame_start_of(&curr_order_value) {
429            Sentinelled::Smallest => {
430                // unbounded frame start
431                assert_eq!(
432                    *window.left_idx, 0,
433                    "for unbounded start, left index should always be 0"
434                );
435            }
436            Sentinelled::Normal(value) => {
437                // bounded, find the start position
438                let value_enc = memcmp_encoding::encode_value(value, self.frame_bounds.order_type)
439                    .expect("no reason to fail here");
440                *window.left_idx = window
441                    .buffer
442                    .partition_point(|elem| elem.key.order_key < value_enc);
443            }
444            Sentinelled::Largest => unreachable!("frame start never be UNBOUNDED FOLLOWING"),
445        }
446
447        match self.frame_bounds.frame_end_of(curr_order_value) {
448            Sentinelled::Largest => {
449                // unbounded frame end
450                *window.right_excl_idx = window.buffer.len();
451            }
452            Sentinelled::Normal(value) => {
453                // bounded, find the end position
454                let value_enc = memcmp_encoding::encode_value(value, self.frame_bounds.order_type)
455                    .expect("no reason to fail here");
456                *window.right_excl_idx = window
457                    .buffer
458                    .partition_point(|elem| elem.key.order_key <= value_enc);
459            }
460            Sentinelled::Smallest => unreachable!("frame end never be UNBOUNDED PRECEDING"),
461        }
462    }
463
464    fn shift_indices(&mut self, _n: usize) {}
465}
466
467pub(super) struct SessionWindow<V: Clone> {
468    frame_bounds: SessionFrameBounds,
469    /// The latest session is the rightmost session in the buffer, which is updated during appending.
470    latest_session: Option<LatestSession>,
471    /// The sizes of recognized but not consumed sessions in the buffer. It's updated during appending.
472    /// The first element, if any, should be the size of the "current session window". When sliding,
473    /// the front should be popped.
474    recognized_session_sizes: VecDeque<usize>,
475    _phantom: std::marker::PhantomData<V>,
476}
477
478#[derive(Debug)]
479struct LatestSession {
480    /// The starting index of the latest session.
481    start_idx: usize,
482
483    /// Minimal next start means the minimal order value that can start a new session.
484    /// If a row has an order value less than this, it should be in the current session.
485    minimal_next_start: MemcmpEncoded,
486}
487
488impl<V: Clone> SessionWindow<V> {
489    pub fn new(frame_bounds: SessionFrameBounds) -> Self {
490        Self {
491            frame_bounds,
492            latest_session: None,
493            recognized_session_sizes: Default::default(),
494            _phantom: std::marker::PhantomData,
495        }
496    }
497}
498
499impl<V: Clone> WindowImpl for SessionWindow<V> {
500    type Key = StateKey;
501    type Value = V;
502
503    fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
504        window.curr_idx < window.buffer.len() && {
505            // XXX(rc): It seems that preceding saturation is not important, may remove later.
506            true
507        }
508    }
509
510    fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool {
511        window.curr_idx < window.buffer.len() && {
512            // For session window, `left_idx` is always smaller than `right_excl_idx`.
513            assert!(window.left_idx <= window.curr_idx);
514            assert!(window.curr_idx < window.right_excl_idx);
515
516            // The following expression checks whether the current window is the latest session.
517            // If it is, we don't think it's saturated because the next row may be still in the
518            // same session. Otherwise, we can safely say it's saturated.
519            self.latest_session
520                .as_ref()
521                .is_some_and(|LatestSession { start_idx, .. }| window.left_idx < *start_idx)
522        }
523    }
524
525    fn recalculate_left_right(
526        &mut self,
527        window: BufferRefMut<'_, Self::Key, Self::Value>,
528        hint: RecalculateHint,
529    ) {
530        // Terms:
531        // - Session: A continuous range of rows among any two of which the difference of order values
532        //   is less than the session gap. This is a concept on the whole stream. Sessions are recognized
533        //   during appending.
534        // - Current window: The range of rows that are represented by the indices in `window`. It is a
535        //   status of the `WindowBuffer`. If the current window happens to be the last session in the
536        //   buffer, it will be updated during appending. Otherwise it will only be updated during sliding.
537
538        match hint {
539            RecalculateHint::Append => {
540                assert!(!window.buffer.is_empty()); // because we just appended a row
541                let appended_idx = window.buffer.len() - 1;
542                let appended_key = &window.buffer[appended_idx].key;
543
544                let minimal_next_start_of_appended = self.frame_bounds.minimal_next_start_of(
545                    memcmp_encoding::decode_value(
546                        &self.frame_bounds.order_data_type,
547                        &appended_key.order_key,
548                        self.frame_bounds.order_type,
549                    )
550                    .expect("no reason to fail here because we just encoded it in memory"),
551                );
552                let minimal_next_start_enc_of_appended = memcmp_encoding::encode_value(
553                    minimal_next_start_of_appended,
554                    self.frame_bounds.order_type,
555                )
556                .expect("no reason to fail here");
557
558                if let Some(&mut LatestSession {
559                    ref start_idx,
560                    ref mut minimal_next_start,
561                }) = self.latest_session.as_mut()
562                {
563                    if &appended_key.order_key >= minimal_next_start {
564                        // the appended row starts a new session
565                        self.recognized_session_sizes
566                            .push_back(appended_idx - start_idx);
567                        self.latest_session = Some(LatestSession {
568                            start_idx: appended_idx,
569                            minimal_next_start: minimal_next_start_enc_of_appended,
570                        });
571                        // no need to update the current window because it's now corresponding
572                        // to some previous session
573                    } else {
574                        // the appended row belongs to the latest session
575                        *minimal_next_start = minimal_next_start_enc_of_appended;
576
577                        if *start_idx == *window.left_idx {
578                            // the current window is the latest session, we should extend it
579                            *window.right_excl_idx = appended_idx + 1;
580                        }
581                    }
582                } else {
583                    // no session yet, the current window should be empty
584                    let left_idx = *window.left_idx;
585                    let curr_idx = *window.curr_idx;
586                    let old_right_excl_idx = *window.right_excl_idx;
587                    assert_eq!(left_idx, curr_idx);
588                    assert_eq!(left_idx, old_right_excl_idx);
589                    assert_eq!(old_right_excl_idx, window.buffer.len() - 1);
590
591                    // now we put the first row into the current window
592                    *window.right_excl_idx = window.buffer.len();
593
594                    // and start to recognize the latest session
595                    self.latest_session = Some(LatestSession {
596                        start_idx: left_idx,
597                        minimal_next_start: minimal_next_start_enc_of_appended,
598                    });
599                }
600            }
601            RecalculateHint::Slide => {
602                let old_left_idx = *window.left_idx;
603                let new_curr_idx = *window.curr_idx;
604                let old_right_excl_idx = *window.right_excl_idx;
605
606                if new_curr_idx < old_right_excl_idx {
607                    // the current row is still in the current session window, no need to slide
608                } else {
609                    let old_session_size = self.recognized_session_sizes.pop_front();
610                    let next_session_size = self.recognized_session_sizes.front().copied();
611
612                    if let Some(old_session_size) = old_session_size {
613                        assert_eq!(old_session_size, old_right_excl_idx - old_left_idx);
614
615                        // slide the window to the next session
616                        if let Some(next_session_size) = next_session_size {
617                            // the next session is fully recognized, so we know the ending index
618                            *window.left_idx = old_right_excl_idx;
619                            *window.right_excl_idx = old_right_excl_idx + next_session_size;
620                        } else {
621                            // the next session is still in recognition, so we end the window at the end of buffer
622                            *window.left_idx = old_right_excl_idx;
623                            *window.right_excl_idx = window.buffer.len();
624                        }
625                    } else {
626                        // no recognized session yet, meaning the current window is the last session in the buffer
627                        assert_eq!(old_right_excl_idx, window.buffer.len());
628                        *window.left_idx = old_right_excl_idx;
629                        *window.right_excl_idx = old_right_excl_idx;
630                        self.latest_session = None;
631                    }
632                }
633            }
634        }
635    }
636
637    fn shift_indices(&mut self, n: usize) {
638        if let Some(LatestSession { start_idx, .. }) = self.latest_session.as_mut() {
639            *start_idx -= n;
640        }
641    }
642}
643
644#[cfg(test)]
645mod tests {
646    use itertools::Itertools;
647    use risingwave_common::row::OwnedRow;
648    use risingwave_common::types::{DataType, ScalarImpl};
649    use risingwave_common::util::sort_util::OrderType;
650    use risingwave_expr::window_function::FrameBound::{
651        CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding,
652    };
653    use risingwave_expr::window_function::SessionFrameGap;
654
655    use super::*;
656
657    #[test]
658    fn test_rows_frame_unbounded_preceding_to_current_row() {
659        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
660            RowsWindow::new(RowsFrameBounds {
661                start: UnboundedPreceding,
662                end: CurrentRow,
663            }),
664            FrameExclusion::NoOthers,
665            true,
666        );
667
668        let window = buffer.curr_window();
669        assert!(window.key.is_none());
670        assert!(!window.preceding_saturated);
671        assert!(!window.following_saturated);
672        buffer.append(1, "hello");
673        let window = buffer.curr_window();
674        assert_eq!(window.key, Some(&1));
675        assert!(!window.preceding_saturated); // unbounded preceding is never saturated
676        assert!(window.following_saturated);
677        buffer.append(2, "world");
678        let window = buffer.curr_window();
679        assert_eq!(window.key, Some(&1));
680        assert!(!window.preceding_saturated);
681        assert!(window.following_saturated);
682        assert_eq!(
683            buffer.curr_window_values().cloned().collect_vec(),
684            vec!["hello"]
685        );
686        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
687        assert!(removed_keys.is_empty()); // unbouded preceding, nothing can ever be removed
688        let window = buffer.curr_window();
689        assert_eq!(window.key, Some(&2));
690        assert!(!window.preceding_saturated);
691        assert!(window.following_saturated);
692        assert_eq!(buffer.smallest_key(), Some(&1));
693    }
694
695    #[test]
696    fn test_rows_frame_preceding_to_current_row() {
697        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
698            RowsWindow::new(RowsFrameBounds {
699                start: Preceding(1),
700                end: CurrentRow,
701            }),
702            FrameExclusion::NoOthers,
703            true,
704        );
705
706        let window = buffer.curr_window();
707        assert!(window.key.is_none());
708        assert!(!window.preceding_saturated);
709        assert!(!window.following_saturated);
710        buffer.append(1, "hello");
711        let window = buffer.curr_window();
712        assert_eq!(window.key, Some(&1));
713        assert!(!window.preceding_saturated);
714        assert!(window.following_saturated);
715        assert_eq!(
716            buffer.curr_window_values().cloned().collect_vec(),
717            vec!["hello"]
718        );
719        buffer.append(2, "world");
720        let window = buffer.curr_window();
721        assert_eq!(window.key, Some(&1));
722        assert!(!window.preceding_saturated);
723        assert!(window.following_saturated);
724        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
725        assert!(removed_keys.is_empty());
726        let window = buffer.curr_window();
727        assert_eq!(window.key, Some(&2));
728        assert!(window.preceding_saturated);
729        assert!(window.following_saturated);
730        assert_eq!(buffer.smallest_key(), Some(&1));
731        buffer.append(3, "!");
732        let window = buffer.curr_window();
733        assert_eq!(window.key, Some(&2));
734        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
735        assert_eq!(removed_keys, vec![1]);
736    }
737
738    #[test]
739    fn test_rows_frame_preceding_to_preceding() {
740        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
741            RowsWindow::new(RowsFrameBounds {
742                start: Preceding(2),
743                end: Preceding(1),
744            }),
745            FrameExclusion::NoOthers,
746            true,
747        );
748
749        buffer.append(1, "RisingWave");
750        let window = buffer.curr_window();
751        assert_eq!(window.key, Some(&1));
752        assert!(!window.preceding_saturated);
753        assert!(window.following_saturated);
754        assert!(buffer.curr_window_values().collect_vec().is_empty());
755        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
756        assert!(removed_keys.is_empty());
757        assert_eq!(buffer.smallest_key(), Some(&1));
758        buffer.append(2, "is the best");
759        let window = buffer.curr_window();
760        assert_eq!(window.key, Some(&2));
761        assert!(!window.preceding_saturated);
762        assert!(window.following_saturated);
763        assert_eq!(
764            buffer.curr_window_values().cloned().collect_vec(),
765            vec!["RisingWave"]
766        );
767        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
768        assert!(removed_keys.is_empty());
769        assert_eq!(buffer.smallest_key(), Some(&1));
770        buffer.append(3, "streaming platform");
771        let window = buffer.curr_window();
772        assert_eq!(window.key, Some(&3));
773        assert!(window.preceding_saturated);
774        assert!(window.following_saturated);
775        assert_eq!(
776            buffer.curr_window_values().cloned().collect_vec(),
777            vec!["RisingWave", "is the best"]
778        );
779        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
780        assert_eq!(removed_keys, vec![1]);
781        assert_eq!(buffer.smallest_key(), Some(&2));
782    }
783
784    #[test]
785    fn test_rows_frame_current_row_to_unbounded_following() {
786        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
787            RowsWindow::new(RowsFrameBounds {
788                start: CurrentRow,
789                end: UnboundedFollowing,
790            }),
791            FrameExclusion::NoOthers,
792            true,
793        );
794
795        buffer.append(1, "RisingWave");
796        let window = buffer.curr_window();
797        assert_eq!(window.key, Some(&1));
798        assert!(window.preceding_saturated);
799        assert!(!window.following_saturated);
800        assert_eq!(
801            buffer.curr_window_values().cloned().collect_vec(),
802            vec!["RisingWave"]
803        );
804        buffer.append(2, "is the best");
805        let window = buffer.curr_window();
806        assert_eq!(window.key, Some(&1));
807        assert!(window.preceding_saturated);
808        assert!(!window.following_saturated);
809        assert_eq!(
810            buffer.curr_window_values().cloned().collect_vec(),
811            vec!["RisingWave", "is the best"]
812        );
813        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
814        assert_eq!(removed_keys, vec![1]);
815        assert_eq!(buffer.smallest_key(), Some(&2));
816        let window = buffer.curr_window();
817        assert_eq!(window.key, Some(&2));
818        assert!(window.preceding_saturated);
819        assert!(!window.following_saturated);
820        assert_eq!(
821            buffer.curr_window_values().cloned().collect_vec(),
822            vec!["is the best"]
823        );
824    }
825
826    #[test]
827    fn test_rows_frame_current_row_to_following() {
828        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
829            RowsWindow::new(RowsFrameBounds {
830                start: CurrentRow,
831                end: Following(1),
832            }),
833            FrameExclusion::NoOthers,
834            true,
835        );
836
837        buffer.append(1, "RisingWave");
838        let window = buffer.curr_window();
839        assert_eq!(window.key, Some(&1));
840        assert!(window.preceding_saturated);
841        assert!(!window.following_saturated);
842        assert_eq!(
843            buffer.curr_window_values().cloned().collect_vec(),
844            vec!["RisingWave"]
845        );
846        buffer.append(2, "is the best");
847        let window = buffer.curr_window();
848        assert_eq!(window.key, Some(&1));
849        assert!(window.preceding_saturated);
850        assert!(window.following_saturated);
851        assert_eq!(
852            buffer.curr_window_values().cloned().collect_vec(),
853            vec!["RisingWave", "is the best"]
854        );
855        buffer.append(3, "streaming platform");
856        let window = buffer.curr_window();
857        assert_eq!(window.key, Some(&1));
858        assert!(window.preceding_saturated);
859        assert!(window.following_saturated);
860        assert_eq!(
861            buffer.curr_window_values().cloned().collect_vec(),
862            vec!["RisingWave", "is the best"]
863        );
864        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
865        assert_eq!(removed_keys, vec![1]);
866        let window = buffer.curr_window();
867        assert_eq!(window.key, Some(&2));
868        assert!(window.preceding_saturated);
869        assert!(window.following_saturated);
870        assert_eq!(
871            buffer.curr_window_values().cloned().collect_vec(),
872            vec!["is the best", "streaming platform"]
873        );
874    }
875
876    #[test]
877    fn test_rows_frame_following_to_following() {
878        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
879            RowsWindow::new(RowsFrameBounds {
880                start: Following(1),
881                end: Following(2),
882            }),
883            FrameExclusion::NoOthers,
884            true,
885        );
886
887        buffer.append(1, "RisingWave");
888        let window = buffer.curr_window();
889        assert_eq!(window.key, Some(&1));
890        assert!(window.preceding_saturated);
891        assert!(!window.following_saturated);
892        assert!(buffer.curr_window_values().collect_vec().is_empty());
893        buffer.append(2, "is the best");
894        let window = buffer.curr_window();
895        assert_eq!(window.key, Some(&1));
896        assert!(window.preceding_saturated);
897        assert!(!window.following_saturated);
898        assert_eq!(
899            buffer.curr_window_values().cloned().collect_vec(),
900            vec!["is the best"]
901        );
902        buffer.append(3, "streaming platform");
903        let window = buffer.curr_window();
904        assert_eq!(window.key, Some(&1));
905        assert!(window.preceding_saturated);
906        assert!(window.following_saturated);
907        assert_eq!(
908            buffer.curr_window_values().cloned().collect_vec(),
909            vec!["is the best", "streaming platform"]
910        );
911        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
912        assert_eq!(removed_keys, vec![1]);
913        let window = buffer.curr_window();
914        assert_eq!(window.key, Some(&2));
915        assert!(window.preceding_saturated);
916        assert!(!window.following_saturated);
917        assert_eq!(
918            buffer.curr_window_values().cloned().collect_vec(),
919            vec!["streaming platform"]
920        );
921    }
922
923    #[test]
924    fn test_rows_frame_exclude_current_row() {
925        let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
926            RowsWindow::new(RowsFrameBounds {
927                start: UnboundedPreceding,
928                end: CurrentRow,
929            }),
930            FrameExclusion::CurrentRow,
931            false,
932        );
933
934        buffer.append(1, "hello");
935        assert!(
936            buffer
937                .curr_window_values()
938                .cloned()
939                .collect_vec()
940                .is_empty()
941        );
942        buffer.append(2, "world");
943        let _ = buffer.slide();
944        assert_eq!(
945            buffer.curr_window_values().cloned().collect_vec(),
946            vec!["hello"]
947        );
948    }
949
950    #[test]
951    fn test_session_frame() {
952        let order_data_type = DataType::Int64;
953        let order_type = OrderType::ascending();
954        let gap_data_type = DataType::Int64;
955
956        let mut buffer = WindowBuffer::<SessionWindow<_>>::new(
957            SessionWindow::new(SessionFrameBounds {
958                order_data_type: order_data_type.clone(),
959                order_type,
960                gap_data_type: gap_data_type.clone(),
961                gap: SessionFrameGap::new_for_test(
962                    ScalarImpl::Int64(5),
963                    &order_data_type,
964                    &gap_data_type,
965                ),
966            }),
967            FrameExclusion::NoOthers,
968            true,
969        );
970
971        let key = |key: i64| -> StateKey {
972            StateKey {
973                order_key: memcmp_encoding::encode_value(Some(ScalarImpl::from(key)), order_type)
974                    .unwrap(),
975                pk: OwnedRow::empty().into(),
976            }
977        };
978
979        assert!(buffer.curr_key().is_none());
980
981        buffer.append(key(1), "hello");
982        buffer.append(key(3), "session");
983        let window = buffer.curr_window();
984        assert_eq!(window.key, Some(&key(1)));
985        assert!(window.preceding_saturated);
986        assert!(!window.following_saturated);
987        assert_eq!(
988            buffer.curr_window_values().cloned().collect_vec(),
989            vec!["hello", "session"]
990        );
991
992        buffer.append(key(8), "window"); // start a new session
993        let window = buffer.curr_window();
994        assert!(window.following_saturated);
995        assert_eq!(
996            buffer.curr_window_values().cloned().collect_vec(),
997            vec!["hello", "session"]
998        );
999
1000        buffer.append(key(15), "and");
1001        buffer.append(key(16), "world");
1002        assert_eq!(
1003            buffer.curr_window_values().cloned().collect_vec(),
1004            vec!["hello", "session"]
1005        );
1006
1007        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
1008        assert!(removed_keys.is_empty());
1009        let window = buffer.curr_window();
1010        assert_eq!(window.key, Some(&key(3)));
1011        assert!(window.preceding_saturated);
1012        assert!(window.following_saturated);
1013        assert_eq!(
1014            buffer.curr_window_values().cloned().collect_vec(),
1015            vec!["hello", "session"]
1016        );
1017
1018        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
1019        assert_eq!(removed_keys, vec![key(1), key(3)]);
1020        assert_eq!(buffer.smallest_key(), Some(&key(8)));
1021        let window = buffer.curr_window();
1022        assert_eq!(window.key, Some(&key(8)));
1023        assert!(window.preceding_saturated);
1024        assert!(window.following_saturated);
1025        assert_eq!(
1026            buffer.curr_window_values().cloned().collect_vec(),
1027            vec!["window"]
1028        );
1029
1030        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
1031        assert_eq!(removed_keys, vec![key(8)]);
1032        assert_eq!(buffer.smallest_key(), Some(&key(15)));
1033        let window = buffer.curr_window();
1034        assert_eq!(window.key, Some(&key(15)));
1035        assert!(window.preceding_saturated);
1036        assert!(!window.following_saturated);
1037        assert_eq!(
1038            buffer.curr_window_values().cloned().collect_vec(),
1039            vec!["and", "world"]
1040        );
1041
1042        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
1043        assert!(removed_keys.is_empty());
1044        assert_eq!(buffer.curr_key(), Some(&key(16)));
1045        assert_eq!(
1046            buffer.curr_window_values().cloned().collect_vec(),
1047            vec!["and", "world"]
1048        );
1049
1050        let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec();
1051        assert_eq!(removed_keys, vec![key(15), key(16)]);
1052        assert!(buffer.curr_key().is_none());
1053        assert!(
1054            buffer
1055                .curr_window_values()
1056                .cloned()
1057                .collect_vec()
1058                .is_empty()
1059        );
1060    }
1061}