risingwave_frontend/optimizer/property/
monotonicity.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::ops::Index;
17
18use enum_as_inner::EnumAsInner;
19use risingwave_common::types::DataType;
20use risingwave_pb::expr::expr_node::Type as ExprType;
21
22use crate::expr::{Expr, ExprImpl, FunctionCall, TableFunction};
23
/// Represents the derivation of the monotonicity of a column.
/// This enum aims to unify the "non-decreasing analysis" and watermark derivation.
///
/// A value either states the monotonicity outright ([`MonotonicityDerivation::Inherent`]) or
/// ties it to one input column, identified by its index.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
pub enum MonotonicityDerivation {
    /// The monotonicity of the column is inherent, meaning that it is derived from the column itself.
    Inherent(Monotonicity),
    /// The monotonicity of the column follows the monotonicity of the specified column in the input.
    /// The payload is the index of that input column.
    FollowingInput(usize),
    /// The monotonicity of the column INVERSELY follows the monotonicity of the specified column in the input.
    /// This is not used currently; within this file it is only produced by
    /// [`MonotonicityDerivation::inverse`].
    _FollowingInputInversely(usize),
}
36
37impl MonotonicityDerivation {
38    pub fn inverse(self) -> Self {
39        use MonotonicityDerivation::*;
40        match self {
41            Inherent(monotonicity) => Inherent(monotonicity.inverse()),
42            FollowingInput(idx) => _FollowingInputInversely(idx),
43            _FollowingInputInversely(idx) => FollowingInput(idx),
44        }
45    }
46}
47
/// Represents the monotonicity of a column.
///
/// Monotonicity is a property of the output column of a stream node that describes the order
/// of the values in the column. One [`Monotonicity`] value is associated with one column, so
/// each stream node should have a [`MonotonicityMap`] to describe the monotonicity of all its
/// output columns.
///
/// For an operator that yields an append-only stream, the monotonicity being `NonDecreasing`
/// means that it will never yield a row smaller than any previously yielded row.
///
/// For an operator that yields a non-append-only stream, the monotonicity being `NonDecreasing`
/// means that it will never yield a change that has a smaller value than any previously yielded
/// change, ignoring the `Op`. So if such an operator yields a `NonDecreasing` column, `Delete`s
/// and `UpdateDelete`s can only happen on the last emitted row (or the last rows with the same
/// value on the column). This is especially useful for the `StreamNow` operator with
/// `UpdateCurrent` mode, in which case only one output row is actively maintained and the value
/// is non-decreasing.
///
/// The monotonicity property is considered in the default order type, i.e., ASC NULLS LAST. This
/// means that `NULL`s are considered largest when analyzing monotonicity.
///
/// For distributed operators, the monotonicity describes the property of the output column of
/// each shard of the operator.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Monotonicity {
    /// Reported as both non-decreasing and non-increasing by
    /// [`Monotonicity::is_non_decreasing`] and [`Monotonicity::is_non_increasing`].
    Constant,
    /// Values never go down; see the type-level documentation for the exact meaning.
    NonDecreasing,
    /// The mirror image of `NonDecreasing`.
    NonIncreasing,
    /// No monotonicity is known for the column.
    Unknown,
}
77
78impl Monotonicity {
79    pub fn is_constant(self) -> bool {
80        matches!(self, Monotonicity::Constant)
81    }
82
83    pub fn is_non_decreasing(self) -> bool {
84        // we don't use `EnumAsInner` here because we need to include `Constant`
85        matches!(self, Monotonicity::NonDecreasing | Monotonicity::Constant)
86    }
87
88    pub fn is_non_increasing(self) -> bool {
89        // similar to `is_non_decreasing`
90        matches!(self, Monotonicity::NonIncreasing | Monotonicity::Constant)
91    }
92
93    pub fn is_unknown(self) -> bool {
94        matches!(self, Monotonicity::Unknown)
95    }
96
97    pub fn inverse(self) -> Self {
98        use Monotonicity::*;
99        match self {
100            Constant => Constant,
101            NonDecreasing => NonIncreasing,
102            NonIncreasing => NonDecreasing,
103            Unknown => Unknown,
104        }
105    }
106}
107
/// Re-exports every variant of [`Monotonicity`] and [`MonotonicityDerivation`].
///
/// Glob-importing this module lets code name the variants without a type prefix,
/// e.g. `Inherent(Constant)`, as the analyzer in this file does.
pub mod monotonicity_variants {
    pub use super::Monotonicity::*;
    pub use super::MonotonicityDerivation::*;
}
112
113/// Analyze the monotonicity of an expression.
114pub fn analyze_monotonicity(expr: &ExprImpl) -> MonotonicityDerivation {
115    let analyzer = MonotonicityAnalyzer {};
116    analyzer.visit_expr(expr)
117}
118
119struct MonotonicityAnalyzer {}
120
121impl MonotonicityAnalyzer {
122    fn visit_expr(&self, expr: &ExprImpl) -> MonotonicityDerivation {
123        use monotonicity_variants::*;
124        match expr {
125            // recursion base
126            ExprImpl::InputRef(inner) => FollowingInput(inner.index()),
127            ExprImpl::Literal(_) => Inherent(Constant),
128            ExprImpl::Now(_) => Inherent(NonDecreasing),
129            ExprImpl::UserDefinedFunction(_) | ExprImpl::SecretRef(_) => Inherent(Unknown),
130
131            // recursively visit children
132            ExprImpl::FunctionCall(inner) => self.visit_function_call(inner),
133            ExprImpl::FunctionCallWithLambda(inner) => self.visit_function_call(inner.base()),
134            ExprImpl::TableFunction(inner) => self.visit_table_function(inner),
135
136            // the analyzer is not expected to be used when the following expression types are present
137            ExprImpl::Subquery(_)
138            | ExprImpl::AggCall(_)
139            | ExprImpl::CorrelatedInputRef(_)
140            | ExprImpl::WindowFunction(_)
141            | ExprImpl::Parameter(_) => panic!(
142                "Expression `{}` is not expected in the monotonicity analyzer",
143                expr.variant_name()
144            ),
145        }
146    }
147
148    fn visit_unary_op(&self, inputs: &[ExprImpl]) -> MonotonicityDerivation {
149        assert_eq!(inputs.len(), 1);
150        self.visit_expr(&inputs[0])
151    }
152
153    fn visit_binary_op(
154        &self,
155        inputs: &[ExprImpl],
156    ) -> (MonotonicityDerivation, MonotonicityDerivation) {
157        assert_eq!(inputs.len(), 2);
158        (self.visit_expr(&inputs[0]), self.visit_expr(&inputs[1]))
159    }
160
161    fn visit_ternary_op(
162        &self,
163        inputs: &[ExprImpl],
164    ) -> (
165        MonotonicityDerivation,
166        MonotonicityDerivation,
167        MonotonicityDerivation,
168    ) {
169        assert_eq!(inputs.len(), 3);
170        (
171            self.visit_expr(&inputs[0]),
172            self.visit_expr(&inputs[1]),
173            self.visit_expr(&inputs[2]),
174        )
175    }
176
177    fn visit_function_call(&self, func_call: &FunctionCall) -> MonotonicityDerivation {
178        use monotonicity_variants::*;
179
180        fn time_zone_is_without_dst(time_zone: Option<&str>) -> bool {
181            time_zone.is_some_and(|time_zone| time_zone.eq_ignore_ascii_case("UTC")) // conservative
182        }
183
184        match func_call.func_type() {
185            ExprType::Unspecified => unreachable!(),
186            ExprType::Add => match self.visit_binary_op(func_call.inputs()) {
187                (Inherent(Constant), any) | (any, Inherent(Constant)) => any,
188                (Inherent(NonDecreasing), Inherent(NonDecreasing)) => Inherent(NonDecreasing),
189                (Inherent(NonIncreasing), Inherent(NonIncreasing)) => Inherent(NonIncreasing),
190                _ => Inherent(Unknown),
191            },
192            ExprType::Subtract => match self.visit_binary_op(func_call.inputs()) {
193                (any, Inherent(Constant)) => any,
194                (Inherent(Constant), any) => any.inverse(),
195                _ => Inherent(Unknown),
196            },
197            ExprType::Multiply | ExprType::Divide | ExprType::Modulus => {
198                match self.visit_binary_op(func_call.inputs()) {
199                    (Inherent(Constant), Inherent(Constant)) => Inherent(Constant),
200                    _ => Inherent(Unknown), // let's be lazy here
201                }
202            }
203            ExprType::TumbleStart => {
204                if func_call.inputs().len() == 2 {
205                    // without `offset`, args: `(start, interval)`
206                    match self.visit_binary_op(func_call.inputs()) {
207                        (any, Inherent(Constant)) => any,
208                        _ => Inherent(Unknown),
209                    }
210                } else {
211                    // with `offset`, args: `(start, interval, offset)`
212                    assert_eq!(ExprType::TumbleStart, func_call.func_type());
213                    match self.visit_ternary_op(func_call.inputs()) {
214                        (any, Inherent(Constant), Inherent(Constant)) => any,
215                        _ => Inherent(Unknown),
216                    }
217                }
218            }
219            ExprType::AtTimeZone => match self.visit_binary_op(func_call.inputs()) {
220                (Inherent(Constant), Inherent(Constant)) => Inherent(Constant),
221                (any, Inherent(Constant)) => {
222                    let time_zone = func_call.inputs()[1]
223                        .as_literal()
224                        .and_then(|literal| literal.get_data().as_ref())
225                        .map(|tz| tz.as_utf8().as_ref());
226                    // 1. For at_time_zone(timestamp, const timezone) -> timestamptz, when timestamp has some monotonicity,
227                    // the result should have the same monotonicity.
228                    // 2. For at_time_zone(timestamptz, const timezone) -> timestamp, when timestamptz has some monotonicity,
229                    // the result only have the same monotonicity when the timezone is without DST (Daylight Saving Time).
230                    if (func_call.inputs()[0].return_type() == DataType::Timestamp
231                        && func_call.return_type() == DataType::Timestamptz)
232                        || time_zone_is_without_dst(time_zone)
233                    {
234                        any
235                    } else {
236                        Inherent(Unknown)
237                    }
238                }
239                _ => Inherent(Unknown),
240            },
241            ExprType::DateTrunc => match func_call.inputs().len() {
242                2 => match self.visit_binary_op(func_call.inputs()) {
243                    (Inherent(Constant), any) => any,
244                    _ => Inherent(Unknown),
245                },
246                3 => match self.visit_ternary_op(func_call.inputs()) {
247                    (Inherent(Constant), Inherent(Constant), Inherent(Constant)) => {
248                        Inherent(Constant)
249                    }
250                    (Inherent(Constant), any, Inherent(Constant)) => {
251                        let time_zone = func_call.inputs()[2]
252                            .as_literal()
253                            .and_then(|literal| literal.get_data().as_ref())
254                            .map(|tz| tz.as_utf8().as_ref());
255                        if time_zone_is_without_dst(time_zone) {
256                            any
257                        } else {
258                            Inherent(Unknown)
259                        }
260                    }
261                    _ => Inherent(Unknown),
262                },
263                _ => unreachable!(),
264            },
265            ExprType::AddWithTimeZone | ExprType::SubtractWithTimeZone => {
266                // Requires time zone and interval to be literal, at least for now.
267                let time_zone = match &func_call.inputs()[2] {
268                    ExprImpl::Literal(lit) => {
269                        lit.get_data().as_ref().map(|tz| tz.as_utf8().as_ref())
270                    }
271                    _ => return Inherent(Unknown),
272                };
273                let interval = match &func_call.inputs()[1] {
274                    ExprImpl::Literal(lit) => lit
275                        .get_data()
276                        .as_ref()
277                        .map(|interval| interval.as_interval()),
278                    _ => return Inherent(Unknown),
279                };
280                let quantitative_only = interval.is_none_or(|v| {
281                    v.months() == 0 && (v.days() == 0 || time_zone_is_without_dst(time_zone))
282                });
283                match (self.visit_expr(&func_call.inputs()[0]), quantitative_only) {
284                    (Inherent(Constant), _) => Inherent(Constant),
285                    (any, true) => any,
286                    _ => Inherent(Unknown),
287                }
288            }
289            ExprType::SecToTimestamptz => self.visit_unary_op(func_call.inputs()),
290            ExprType::CharToTimestamptz => Inherent(Unknown),
291            ExprType::Cast => {
292                // TODO: need more derivation
293                Inherent(Unknown)
294            }
295            ExprType::Case => {
296                // TODO: do we need derive watermark when every case can derive a common watermark?
297                Inherent(Unknown)
298            }
299            ExprType::Proctime => Inherent(NonDecreasing),
300            _ => Inherent(Unknown),
301        }
302    }
303
304    fn visit_table_function(&self, _table_func: &TableFunction) -> MonotonicityDerivation {
305        // TODO: derive monotonicity for table funcs like `generate_series`
306        use monotonicity_variants::*;
307        Inherent(Unknown)
308    }
309}
310
311/// A map from column index to its monotonicity.
312#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
313pub struct MonotonicityMap(BTreeMap<usize, Monotonicity>);
314
315impl MonotonicityMap {
316    pub fn new() -> Self {
317        MonotonicityMap(BTreeMap::new())
318    }
319
320    pub fn insert(&mut self, idx: usize, monotonicity: Monotonicity) {
321        if monotonicity != Monotonicity::Unknown {
322            self.0.insert(idx, monotonicity);
323        }
324    }
325
326    pub fn iter(&self) -> impl Iterator<Item = (usize, Monotonicity)> + '_ {
327        self.0
328            .iter()
329            .map(|(idx, monotonicity)| (*idx, *monotonicity))
330    }
331}
332
333impl Index<usize> for MonotonicityMap {
334    type Output = Monotonicity;
335
336    fn index(&self, idx: usize) -> &Self::Output {
337        self.0.get(&idx).unwrap_or(&Monotonicity::Unknown)
338    }
339}
340
341impl IntoIterator for MonotonicityMap {
342    type IntoIter = std::collections::btree_map::IntoIter<usize, Monotonicity>;
343    type Item = (usize, Monotonicity);
344
345    fn into_iter(self) -> Self::IntoIter {
346        self.0.into_iter()
347    }
348}
349
350impl FromIterator<(usize, Monotonicity)> for MonotonicityMap {
351    fn from_iter<T: IntoIterator<Item = (usize, Monotonicity)>>(iter: T) -> Self {
352        MonotonicityMap(iter.into_iter().collect())
353    }
354}