risingwave_expr/window_function/
states.rs1use std::ops::{Deref, DerefMut};
16
17use itertools::Itertools;
18use risingwave_common::types::Datum;
19
20use super::state::{StateEvictHint, StateKey, WindowState};
21use crate::Result;
22
23pub struct WindowStates(Vec<Box<dyn WindowState + Send + Sync>>);
24
25impl WindowStates {
26 pub fn new(states: Vec<Box<dyn WindowState + Send + Sync>>) -> Self {
27 assert!(!states.is_empty());
28 Self(states)
29 }
30
31 pub fn are_aligned(&self) -> bool {
33 self.0
34 .iter()
35 .map(|state| state.curr_window().key)
36 .all_equal()
37 }
38
39 pub fn curr_key(&self) -> Option<&StateKey> {
41 debug_assert!(self.are_aligned());
42 self.0.first().and_then(|state| state.curr_window().key)
43 }
44
45 pub fn are_ready(&self) -> bool {
47 debug_assert!(self.are_aligned());
48 self.0.iter().all(|state| state.curr_window().is_ready)
49 }
50
51 pub fn slide(&mut self) -> Result<(Vec<Datum>, StateEvictHint)> {
53 debug_assert!(self.are_aligned());
54 let mut output = Vec::with_capacity(self.0.len());
55 let mut evict_hint: Option<StateEvictHint> = None;
56 for state in &mut self.0 {
57 let (x_output, x_evict) = state.slide()?;
58 output.push(x_output);
59 evict_hint = match evict_hint {
60 Some(evict_hint) => Some(evict_hint.merge(x_evict)),
61 None => Some(x_evict),
62 };
63 }
64 Ok((
65 output,
66 evict_hint.expect("# of evict hints = # of window states"),
67 ))
68 }
69
70 pub fn slide_no_evict_hint(&mut self) -> Result<Vec<Datum>> {
72 debug_assert!(self.are_aligned());
73 let mut output = Vec::with_capacity(self.0.len());
74 for state in &mut self.0 {
75 let (x_output, _) = state.slide()?;
76 output.push(x_output);
77 }
78 Ok(output)
79 }
80
81 pub fn just_slide(&mut self) -> Result<()> {
83 debug_assert!(self.are_aligned());
84 for state in &mut self.0 {
85 state.slide_no_output()?;
86 }
87 Ok(())
88 }
89
90 pub fn just_slide_to(&mut self, curr_key: &StateKey) -> Result<()> {
94 while self.curr_key() != Some(curr_key) {
96 self.just_slide()?;
97 }
98 Ok(())
99 }
100}
101
102impl Deref for WindowStates {
103 type Target = Vec<Box<dyn WindowState + Send + Sync>>;
104
105 fn deref(&self) -> &Self::Target {
106 &self.0
107 }
108}
109
110impl DerefMut for WindowStates {
111 fn deref_mut(&mut self) -> &mut Self::Target {
112 &mut self.0
113 }
114}