risingwave_expr/window_function/states.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::{Deref, DerefMut};
use itertools::Itertools;
use risingwave_common::types::Datum;
use super::state::{StateEvictHint, StateKey, WindowState};
use crate::Result;
pub struct WindowStates(Vec<Box<dyn WindowState + Send + Sync>>);
impl WindowStates {
pub fn new(states: Vec<Box<dyn WindowState + Send + Sync>>) -> Self {
assert!(!states.is_empty());
Self(states)
}
/// Check if all windows are aligned.
pub fn are_aligned(&self) -> bool {
self.0
.iter()
.map(|state| state.curr_window().key)
.all_equal()
}
/// Get the key of current windows.
pub fn curr_key(&self) -> Option<&StateKey> {
debug_assert!(self.are_aligned());
self.0.first().and_then(|state| state.curr_window().key)
}
/// Check is all windows are ready.
pub fn are_ready(&self) -> bool {
debug_assert!(self.are_aligned());
self.0.iter().all(|state| state.curr_window().is_ready)
}
/// Slide all windows forward and collect the output and evict hints.
pub fn slide(&mut self) -> Result<(Vec<Datum>, StateEvictHint)> {
debug_assert!(self.are_aligned());
let mut output = Vec::with_capacity(self.0.len());
let mut evict_hint: Option<StateEvictHint> = None;
for state in &mut self.0 {
let (x_output, x_evict) = state.slide()?;
output.push(x_output);
evict_hint = match evict_hint {
Some(evict_hint) => Some(evict_hint.merge(x_evict)),
None => Some(x_evict),
};
}
Ok((
output,
evict_hint.expect("# of evict hints = # of window states"),
))
}
/// Slide all windows forward and collect the output, ignoring the evict hints.
pub fn slide_no_evict_hint(&mut self) -> Result<Vec<Datum>> {
debug_assert!(self.are_aligned());
let mut output = Vec::with_capacity(self.0.len());
for state in &mut self.0 {
let (x_output, _) = state.slide()?;
output.push(x_output);
}
Ok(output)
}
/// Slide all windows forward, ignoring the output and evict hints.
pub fn just_slide(&mut self) -> Result<()> {
debug_assert!(self.are_aligned());
for state in &mut self.0 {
state.slide_no_output()?;
}
Ok(())
}
/// Slide all windows forward, until the current key is `curr_key`, ignoring the output and evict hints.
/// After this method, `self.curr_key() == Some(curr_key)`.
/// `curr_key` must exist in the `WindowStates`.
pub fn just_slide_to(&mut self, curr_key: &StateKey) -> Result<()> {
// TODO(rc): with the knowledge of the old output, we can "jump" to the `curr_key` directly for some window function kind
while self.curr_key() != Some(curr_key) {
self.just_slide()?;
}
Ok(())
}
}
impl Deref for WindowStates {
type Target = Vec<Box<dyn WindowState + Send + Sync>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for WindowStates {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}