risingwave_frontend/optimizer/property/
monotonicity.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::ops::Index;
17
18use enum_as_inner::EnumAsInner;
19use risingwave_common::types::DataType;
20use risingwave_pb::expr::expr_node::Type as ExprType;
21
22use crate::expr::{Expr, ExprImpl, FunctionCall, TableFunction};
23
24/// Represents the derivation of the monotonicity of a column.
25/// This enum aims to unify the "non-decreasing analysis" and watermark derivation.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
27pub enum MonotonicityDerivation {
28    /// The monotonicity of the column is inherent, meaning that it is derived from the column itself.
29    Inherent(Monotonicity),
30    /// The monotonicity of the column follows the monotonicity of the specified column in the input.
31    FollowingInput(usize),
32    /// The monotonicity of the column INVERSELY follows the monotonicity of the specified column in the input.
33    /// This is not used currently.
34    _FollowingInputInversely(usize),
35}
36
37impl MonotonicityDerivation {
38    pub fn inverse(self) -> Self {
39        use MonotonicityDerivation::*;
40        match self {
41            Inherent(monotonicity) => Inherent(monotonicity.inverse()),
42            FollowingInput(idx) => _FollowingInputInversely(idx),
43            _FollowingInputInversely(idx) => FollowingInput(idx),
44        }
45    }
46}
47
/// Represents the monotonicity of a column.
///
/// Monotonicity is a property of the output column of a stream node that describes the order
/// of the values in the column. One [`Monotonicity`] value is associated with one column, so
/// each stream node should have a [`MonotonicityMap`] to describe the monotonicity of all its
/// output columns.
///
/// For an operator that yields an append-only stream, the monotonicity being `NonDecreasing`
/// means that it will never yield a row smaller than any previously yielded row.
///
/// For an operator that yields a non-append-only stream, the monotonicity being `NonDecreasing`
/// means that it will never yield a change that has a smaller value than any previously yielded
/// change, ignoring the `Op`. So if such an operator yields a `NonDecreasing` column, `Delete`s
/// and `UpdateDelete`s can only happen on the last emitted row (or the last rows with the same
/// value on the column). This is especially useful for the `StreamNow` operator in
/// `UpdateCurrent` mode, in which case only one output row is actively maintained and its value
/// is non-decreasing.
///
/// Monotonicity is considered in the default order type, i.e., `ASC NULLS LAST`. This means
/// that `NULL`s are considered the largest values when analyzing monotonicity.
///
/// For distributed operators, the monotonicity describes the property of the output column of
/// each shard of the operator.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Monotonicity {
    /// The column holds the same value in every row (e.g. a literal). A constant column counts
    /// as both non-decreasing and non-increasing.
    Constant,
    /// Values never decrease in emission order.
    NonDecreasing,
    /// Values never increase in emission order.
    NonIncreasing,
    /// Nothing is known about the order of the values.
    Unknown,
}

impl Monotonicity {
    /// Returns `true` only for [`Monotonicity::Constant`].
    pub fn is_constant(self) -> bool {
        matches!(self, Monotonicity::Constant)
    }

    /// Returns `true` for `NonDecreasing`, and also for `Constant` since a constant column
    /// never decreases.
    pub fn is_non_decreasing(self) -> bool {
        // we don't use `EnumAsInner` here because we need to include `Constant`
        matches!(self, Monotonicity::NonDecreasing | Monotonicity::Constant)
    }

    /// Returns `true` for `NonIncreasing`, and also for `Constant` since a constant column
    /// never increases.
    pub fn is_non_increasing(self) -> bool {
        // similar to `is_non_decreasing`
        matches!(self, Monotonicity::NonIncreasing | Monotonicity::Constant)
    }

    /// Returns `true` only for [`Monotonicity::Unknown`].
    pub fn is_unknown(self) -> bool {
        matches!(self, Monotonicity::Unknown)
    }

    /// Returns the monotonicity of a column whose order is reversed: `NonDecreasing` and
    /// `NonIncreasing` are swapped, while `Constant` and `Unknown` stay as they are.
    pub fn inverse(self) -> Self {
        use Monotonicity::*;
        match self {
            Constant => Constant,
            NonDecreasing => NonIncreasing,
            NonIncreasing => NonDecreasing,
            Unknown => Unknown,
        }
    }
}
107
108pub mod monotonicity_variants {
109    pub use super::Monotonicity::*;
110    pub use super::MonotonicityDerivation::*;
111}
112
113/// Analyze the monotonicity of an expression.
114pub fn analyze_monotonicity(expr: &ExprImpl) -> MonotonicityDerivation {
115    let analyzer = MonotonicityAnalyzer {};
116    analyzer.visit_expr(expr)
117}
118
119struct MonotonicityAnalyzer {}
120
121impl MonotonicityAnalyzer {
122    fn visit_expr(&self, expr: &ExprImpl) -> MonotonicityDerivation {
123        use monotonicity_variants::*;
124        match expr {
125            // recursion base
126            ExprImpl::InputRef(inner) => FollowingInput(inner.index()),
127            ExprImpl::Literal(_) => Inherent(Constant),
128            ExprImpl::Now(_) => Inherent(NonDecreasing),
129            ExprImpl::UserDefinedFunction(_) => Inherent(Unknown),
130
131            // recursively visit children
132            ExprImpl::FunctionCall(inner) => self.visit_function_call(inner),
133            ExprImpl::FunctionCallWithLambda(inner) => self.visit_function_call(inner.base()),
134            ExprImpl::TableFunction(inner) => self.visit_table_function(inner),
135
136            // the analyzer is not expected to be used when the following expression types are present
137            ExprImpl::Subquery(_)
138            | ExprImpl::AggCall(_)
139            | ExprImpl::CorrelatedInputRef(_)
140            | ExprImpl::WindowFunction(_)
141            | ExprImpl::Parameter(_) => panic!(
142                "Expression `{}` is not expected in the monotonicity analyzer",
143                expr.variant_name()
144            ),
145        }
146    }
147
148    fn visit_unary_op(&self, inputs: &[ExprImpl]) -> MonotonicityDerivation {
149        assert_eq!(inputs.len(), 1);
150        self.visit_expr(&inputs[0])
151    }
152
153    fn visit_binary_op(
154        &self,
155        inputs: &[ExprImpl],
156    ) -> (MonotonicityDerivation, MonotonicityDerivation) {
157        assert_eq!(inputs.len(), 2);
158        (self.visit_expr(&inputs[0]), self.visit_expr(&inputs[1]))
159    }
160
161    fn visit_ternary_op(
162        &self,
163        inputs: &[ExprImpl],
164    ) -> (
165        MonotonicityDerivation,
166        MonotonicityDerivation,
167        MonotonicityDerivation,
168    ) {
169        assert_eq!(inputs.len(), 3);
170        (
171            self.visit_expr(&inputs[0]),
172            self.visit_expr(&inputs[1]),
173            self.visit_expr(&inputs[2]),
174        )
175    }
176
177    fn visit_function_call(&self, func_call: &FunctionCall) -> MonotonicityDerivation {
178        use monotonicity_variants::*;
179
180        fn time_zone_is_without_dst(time_zone: Option<&str>) -> bool {
181            #[allow(clippy::let_and_return)] // to make it more readable
182            let tz_is_utc =
183                time_zone.is_some_and(|time_zone| time_zone.eq_ignore_ascii_case("UTC"));
184            tz_is_utc // conservative
185        }
186
187        match func_call.func_type() {
188            ExprType::Unspecified => unreachable!(),
189            ExprType::Add => match self.visit_binary_op(func_call.inputs()) {
190                (Inherent(Constant), any) | (any, Inherent(Constant)) => any,
191                (Inherent(NonDecreasing), Inherent(NonDecreasing)) => Inherent(NonDecreasing),
192                (Inherent(NonIncreasing), Inherent(NonIncreasing)) => Inherent(NonIncreasing),
193                _ => Inherent(Unknown),
194            },
195            ExprType::Subtract => match self.visit_binary_op(func_call.inputs()) {
196                (any, Inherent(Constant)) => any,
197                (Inherent(Constant), any) => any.inverse(),
198                _ => Inherent(Unknown),
199            },
200            ExprType::Multiply | ExprType::Divide | ExprType::Modulus => {
201                match self.visit_binary_op(func_call.inputs()) {
202                    (Inherent(Constant), Inherent(Constant)) => Inherent(Constant),
203                    _ => Inherent(Unknown), // let's be lazy here
204                }
205            }
206            ExprType::TumbleStart => {
207                if func_call.inputs().len() == 2 {
208                    // without `offset`, args: `(start, interval)`
209                    match self.visit_binary_op(func_call.inputs()) {
210                        (any, Inherent(Constant)) => any,
211                        _ => Inherent(Unknown),
212                    }
213                } else {
214                    // with `offset`, args: `(start, interval, offset)`
215                    assert_eq!(ExprType::TumbleStart, func_call.func_type());
216                    match self.visit_ternary_op(func_call.inputs()) {
217                        (any, Inherent(Constant), Inherent(Constant)) => any,
218                        _ => Inherent(Unknown),
219                    }
220                }
221            }
222            ExprType::AtTimeZone => match self.visit_binary_op(func_call.inputs()) {
223                (Inherent(Constant), Inherent(Constant)) => Inherent(Constant),
224                (any, Inherent(Constant)) => {
225                    let time_zone = func_call.inputs()[1]
226                        .as_literal()
227                        .and_then(|literal| literal.get_data().as_ref())
228                        .map(|tz| tz.as_utf8().as_ref());
229                    // 1. For at_time_zone(timestamp, const timezone) -> timestamptz, when timestamp has some monotonicity,
230                    // the result should have the same monotonicity.
231                    // 2. For at_time_zone(timestamptz, const timezone) -> timestamp, when timestamptz has some monotonicity,
232                    // the result only have the same monotonicity when the timezone is without DST (Daylight Saving Time).
233                    if (func_call.inputs()[0].return_type() == DataType::Timestamp
234                        && func_call.return_type() == DataType::Timestamptz)
235                        || time_zone_is_without_dst(time_zone)
236                    {
237                        any
238                    } else {
239                        Inherent(Unknown)
240                    }
241                }
242                _ => Inherent(Unknown),
243            },
244            ExprType::DateTrunc => match func_call.inputs().len() {
245                2 => match self.visit_binary_op(func_call.inputs()) {
246                    (Inherent(Constant), any) => any,
247                    _ => Inherent(Unknown),
248                },
249                3 => match self.visit_ternary_op(func_call.inputs()) {
250                    (Inherent(Constant), Inherent(Constant), Inherent(Constant)) => {
251                        Inherent(Constant)
252                    }
253                    (Inherent(Constant), any, Inherent(Constant)) => {
254                        let time_zone = func_call.inputs()[2]
255                            .as_literal()
256                            .and_then(|literal| literal.get_data().as_ref())
257                            .map(|tz| tz.as_utf8().as_ref());
258                        if time_zone_is_without_dst(time_zone) {
259                            any
260                        } else {
261                            Inherent(Unknown)
262                        }
263                    }
264                    _ => Inherent(Unknown),
265                },
266                _ => unreachable!(),
267            },
268            ExprType::AddWithTimeZone | ExprType::SubtractWithTimeZone => {
269                // Requires time zone and interval to be literal, at least for now.
270                let time_zone = match &func_call.inputs()[2] {
271                    ExprImpl::Literal(lit) => {
272                        lit.get_data().as_ref().map(|tz| tz.as_utf8().as_ref())
273                    }
274                    _ => return Inherent(Unknown),
275                };
276                let interval = match &func_call.inputs()[1] {
277                    ExprImpl::Literal(lit) => lit
278                        .get_data()
279                        .as_ref()
280                        .map(|interval| interval.as_interval()),
281                    _ => return Inherent(Unknown),
282                };
283                let quantitative_only = interval.is_none_or(|v| {
284                    v.months() == 0 && (v.days() == 0 || time_zone_is_without_dst(time_zone))
285                });
286                match (self.visit_expr(&func_call.inputs()[0]), quantitative_only) {
287                    (Inherent(Constant), _) => Inherent(Constant),
288                    (any, true) => any,
289                    _ => Inherent(Unknown),
290                }
291            }
292            ExprType::SecToTimestamptz => self.visit_unary_op(func_call.inputs()),
293            ExprType::CharToTimestamptz => Inherent(Unknown),
294            ExprType::Cast => {
295                // TODO: need more derivation
296                Inherent(Unknown)
297            }
298            ExprType::Case => {
299                // TODO: do we need derive watermark when every case can derive a common watermark?
300                Inherent(Unknown)
301            }
302            ExprType::Proctime => Inherent(NonDecreasing),
303            _ => Inherent(Unknown),
304        }
305    }
306
307    fn visit_table_function(&self, _table_func: &TableFunction) -> MonotonicityDerivation {
308        // TODO: derive monotonicity for table funcs like `generate_series`
309        use monotonicity_variants::*;
310        Inherent(Unknown)
311    }
312}
313
314/// A map from column index to its monotonicity.
315#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
316pub struct MonotonicityMap(BTreeMap<usize, Monotonicity>);
317
318impl MonotonicityMap {
319    pub fn new() -> Self {
320        MonotonicityMap(BTreeMap::new())
321    }
322
323    pub fn insert(&mut self, idx: usize, monotonicity: Monotonicity) {
324        if monotonicity != Monotonicity::Unknown {
325            self.0.insert(idx, monotonicity);
326        }
327    }
328
329    pub fn iter(&self) -> impl Iterator<Item = (usize, Monotonicity)> + '_ {
330        self.0
331            .iter()
332            .map(|(idx, monotonicity)| (*idx, *monotonicity))
333    }
334}
335
336impl Index<usize> for MonotonicityMap {
337    type Output = Monotonicity;
338
339    fn index(&self, idx: usize) -> &Self::Output {
340        self.0.get(&idx).unwrap_or(&Monotonicity::Unknown)
341    }
342}
343
344impl IntoIterator for MonotonicityMap {
345    type IntoIter = std::collections::btree_map::IntoIter<usize, Monotonicity>;
346    type Item = (usize, Monotonicity);
347
348    fn into_iter(self) -> Self::IntoIter {
349        self.0.into_iter()
350    }
351}
352
353impl FromIterator<(usize, Monotonicity)> for MonotonicityMap {
354    fn from_iter<T: IntoIterator<Item = (usize, Monotonicity)>>(iter: T) -> Self {
355        MonotonicityMap(iter.into_iter().collect())
356    }
357}