risingwave_expr/window_function/state.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeSet;
16
17use itertools::Itertools;
18use risingwave_common::row::OwnedRow;
19use risingwave_common::types::{Datum, DefaultOrdered};
20use risingwave_common::util::memcmp_encoding::MemcmpEncoded;
21use risingwave_common_estimate_size::EstimateSize;
22use smallvec::SmallVec;
23
24use super::WindowFuncCall;
25use crate::{ExprError, Result};
26
27/// Unique and ordered identifier for a row in internal states.
28#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, EstimateSize)]
29pub struct StateKey {
30 pub order_key: MemcmpEncoded,
31 pub pk: DefaultOrdered<OwnedRow>,
32}
33
34#[derive(Debug)]
35pub struct StatePos<'a> {
36 /// Only 2 cases in which the `key` is `None`:
37 /// 1. The state is empty.
38 /// 2. It's a pure preceding window, and all ready outputs are consumed.
39 pub key: Option<&'a StateKey>,
40 pub is_ready: bool,
41}
42
43#[derive(Debug)]
44pub enum StateEvictHint {
45 /// Use a set instead of a single key to avoid state table iter or too many range delete.
46 /// Shouldn't be empty set.
47 CanEvict(BTreeSet<StateKey>),
48 /// State keys from the specified key are still required, so must be kept in the state table.
49 CannotEvict(StateKey),
50}
51
52impl StateEvictHint {
53 pub fn merge(self, other: StateEvictHint) -> StateEvictHint {
54 use StateEvictHint::*;
55 match (self, other) {
56 (CanEvict(a), CanEvict(b)) => {
57 // Example:
58 // a = CanEvict({1, 2, 3})
59 // b = CanEvict({2, 3, 4})
60 // a.merge(b) = CanEvict({1, 2, 3})
61 let a_last = a.last().unwrap();
62 let b_last = b.last().unwrap();
63 let last = std::cmp::min(a_last, b_last).clone();
64 CanEvict(
65 a.into_iter()
66 .take_while(|k| k <= &last)
67 .chain(b.into_iter().take_while(|k| k <= &last))
68 .collect(),
69 )
70 }
71 (CannotEvict(a), CannotEvict(b)) => {
72 // Example:
73 // a = CannotEvict(2), meaning keys < 2 can be evicted
74 // b = CannotEvict(3), meaning keys < 3 can be evicted
75 // a.merge(b) = CannotEvict(2)
76 CannotEvict(std::cmp::min(a, b))
77 }
78 (CanEvict(mut keys), CannotEvict(still_required))
79 | (CannotEvict(still_required), CanEvict(mut keys)) => {
80 // Example:
81 // a = CanEvict({1, 2, 3})
82 // b = CannotEvict(3)
83 // a.merge(b) = CanEvict({1, 2})
84 keys.split_off(&still_required);
85 CanEvict(keys)
86 }
87 }
88 }
89}
90
91pub trait WindowState: EstimateSize {
92 // TODO(rc): may append rows in batch like in `hash_agg`.
93 /// Append a new input row to the state. The `key` is expected to be increasing.
94 fn append(&mut self, key: StateKey, args: SmallVec<[Datum; 2]>);
95
96 /// Get the current window frame position.
97 fn curr_window(&self) -> StatePos<'_>;
98
99 /// Slide the window frame forward and collect the output and evict hint. Similar to `Iterator::next`.
100 fn slide(&mut self) -> Result<(Datum, StateEvictHint)>;
101
102 /// Slide the window frame forward and collect the evict hint. Don't calculate the output if possible.
103 fn slide_no_output(&mut self) -> Result<StateEvictHint>;
104}
105
/// A type-erased [`WindowState`] that is `Send + Sync`, so it can be moved and shared
/// across threads.
pub type BoxedWindowState = Box<dyn WindowState + Send + Sync>;
107
/// Builders that construct a [`BoxedWindowState`] from a [`WindowFuncCall`], collected at
/// link time through `linkme`'s distributed slice.
///
/// `create_window_state` consults only the first registered entry.
#[linkme::distributed_slice]
pub static WINDOW_STATE_BUILDERS: [fn(&WindowFuncCall) -> Result<BoxedWindowState>];
110
111pub fn create_window_state(call: &WindowFuncCall) -> Result<BoxedWindowState> {
112 // we expect only one builder function in `expr_impl/window_function/mod.rs`
113 let builder = WINDOW_STATE_BUILDERS.iter().next();
114 builder.map_or_else(
115 || {
116 Err(ExprError::UnsupportedFunction(format!(
117 "{}({}) -> {}",
118 call.kind,
119 call.args.arg_types().iter().format(", "),
120 &call.return_type,
121 )))
122 },
123 |f| f(call),
124 )
125}