risingwave_expr/window_function/
states.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::{Deref, DerefMut};
16
17use itertools::Itertools;
18use risingwave_common::types::Datum;
19
20use super::state::{StateEvictHint, StateKey, WindowState};
21use crate::Result;
22
23pub struct WindowStates(Vec<Box<dyn WindowState + Send + Sync>>);
24
25impl WindowStates {
26    pub fn new(states: Vec<Box<dyn WindowState + Send + Sync>>) -> Self {
27        assert!(!states.is_empty());
28        Self(states)
29    }
30
31    /// Check if all windows are aligned.
32    pub fn are_aligned(&self) -> bool {
33        self.0
34            .iter()
35            .map(|state| state.curr_window().key)
36            .all_equal()
37    }
38
39    /// Get the key of current windows.
40    pub fn curr_key(&self) -> Option<&StateKey> {
41        debug_assert!(self.are_aligned());
42        self.0.first().and_then(|state| state.curr_window().key)
43    }
44
45    /// Check is all windows are ready.
46    pub fn are_ready(&self) -> bool {
47        debug_assert!(self.are_aligned());
48        self.0.iter().all(|state| state.curr_window().is_ready)
49    }
50
51    /// Slide all windows forward and collect the output and evict hints.
52    pub fn slide(&mut self) -> Result<(Vec<Datum>, StateEvictHint)> {
53        debug_assert!(self.are_aligned());
54        let mut output = Vec::with_capacity(self.0.len());
55        let mut evict_hint: Option<StateEvictHint> = None;
56        for state in &mut self.0 {
57            let (x_output, x_evict) = state.slide()?;
58            output.push(x_output);
59            evict_hint = match evict_hint {
60                Some(evict_hint) => Some(evict_hint.merge(x_evict)),
61                None => Some(x_evict),
62            };
63        }
64        Ok((
65            output,
66            evict_hint.expect("# of evict hints = # of window states"),
67        ))
68    }
69
70    /// Slide all windows forward and collect the output, ignoring the evict hints.
71    pub fn slide_no_evict_hint(&mut self) -> Result<Vec<Datum>> {
72        debug_assert!(self.are_aligned());
73        let mut output = Vec::with_capacity(self.0.len());
74        for state in &mut self.0 {
75            let (x_output, _) = state.slide()?;
76            output.push(x_output);
77        }
78        Ok(output)
79    }
80
81    /// Slide all windows forward, ignoring the output and evict hints.
82    pub fn just_slide(&mut self) -> Result<()> {
83        debug_assert!(self.are_aligned());
84        for state in &mut self.0 {
85            state.slide_no_output()?;
86        }
87        Ok(())
88    }
89
90    /// Slide all windows forward, until the current key is `curr_key`, ignoring the output and evict hints.
91    /// After this method, `self.curr_key() == Some(curr_key)`.
92    /// `curr_key` must exist in the `WindowStates`.
93    pub fn just_slide_to(&mut self, curr_key: &StateKey) -> Result<()> {
94        // TODO(rc): with the knowledge of the old output, we can "jump" to the `curr_key` directly for some window function kind
95        while self.curr_key() != Some(curr_key) {
96            self.just_slide()?;
97        }
98        Ok(())
99    }
100}
101
102impl Deref for WindowStates {
103    type Target = Vec<Box<dyn WindowState + Send + Sync>>;
104
105    fn deref(&self) -> &Self::Target {
106        &self.0
107    }
108}
109
110impl DerefMut for WindowStates {
111    fn deref_mut(&mut self) -> &mut Self::Target {
112        &mut self.0
113    }
114}