risingwave_expr/window_function/
state.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeSet;
16
17use itertools::Itertools;
18use risingwave_common::row::OwnedRow;
19use risingwave_common::types::{Datum, DefaultOrdered};
20use risingwave_common::util::memcmp_encoding::MemcmpEncoded;
21use risingwave_common_estimate_size::EstimateSize;
22use smallvec::SmallVec;
23
24use super::WindowFuncCall;
25use crate::{ExprError, Result};
26
27/// Unique and ordered identifier for a row in internal states.
28#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, EstimateSize)]
29pub struct StateKey {
30    pub order_key: MemcmpEncoded,
31    pub pk: DefaultOrdered<OwnedRow>,
32}
33
34#[derive(Debug)]
35pub struct StatePos<'a> {
36    /// Only 2 cases in which the `key` is `None`:
37    /// 1. The state is empty.
38    /// 2. It's a pure preceding window, and all ready outputs are consumed.
39    pub key: Option<&'a StateKey>,
40    pub is_ready: bool,
41}
42
43#[derive(Debug)]
44pub enum StateEvictHint {
45    /// Use a set instead of a single key to avoid state table iter or too many range delete.
46    /// Shouldn't be empty set.
47    CanEvict(BTreeSet<StateKey>),
48    /// State keys from the specified key are still required, so must be kept in the state table.
49    CannotEvict(StateKey),
50}
51
52impl StateEvictHint {
53    pub fn merge(self, other: StateEvictHint) -> StateEvictHint {
54        use StateEvictHint::*;
55        match (self, other) {
56            (CanEvict(a), CanEvict(b)) => {
57                // Example:
58                // a = CanEvict({1, 2, 3})
59                // b = CanEvict({2, 3, 4})
60                // a.merge(b) = CanEvict({1, 2, 3})
61                let a_last = a.last().unwrap();
62                let b_last = b.last().unwrap();
63                let last = std::cmp::min(a_last, b_last).clone();
64                CanEvict(
65                    a.into_iter()
66                        .take_while(|k| k <= &last)
67                        .chain(b.into_iter().take_while(|k| k <= &last))
68                        .collect(),
69                )
70            }
71            (CannotEvict(a), CannotEvict(b)) => {
72                // Example:
73                // a = CannotEvict(2), meaning keys < 2 can be evicted
74                // b = CannotEvict(3), meaning keys < 3 can be evicted
75                // a.merge(b) = CannotEvict(2)
76                CannotEvict(std::cmp::min(a, b))
77            }
78            (CanEvict(mut keys), CannotEvict(still_required))
79            | (CannotEvict(still_required), CanEvict(mut keys)) => {
80                // Example:
81                // a = CanEvict({1, 2, 3})
82                // b = CannotEvict(3)
83                // a.merge(b) = CanEvict({1, 2})
84                keys.split_off(&still_required);
85                if keys.is_empty() {
86                    CannotEvict(still_required)
87                } else {
88                    CanEvict(keys)
89                }
90            }
91        }
92    }
93}
94
95/// Snapshot of a window state for persistence and recovery.
96#[derive(Debug, Clone)]
97pub struct WindowStateSnapshot {
98    /// The key of the last output row.
99    pub last_output_key: Option<StateKey>,
100    /// Function-specific state. The variant must match the window function type;
101    /// a mismatch is detected at restore time and returned as an error.
102    pub function_state: risingwave_pb::window_function::window_state_snapshot::FunctionState,
103}
104
105pub trait WindowState: EstimateSize {
106    // TODO(rc): may append rows in batch like in `hash_agg`.
107    /// Append a new input row to the state. The `key` is expected to be increasing.
108    fn append(&mut self, key: StateKey, args: SmallVec<[Datum; 2]>);
109
110    /// Get the current window frame position.
111    fn curr_window(&self) -> StatePos<'_>;
112
113    /// Slide the window frame forward and collect the output and evict hint. Similar to `Iterator::next`.
114    fn slide(&mut self) -> Result<(Datum, StateEvictHint)>;
115
116    /// Slide the window frame forward and collect the evict hint. Don't calculate the output if possible.
117    fn slide_no_output(&mut self) -> Result<StateEvictHint>;
118
119    /// Enable persistence for this window state. When enabled, the state may return
120    /// `CanEvict` hints and should support snapshot/restore.
121    fn enable_persistence(&mut self) {}
122
123    /// Create a snapshot of the current state for persistence.
124    /// Returns `None` if the state doesn't support persistence.
125    fn snapshot(&self) -> Option<WindowStateSnapshot> {
126        None
127    }
128
129    /// Restore the state from a snapshot. Called during recovery before replaying rows.
130    fn restore(&mut self, _snapshot: WindowStateSnapshot) -> Result<()> {
131        Ok(())
132    }
133}
134
135pub type BoxedWindowState = Box<dyn WindowState + Send + Sync>;
136
137#[linkme::distributed_slice]
138pub static WINDOW_STATE_BUILDERS: [fn(&WindowFuncCall) -> Result<BoxedWindowState>];
139
140pub fn create_window_state(call: &WindowFuncCall) -> Result<BoxedWindowState> {
141    // we expect only one builder function in `expr_impl/window_function/mod.rs`
142    let builder = WINDOW_STATE_BUILDERS.iter().next();
143    builder.map_or_else(
144        || {
145            Err(ExprError::UnsupportedFunction(format!(
146                "{}({}) -> {}",
147                call.kind,
148                call.args.arg_types().iter().format(", "),
149                &call.return_type,
150            )))
151        },
152        |f| f(call),
153    )
154}
155
/// Regression coverage: merging evict hints from a mix of persistent numbering and
/// non-persistent window functions must not panic. The scenario is e.g. `row_number()`,
/// `lag()` and `rank()` used together in an EOWC query:
///
///   1. `row_number` (persistent) -> `CanEvict({key})`
///   2. `lag` (non-persistent)    -> `CannotEvict(key)`
///      If merge(1, 2) produced `CanEvict({})`, it would violate the "shouldn't be
///      empty" invariant
///   3. `rank` (persistent)       -> `CanEvict({key})`
///      and merging that empty set with (3) would panic on `.last().unwrap()`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `StateKey` whose order key is the big-endian bytes of `order` and whose
    /// pk is a single-column row holding `pk`.
    fn make_state_key(order: i64, pk: i64) -> StateKey {
        use risingwave_common::row::OwnedRow;
        use risingwave_common::types::ScalarImpl;

        let order_bytes = order.to_be_bytes().to_vec();
        let pk_row = OwnedRow::new(vec![Some(ScalarImpl::Int64(pk))]);
        StateKey {
            order_key: MemcmpEncoded::from(order_bytes),
            pk: pk_row.into(),
        }
    }

    /// Simulates `row_number()` -> `CanEvict({K})`, `lag()` -> `CannotEvict(K)`,
    /// `rank()` -> `CanEvict({K})`.
    /// The two-way merge has to fall back to `CannotEvict` (not an empty `CanEvict`),
    /// so that the three-way merge doesn't panic on `.last().unwrap()`.
    #[test]
    fn test_merge_mixed_persistent_and_non_persistent() {
        let key = make_state_key(1, 100);
        let can_evict_key = || StateEvictHint::CanEvict(BTreeSet::from([key.clone()]));
        let is_cannot_evict_key =
            |hint: &StateEvictHint| matches!(hint, StateEvictHint::CannotEvict(k) if k == &key);

        // Two-way: CanEvict({K}) + CannotEvict(K) -> CannotEvict(K)
        let two_way = can_evict_key().merge(StateEvictHint::CannotEvict(key.clone()));
        assert!(is_cannot_evict_key(&two_way));

        // Three-way: CannotEvict(K) + CanEvict({K}) -> CannotEvict(K)
        let three_way = two_way.merge(can_evict_key());
        assert!(is_cannot_evict_key(&three_way));
    }
}