risingwave_expr/window_function/state.rs
1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeSet;
16
17use itertools::Itertools;
18use risingwave_common::row::OwnedRow;
19use risingwave_common::types::{Datum, DefaultOrdered};
20use risingwave_common::util::memcmp_encoding::MemcmpEncoded;
21use risingwave_common_estimate_size::EstimateSize;
22use smallvec::SmallVec;
23
24use super::WindowFuncCall;
25use crate::{ExprError, Result};
26
27/// Unique and ordered identifier for a row in internal states.
28#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, EstimateSize)]
29pub struct StateKey {
30 pub order_key: MemcmpEncoded,
31 pub pk: DefaultOrdered<OwnedRow>,
32}
33
34#[derive(Debug)]
35pub struct StatePos<'a> {
36 /// Only 2 cases in which the `key` is `None`:
37 /// 1. The state is empty.
38 /// 2. It's a pure preceding window, and all ready outputs are consumed.
39 pub key: Option<&'a StateKey>,
40 pub is_ready: bool,
41}
42
43#[derive(Debug)]
44pub enum StateEvictHint {
45 /// Use a set instead of a single key to avoid state table iter or too many range delete.
46 /// Shouldn't be empty set.
47 CanEvict(BTreeSet<StateKey>),
48 /// State keys from the specified key are still required, so must be kept in the state table.
49 CannotEvict(StateKey),
50}
51
52impl StateEvictHint {
53 pub fn merge(self, other: StateEvictHint) -> StateEvictHint {
54 use StateEvictHint::*;
55 match (self, other) {
56 (CanEvict(a), CanEvict(b)) => {
57 // Example:
58 // a = CanEvict({1, 2, 3})
59 // b = CanEvict({2, 3, 4})
60 // a.merge(b) = CanEvict({1, 2, 3})
61 let a_last = a.last().unwrap();
62 let b_last = b.last().unwrap();
63 let last = std::cmp::min(a_last, b_last).clone();
64 CanEvict(
65 a.into_iter()
66 .take_while(|k| k <= &last)
67 .chain(b.into_iter().take_while(|k| k <= &last))
68 .collect(),
69 )
70 }
71 (CannotEvict(a), CannotEvict(b)) => {
72 // Example:
73 // a = CannotEvict(2), meaning keys < 2 can be evicted
74 // b = CannotEvict(3), meaning keys < 3 can be evicted
75 // a.merge(b) = CannotEvict(2)
76 CannotEvict(std::cmp::min(a, b))
77 }
78 (CanEvict(mut keys), CannotEvict(still_required))
79 | (CannotEvict(still_required), CanEvict(mut keys)) => {
80 // Example:
81 // a = CanEvict({1, 2, 3})
82 // b = CannotEvict(3)
83 // a.merge(b) = CanEvict({1, 2})
84 keys.split_off(&still_required);
85 if keys.is_empty() {
86 CannotEvict(still_required)
87 } else {
88 CanEvict(keys)
89 }
90 }
91 }
92 }
93}
94
95/// Snapshot of a window state for persistence and recovery.
96#[derive(Debug, Clone)]
97pub struct WindowStateSnapshot {
98 /// The key of the last output row.
99 pub last_output_key: Option<StateKey>,
100 /// Function-specific state. The variant must match the window function type;
101 /// a mismatch is detected at restore time and returned as an error.
102 pub function_state: risingwave_pb::window_function::window_state_snapshot::FunctionState,
103}
104
105pub trait WindowState: EstimateSize {
106 // TODO(rc): may append rows in batch like in `hash_agg`.
107 /// Append a new input row to the state. The `key` is expected to be increasing.
108 fn append(&mut self, key: StateKey, args: SmallVec<[Datum; 2]>);
109
110 /// Get the current window frame position.
111 fn curr_window(&self) -> StatePos<'_>;
112
113 /// Slide the window frame forward and collect the output and evict hint. Similar to `Iterator::next`.
114 fn slide(&mut self) -> Result<(Datum, StateEvictHint)>;
115
116 /// Slide the window frame forward and collect the evict hint. Don't calculate the output if possible.
117 fn slide_no_output(&mut self) -> Result<StateEvictHint>;
118
119 /// Enable persistence for this window state. When enabled, the state may return
120 /// `CanEvict` hints and should support snapshot/restore.
121 fn enable_persistence(&mut self) {}
122
123 /// Create a snapshot of the current state for persistence.
124 /// Returns `None` if the state doesn't support persistence.
125 fn snapshot(&self) -> Option<WindowStateSnapshot> {
126 None
127 }
128
129 /// Restore the state from a snapshot. Called during recovery before replaying rows.
130 fn restore(&mut self, _snapshot: WindowStateSnapshot) -> Result<()> {
131 Ok(())
132 }
133}
134
135pub type BoxedWindowState = Box<dyn WindowState + Send + Sync>;
136
137#[linkme::distributed_slice]
138pub static WINDOW_STATE_BUILDERS: [fn(&WindowFuncCall) -> Result<BoxedWindowState>];
139
140pub fn create_window_state(call: &WindowFuncCall) -> Result<BoxedWindowState> {
141 // we expect only one builder function in `expr_impl/window_function/mod.rs`
142 let builder = WINDOW_STATE_BUILDERS.iter().next();
143 builder.map_or_else(
144 || {
145 Err(ExprError::UnsupportedFunction(format!(
146 "{}({}) -> {}",
147 call.kind,
148 call.args.arg_types().iter().format(", "),
149 &call.return_type,
150 )))
151 },
152 |f| f(call),
153 )
154}
155
/// Checks that merging evict hints coming from a mix of persistent-numbering and
/// non-numbering window functions does not panic. The scenario is an EOWC query that uses
/// e.g. `row_number()`, `lag()` and `rank()` together:
///
/// 1. `row_number` (persistent) -> `CanEvict({key})`
/// 2. `lag` (non-persistent) -> `CannotEvict(key)`
///    merge(1, 2) = `CanEvict({})` -- an empty set would violate the "shouldn't be empty"
///    invariant
/// 3. `rank` (persistent) -> `CanEvict({key})`
///    merge(empty, 3) would panic when taking the last key of the empty set
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `StateKey` from the big-endian bytes of `order` and a one-column `Int64` pk.
    fn make_state_key(order: i64, pk: i64) -> StateKey {
        use risingwave_common::row::OwnedRow;
        use risingwave_common::types::ScalarImpl;

        let order_bytes = order.to_be_bytes().to_vec();
        let pk_row = OwnedRow::new(vec![Some(ScalarImpl::Int64(pk))]);
        StateKey {
            order_key: MemcmpEncoded::from(order_bytes),
            pk: pk_row.into(),
        }
    }

    /// `CanEvict` hint holding exactly `key`.
    fn can_evict_only(key: &StateKey) -> StateEvictHint {
        StateEvictHint::CanEvict(BTreeSet::from([key.clone()]))
    }

    /// Whether `hint` is `CannotEvict` carrying a key equal to `expected`.
    fn is_cannot_evict(hint: &StateEvictHint, expected: &StateKey) -> bool {
        matches!(hint, StateEvictHint::CannotEvict(k) if k == expected)
    }

    /// Models `row_number()` -> `CanEvict({K})`, `lag()` -> `CannotEvict(K)` and
    /// `rank()` -> `CanEvict({K})`.
    /// The first merge has to fall back to `CannotEvict` rather than produce an empty
    /// `CanEvict`, so that the following merge does not panic.
    #[test]
    fn test_merge_mixed_persistent_and_non_persistent() {
        let key = make_state_key(1, 100);

        // Two-way: CanEvict({K}) + CannotEvict(K) -> CannotEvict(K)
        let two_way = can_evict_only(&key).merge(StateEvictHint::CannotEvict(key.clone()));
        assert!(is_cannot_evict(&two_way, &key));

        // Three-way: CannotEvict(K) + CanEvict({K}) -> CannotEvict(K)
        let three_way = two_way.merge(can_evict_only(&key));
        assert!(is_cannot_evict(&three_way, &key));
    }
}
195}