risingwave_expr/window_function/
state.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeSet;
16
17use itertools::Itertools;
18use risingwave_common::row::OwnedRow;
19use risingwave_common::types::{Datum, DefaultOrdered};
20use risingwave_common::util::memcmp_encoding::MemcmpEncoded;
21use risingwave_common_estimate_size::EstimateSize;
22use smallvec::SmallVec;
23
24use super::WindowFuncCall;
25use crate::{ExprError, Result};
26
/// Unique and ordered identifier for a row in internal states.
///
/// The derived `PartialOrd`/`Ord` compare fields lexicographically in declaration order:
/// `order_key` first, then `pk` as a tie-breaker. Do not reorder the fields, as that would
/// change the ordering of keys.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, EstimateSize)]
pub struct StateKey {
    /// Memcmp-encoded order key of the row. NOTE(review): presumably the encoded window
    /// `ORDER BY` columns — confirm against the constructing code.
    pub order_key: MemcmpEncoded,
    /// Primary key of the row; presumably what makes keys with equal `order_key` unique.
    pub pk: DefaultOrdered<OwnedRow>,
}
33
/// Position of the current window frame, as returned by [`WindowState::curr_window`].
#[derive(Debug)]
pub struct StatePos<'a> {
    /// Key of the row the current window frame is positioned at.
    ///
    /// Only 2 cases in which the `key` is `None`:
    /// 1. The state is empty.
    /// 2. It's a pure preceding window, and all ready outputs are consumed.
    pub key: Option<&'a StateKey>,
    /// NOTE(review): presumably whether the output for the current position can be produced
    /// now (i.e. enough rows have been appended) — confirm against the implementations.
    pub is_ready: bool,
}
42
/// Hint produced when a window frame slides, telling the caller which state keys may be
/// removed from the state table.
#[derive(Debug)]
pub enum StateEvictHint {
    /// These keys can be evicted.
    ///
    /// A set is used instead of a single key to avoid iterating the state table or issuing
    /// too many range deletes. The set should never be empty.
    CanEvict(BTreeSet<StateKey>),
    /// State keys from (and including) the specified key are still required, so they must be
    /// kept in the state table. Keys before it may be evicted.
    CannotEvict(StateKey),
}
51
52impl StateEvictHint {
53    pub fn merge(self, other: StateEvictHint) -> StateEvictHint {
54        use StateEvictHint::*;
55        match (self, other) {
56            (CanEvict(a), CanEvict(b)) => {
57                // Example:
58                // a = CanEvict({1, 2, 3})
59                // b = CanEvict({2, 3, 4})
60                // a.merge(b) = CanEvict({1, 2, 3})
61                let a_last = a.last().unwrap();
62                let b_last = b.last().unwrap();
63                let last = std::cmp::min(a_last, b_last).clone();
64                CanEvict(
65                    a.into_iter()
66                        .take_while(|k| k <= &last)
67                        .chain(b.into_iter().take_while(|k| k <= &last))
68                        .collect(),
69                )
70            }
71            (CannotEvict(a), CannotEvict(b)) => {
72                // Example:
73                // a = CannotEvict(2), meaning keys < 2 can be evicted
74                // b = CannotEvict(3), meaning keys < 3 can be evicted
75                // a.merge(b) = CannotEvict(2)
76                CannotEvict(std::cmp::min(a, b))
77            }
78            (CanEvict(mut keys), CannotEvict(still_required))
79            | (CannotEvict(still_required), CanEvict(mut keys)) => {
80                // Example:
81                // a = CanEvict({1, 2, 3})
82                // b = CannotEvict(3)
83                // a.merge(b) = CanEvict({1, 2})
84                keys.split_off(&still_required);
85                CanEvict(keys)
86            }
87        }
88    }
89}
90
/// Per-call state of a window function: consumes input rows in key order and, on each slide,
/// yields the output for the current frame together with an evict hint.
pub trait WindowState: EstimateSize {
    // TODO(rc): may append rows in batch like in `hash_agg`.
    /// Append a new input row to the state. The `key` is expected to be increasing.
    ///
    /// NOTE(review): `args` presumably holds the evaluated arguments of the window function
    /// call for this row — confirm against callers.
    fn append(&mut self, key: StateKey, args: SmallVec<[Datum; 2]>);

    /// Get the current window frame position.
    ///
    /// See [`StatePos`] for the cases in which the returned key is `None`.
    fn curr_window(&self) -> StatePos<'_>;

    /// Slide the window frame forward and collect the output and evict hint. Similar to
    /// `Iterator::next`.
    fn slide(&mut self) -> Result<(Datum, StateEvictHint)>;

    /// Slide the window frame forward and collect the evict hint. Don't calculate the output
    /// if possible.
    fn slide_no_output(&mut self) -> Result<StateEvictHint>;
}
105
/// A type-erased, heap-allocated [`WindowState`] that is `Send + Sync`.
pub type BoxedWindowState = Box<dyn WindowState + Send + Sync>;
107
/// Registry of window-state builder functions, collected via `linkme::distributed_slice`.
///
/// Only one builder is expected to be registered; [`create_window_state`] uses the first one.
#[linkme::distributed_slice]
pub static WINDOW_STATE_BUILDERS: [fn(&WindowFuncCall) -> Result<BoxedWindowState>];
110
111pub fn create_window_state(call: &WindowFuncCall) -> Result<BoxedWindowState> {
112    // we expect only one builder function in `expr_impl/window_function/mod.rs`
113    let builder = WINDOW_STATE_BUILDERS.iter().next();
114    builder.map_or_else(
115        || {
116            Err(ExprError::UnsupportedFunction(format!(
117                "{}({}) -> {}",
118                call.kind,
119                call.args.arg_types().iter().format(", "),
120                &call.return_type,
121            )))
122        },
123        |f| f(call),
124    )
125}