risingwave_frontend/expr/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use enum_as_inner::EnumAsInner;
16use fixedbitset::FixedBitSet;
17use futures::FutureExt;
18use paste::paste;
19use risingwave_common::array::ListValue;
20use risingwave_common::types::{DataType, Datum, JsonbVal, MapType, Scalar, ScalarImpl};
21use risingwave_expr::aggregate::PbAggKind;
22use risingwave_expr::expr::build_from_prost;
23use risingwave_pb::expr::expr_node::RexNode;
24use risingwave_pb::expr::{ExprNode, ProjectSetSelectItem};
25use user_defined_function::UserDefinedFunctionDisplay;
26
27use crate::error::{ErrorCode, Result as RwResult};
28use crate::session::current;
29
30mod agg_call;
31mod correlated_input_ref;
32mod function_call;
33mod function_call_with_lambda;
34mod input_ref;
35mod literal;
36mod now;
37mod parameter;
38mod pure;
39mod subquery;
40mod table_function;
41mod user_defined_function;
42mod window_function;
43
44mod order_by_expr;
45pub use order_by_expr::{OrderBy, OrderByExpr};
46
47mod expr_mutator;
48mod expr_rewriter;
49mod expr_visitor;
50pub mod function_impl;
51mod session_timezone;
52mod type_inference;
53mod utils;
54
55pub use agg_call::AggCall;
56pub use correlated_input_ref::{CorrelatedId, CorrelatedInputRef, Depth, InputRefDepthRewriter};
57pub use expr_mutator::ExprMutator;
58pub use expr_rewriter::{ExprRewriter, default_rewrite_expr};
59pub use expr_visitor::{ExprVisitor, default_visit_expr};
60pub use function_call::{FunctionCall, FunctionCallDisplay, is_row_function};
61pub use function_call_with_lambda::FunctionCallWithLambda;
62pub use input_ref::{InputRef, InputRefDisplay, input_ref_to_column_indices};
63pub use literal::Literal;
64pub use now::{InlineNowProcTime, Now, NowProcTimeFinder};
65pub use parameter::Parameter;
66pub use pure::*;
67pub use risingwave_pb::expr::expr_node::Type as ExprType;
68pub use session_timezone::{SessionTimezone, TimestamptzExprFinder};
69pub use subquery::{Subquery, SubqueryKind};
70pub use table_function::{TableFunction, TableFunctionType};
71pub use type_inference::*;
72pub use user_defined_function::UserDefinedFunction;
73pub use utils::*;
74pub use window_function::WindowFunction;
75
76pub(crate) const EXPR_DEPTH_THRESHOLD: usize = 30;
77pub(crate) const EXPR_TOO_DEEP_NOTICE: &str = "Some expression is too complicated. \
78Consider simplifying or splitting the query if you encounter any issues.";
79
80pub(crate) fn reject_impure(expr: impl Into<ExprImpl>, context: &str) -> RwResult<()> {
81    if let Some(impure_expr_desc) = impure_expr_desc(&expr.into()) {
82        let msg = format!(
83            "using an impure expression ({impure_expr_desc}) in {context} \
84             on a retract stream may lead to inconsistent results"
85        );
86        if current::config()
87            .is_some_and(|c| c.read().streaming_unsafe_allow_unmaterialized_impure_expr())
88        {
89            current::notice_to_user(msg);
90        } else {
91            return Err(ErrorCode::NotSupported(
92                msg,
93                "rewrite the query to extract the impure expression into the select list, \
94                 or set `streaming_unsafe_allow_unmaterialized_impure_expr` to allow \
95                 the behavior at the risk of inconsistent results or panics during execution"
96                    .into(),
97            )
98            .into());
99        }
100    }
101    Ok(())
102}
103
104/// the trait of bound expressions
105pub trait Expr: Into<ExprImpl> {
106    /// Get the return type of the expr
107    fn return_type(&self) -> DataType;
108
109    /// Try to serialize the expression, returning an error if it's impossible.
110    fn try_to_expr_proto(&self) -> Result<ExprNode, String>;
111
112    /// Serialize the expression. Panic if it's impossible.
113    fn to_expr_proto(&self) -> ExprNode {
114        self.try_to_expr_proto()
115            .expect("failed to serialize expression to protobuf")
116    }
117
118    /// Serialize the expression. Returns an error if this will result in an impure expression on a
119    /// retract stream, which may lead to inconsistent results.
120    fn to_expr_proto_checked_pure(
121        &self,
122        retract: bool,
123        context: &str,
124    ) -> crate::error::Result<ExprNode>
125    where
126        Self: Clone,
127    {
128        if retract {
129            reject_impure(self.clone(), context)?;
130        }
131        self.try_to_expr_proto()
132            .map_err(|e| ErrorCode::InternalError(e).into())
133    }
134}
135
136macro_rules! impl_expr_impl {
137    ($($t:ident,)*) => {
138        #[derive(Clone, Eq, PartialEq, Hash, EnumAsInner)]
139        pub enum ExprImpl {
140            $($t(Box<$t>),)*
141        }
142
143        impl ExprImpl {
144            pub fn variant_name(&self) -> &'static str {
145                match self {
146                    $(ExprImpl::$t(_) => stringify!($t),)*
147                }
148            }
149        }
150
151        $(
152        impl From<$t> for ExprImpl {
153            fn from(o: $t) -> ExprImpl {
154                ExprImpl::$t(Box::new(o))
155            }
156        })*
157
158        impl Expr for ExprImpl {
159            fn return_type(&self) -> DataType {
160                match self {
161                    $(ExprImpl::$t(expr) => expr.return_type(),)*
162                }
163            }
164
165            fn try_to_expr_proto(&self) -> Result<ExprNode, String> {
166                match self {
167                    $(ExprImpl::$t(expr) => expr.try_to_expr_proto(),)*
168                }
169            }
170        }
171    };
172}
173
174impl_expr_impl!(
175    // BoundColumnRef, might be used in binder.
176    CorrelatedInputRef,
177    InputRef,
178    Literal,
179    FunctionCall,
180    FunctionCallWithLambda,
181    AggCall,
182    Subquery,
183    TableFunction,
184    WindowFunction,
185    UserDefinedFunction,
186    Parameter,
187    Now,
188);
189
190impl ExprImpl {
191    /// A literal int value.
192    #[inline(always)]
193    pub fn literal_int(v: i32) -> Self {
194        Literal::new(Some(v.to_scalar_value()), DataType::Int32).into()
195    }
196
197    /// A literal bigint value
198    #[inline(always)]
199    pub fn literal_bigint(v: i64) -> Self {
200        Literal::new(Some(v.to_scalar_value()), DataType::Int64).into()
201    }
202
203    /// A literal float64 value.
204    #[inline(always)]
205    pub fn literal_f64(v: f64) -> Self {
206        Literal::new(Some(v.into()), DataType::Float64).into()
207    }
208
209    /// A literal boolean value.
210    #[inline(always)]
211    pub fn literal_bool(v: bool) -> Self {
212        Literal::new(Some(v.to_scalar_value()), DataType::Boolean).into()
213    }
214
215    /// A literal varchar value.
216    #[inline(always)]
217    pub fn literal_varchar(v: String) -> Self {
218        Literal::new(Some(v.into()), DataType::Varchar).into()
219    }
220
221    /// A literal null value.
222    #[inline(always)]
223    pub fn literal_null(element_type: DataType) -> Self {
224        Literal::new(None, element_type).into()
225    }
226
227    /// A literal jsonb value.
228    #[inline(always)]
229    pub fn literal_jsonb(v: JsonbVal) -> Self {
230        Literal::new(Some(v.into()), DataType::Jsonb).into()
231    }
232
233    /// A literal list value.
234    #[inline(always)]
235    pub fn literal_list(v: ListValue, element_type: DataType) -> Self {
236        Literal::new(Some(v.to_scalar_value()), DataType::list(element_type)).into()
237    }
238
239    /// Takes the expression, leaving a literal null of the same type in its place.
240    pub fn take(&mut self) -> Self {
241        std::mem::replace(self, Self::literal_null(self.return_type()))
242    }
243
244    /// A `count(*)` aggregate function.
245    #[inline(always)]
246    pub fn count_star() -> Self {
247        AggCall::new(
248            PbAggKind::Count.into(),
249            vec![],
250            false,
251            OrderBy::any(),
252            Condition::true_cond(),
253            vec![],
254        )
255        .unwrap()
256        .into()
257    }
258
259    /// Create a new expression by merging the given expressions by `And`.
260    ///
261    /// If `exprs` is empty, return a literal `true`.
262    pub fn and(exprs: impl IntoIterator<Item = ExprImpl>) -> Self {
263        merge_expr_by_logical(exprs, ExprType::And, ExprImpl::literal_bool(true))
264    }
265
266    /// Create a new expression by merging the given expressions by `Or`.
267    ///
268    /// If `exprs` is empty, return a literal `false`.
269    pub fn or(exprs: impl IntoIterator<Item = ExprImpl>) -> Self {
270        merge_expr_by_logical(exprs, ExprType::Or, ExprImpl::literal_bool(false))
271    }
272
273    /// Collect all `InputRef`s' indexes in the expression.
274    ///
275    /// # Panics
276    /// Panics if `input_ref >= input_col_num`.
277    pub fn collect_input_refs(&self, input_col_num: usize) -> FixedBitSet {
278        collect_input_refs(input_col_num, [self])
279    }
280
281    /// Check if the expression has no side effects and output is deterministic
282    pub fn is_pure(&self) -> bool {
283        is_pure(self)
284    }
285
286    pub fn is_impure(&self) -> bool {
287        is_impure(self)
288    }
289
290    /// Count `Now`s in the expression.
291    pub fn count_nows(&self) -> usize {
292        let mut visitor = CountNow::default();
293        visitor.visit_expr(self);
294        visitor.count()
295    }
296
297    /// Check whether self is literal NULL.
298    pub fn is_null(&self) -> bool {
299        matches!(self, ExprImpl::Literal(literal) if literal.get_data().is_none())
300    }
301
302    /// Check whether self is a literal NULL or literal string.
303    pub fn is_untyped(&self) -> bool {
304        matches!(self, ExprImpl::Literal(literal) if literal.is_untyped())
305            || matches!(self, ExprImpl::Parameter(parameter) if !parameter.has_infer())
306    }
307
308    /// Shorthand to create cast expr to `target` type in implicit context.
309    pub fn cast_implicit(mut self, target: &DataType) -> Result<ExprImpl, CastError> {
310        FunctionCall::cast_mut(&mut self, target, CastContext::Implicit)?;
311        Ok(self)
312    }
313
314    /// Shorthand to create cast expr to `target` type in assign context.
315    pub fn cast_assign(mut self, target: &DataType) -> Result<ExprImpl, CastError> {
316        FunctionCall::cast_mut(&mut self, target, CastContext::Assign)?;
317        Ok(self)
318    }
319
320    /// Shorthand to create cast expr to `target` type in explicit context.
321    pub fn cast_explicit(mut self, target: &DataType) -> Result<ExprImpl, CastError> {
322        FunctionCall::cast_mut(&mut self, target, CastContext::Explicit)?;
323        Ok(self)
324    }
325
326    /// Shorthand to inplace cast expr to `target` type in implicit context.
327    pub fn cast_implicit_mut(&mut self, target: &DataType) -> Result<(), CastError> {
328        FunctionCall::cast_mut(self, target, CastContext::Implicit)
329    }
330
331    /// Shorthand to inplace cast expr to `target` type in explicit context.
332    pub fn cast_explicit_mut(&mut self, target: &DataType) -> Result<(), CastError> {
333        FunctionCall::cast_mut(self, target, CastContext::Explicit)
334    }
335
336    /// Casting to Regclass type means getting the oid of expr.
337    /// See <https://www.postgresql.org/docs/current/datatype-oid.html>
338    pub fn cast_to_regclass(self) -> Result<ExprImpl, CastError> {
339        match self.return_type() {
340            DataType::Varchar => Ok(ExprImpl::FunctionCall(Box::new(
341                FunctionCall::new_unchecked(ExprType::CastRegclass, vec![self], DataType::Int32),
342            ))),
343            DataType::Int32 => Ok(self),
344            dt if dt.is_int() => Ok(self.cast_explicit(&DataType::Int32)?),
345            _ => bail_cast_error!("unsupported input type"),
346        }
347    }
348
349    /// Shorthand to inplace cast expr to `regclass` type.
350    pub fn cast_to_regclass_mut(&mut self) -> Result<(), CastError> {
351        let owned = std::mem::replace(self, ExprImpl::literal_bool(false));
352        *self = owned.cast_to_regclass()?;
353        Ok(())
354    }
355
356    /// Ensure the return type of this expression is an array of some type.
357    pub fn ensure_array_type(&self) -> Result<(), ErrorCode> {
358        if self.is_untyped() {
359            return Err(ErrorCode::BindError(
360                "could not determine polymorphic type because input has type unknown".into(),
361            ));
362        }
363        match self.return_type() {
364            DataType::List(_) => Ok(()),
365            t => Err(ErrorCode::BindError(format!("expects array but got {t}"))),
366        }
367    }
368
369    /// Ensure the return type of this expression is a map of some type.
370    pub fn try_into_map_type(&self) -> Result<MapType, ErrorCode> {
371        if self.is_untyped() {
372            return Err(ErrorCode::BindError(
373                "could not determine polymorphic type because input has type unknown".into(),
374            ));
375        }
376        match self.return_type() {
377            DataType::Map(m) => Ok(m),
378            t => Err(ErrorCode::BindError(format!("expects map but got {t}"))),
379        }
380    }
381
382    /// Shorthand to enforce implicit cast to boolean
383    pub fn enforce_bool_clause(self, clause: &str) -> RwResult<ExprImpl> {
384        if self.is_untyped() {
385            let inner = self.cast_implicit(&DataType::Boolean)?;
386            return Ok(inner);
387        }
388        let return_type = self.return_type();
389        if return_type != DataType::Boolean {
390            bail!(
391                "argument of {} must be boolean, not type {:?}",
392                clause,
393                return_type
394            )
395        }
396        Ok(self)
397    }
398
399    /// Create "cast" expr to string (`varchar`) type. This is different from a real cast, as
400    /// boolean is converted to a single char rather than full word.
401    ///
402    /// Choose between `cast_output` and `cast_{assign,explicit}(Varchar)` based on `PostgreSQL`'s
403    /// behavior on bools. For example, `concat(':', true)` is `:t` but `':' || true` is `:true`.
404    /// All other types have the same behavior when formatting to output and casting to string.
405    ///
406    /// References in `PostgreSQL`:
407    /// * [cast](https://github.com/postgres/postgres/blob/a3ff08e0b08dbfeb777ccfa8f13ebaa95d064c04/src/include/catalog/pg_cast.dat#L437-L444)
408    /// * [impl](https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/backend/utils/adt/bool.c#L204-L209)
409    pub fn cast_output(self) -> RwResult<ExprImpl> {
410        if self.return_type() == DataType::Boolean {
411            return Ok(FunctionCall::new(ExprType::BoolOut, vec![self])?.into());
412        }
413        // Use normal cast for other types. Both `assign` and `explicit` can pass the castability
414        // check and there is no difference.
415        self.cast_assign(&DataType::Varchar)
416            .map_err(|err| err.into())
417    }
418
419    /// Evaluate the expression on the given input.
420    ///
421    /// TODO: This is a naive implementation. We should avoid proto ser/de.
422    /// Tracking issue: <https://github.com/risingwavelabs/risingwave/issues/3479>
423    pub async fn eval_row(&self, input: &OwnedRow) -> RwResult<Datum> {
424        let backend_expr = build_from_prost(&self.to_expr_proto())?;
425        Ok(backend_expr.eval_row(input).await?)
426    }
427
428    /// Try to evaluate an expression if it's a constant expression by `ExprImpl::is_const`.
429    ///
430    /// Returns...
431    /// - `None` if it's not a constant expression,
432    /// - `Some(Ok(_))` if constant evaluation succeeds,
433    /// - `Some(Err(_))` if there's an error while evaluating a constant expression.
434    pub fn try_fold_const(&self) -> Option<RwResult<Datum>> {
435        if self.is_const() {
436            self.eval_row(&OwnedRow::empty())
437                .now_or_never()
438                .expect("constant expression should not be async")
439                .into()
440        } else {
441            None
442        }
443    }
444
445    /// Similar to `ExprImpl::try_fold_const`, but panics if the expression is not constant.
446    pub fn fold_const(&self) -> RwResult<Datum> {
447        self.try_fold_const().expect("expression is not constant")
448    }
449}
450
451/// Implement helper functions which recursively checks whether an variant is included in the
452/// expression. e.g., `has_subquery(&self) -> bool`
453///
454/// It will not traverse inside subqueries.
455macro_rules! impl_has_variant {
456    ( $($variant:ty),* ) => {
457        paste! {
458            impl ExprImpl {
459                $(
460                    pub fn [<has_ $variant:snake>](&self) -> bool {
461                        struct Has { has: bool }
462
463                        impl ExprVisitor for Has {
464                            fn [<visit_ $variant:snake>](&mut self, _: &$variant) {
465                                self.has = true;
466                            }
467                        }
468
469                        let mut visitor = Has { has: false };
470                        visitor.visit_expr(self);
471                        visitor.has
472                    }
473                )*
474            }
475        }
476    };
477}
478
479impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction, UserDefinedFunction, Now}
480
481/// Inequality condition between two input columns with clearer semantics.
482/// Represents: `left_col <op> right_col` where op is one of `<`, `<=`, `>`, `>=`.
483#[derive(Debug, Clone, PartialEq, Eq, Hash)]
484pub struct InequalityInputPair {
485    /// Index of the left side column (from left input).
486    pub left_idx: usize,
487    /// Index of the right side column (from right input, NOT offset by `left_cols_num`).
488    pub right_idx: usize,
489    /// Comparison operator: `left_col <op> right_col`.
490    pub op: ExprType,
491}
492
493impl InequalityInputPair {
494    pub fn new(left_idx: usize, right_idx: usize, op: ExprType) -> Self {
495        debug_assert!(matches!(
496            op,
497            ExprType::LessThan
498                | ExprType::LessThanOrEqual
499                | ExprType::GreaterThan
500                | ExprType::GreaterThanOrEqual
501        ));
502        Self {
503            left_idx,
504            right_idx,
505            op,
506        }
507    }
508
509    /// Returns true if the left side has larger values based on the operator.
510    /// For `>` and `>=`, left side is larger.
511    /// State cleanup applies to the side with larger values.
512    pub fn left_side_is_larger(&self) -> bool {
513        matches!(
514            self.op,
515            ExprType::GreaterThan | ExprType::GreaterThanOrEqual
516        )
517    }
518}
519
520impl ExprImpl {
521    /// This function is not meant to be called. In most cases you would want
522    /// [`ExprImpl::has_correlated_input_ref_by_depth`].
523    ///
524    /// When an expr contains a [`CorrelatedInputRef`] with lower depth, the whole expr is still
525    /// considered to be uncorrelated, and can be checked with [`ExprImpl::has_subquery`] as well.
526    /// See examples on [`crate::binder::BoundQuery::is_correlated_by_depth`] for details.
527    ///
528    /// This is a placeholder to trigger a compiler error when a trivial implementation checking for
529    /// enum variant is generated by accident. It cannot be called either because you cannot pass
530    /// `Infallible` to it.
531    pub fn has_correlated_input_ref(&self, _: std::convert::Infallible) -> bool {
532        unreachable!()
533    }
534
535    /// Used to check whether the expression has [`CorrelatedInputRef`].
536    ///
537    /// This is the core logic that supports [`crate::binder::BoundQuery::is_correlated_by_depth`]. Check the
538    /// doc of it for examples of `depth` being equal, less or greater.
539    // We need to traverse inside subqueries.
540    pub fn has_correlated_input_ref_by_depth(&self, depth: Depth) -> bool {
541        struct Has {
542            depth: usize,
543            has: bool,
544        }
545
546        impl ExprVisitor for Has {
547            fn visit_correlated_input_ref(&mut self, correlated_input_ref: &CorrelatedInputRef) {
548                if correlated_input_ref.depth() == self.depth {
549                    self.has = true;
550                }
551            }
552
553            fn visit_subquery(&mut self, subquery: &Subquery) {
554                self.has |= subquery.is_correlated_by_depth(self.depth);
555            }
556        }
557
558        let mut visitor = Has { depth, has: false };
559        visitor.visit_expr(self);
560        visitor.has
561    }
562
563    pub fn has_correlated_input_ref_by_correlated_id(&self, correlated_id: CorrelatedId) -> bool {
564        struct Has {
565            correlated_id: CorrelatedId,
566            has: bool,
567        }
568
569        impl ExprVisitor for Has {
570            fn visit_correlated_input_ref(&mut self, correlated_input_ref: &CorrelatedInputRef) {
571                if correlated_input_ref.correlated_id() == self.correlated_id {
572                    self.has = true;
573                }
574            }
575
576            fn visit_subquery(&mut self, subquery: &Subquery) {
577                self.has |= subquery.is_correlated_by_correlated_id(self.correlated_id);
578            }
579        }
580
581        let mut visitor = Has {
582            correlated_id,
583            has: false,
584        };
585        visitor.visit_expr(self);
586        visitor.has
587    }
588
589    /// Collect `CorrelatedInputRef`s in `ExprImpl` by relative `depth`, return their indices, and
590    /// assign absolute `correlated_id` for them.
591    pub fn collect_correlated_indices_by_depth_and_assign_id(
592        &mut self,
593        depth: Depth,
594        correlated_id: CorrelatedId,
595    ) -> Vec<usize> {
596        struct Collector {
597            depth: Depth,
598            correlated_indices: Vec<usize>,
599            correlated_id: CorrelatedId,
600        }
601
602        impl ExprMutator for Collector {
603            fn visit_correlated_input_ref(
604                &mut self,
605                correlated_input_ref: &mut CorrelatedInputRef,
606            ) {
607                if correlated_input_ref.depth() == self.depth {
608                    self.correlated_indices.push(correlated_input_ref.index());
609                    correlated_input_ref.set_correlated_id(self.correlated_id);
610                }
611            }
612
613            fn visit_subquery(&mut self, subquery: &mut Subquery) {
614                self.correlated_indices.extend(
615                    subquery.collect_correlated_indices_by_depth_and_assign_id(
616                        self.depth,
617                        self.correlated_id,
618                    ),
619                );
620            }
621        }
622
623        let mut collector = Collector {
624            depth,
625            correlated_indices: vec![],
626            correlated_id,
627        };
628        collector.visit_expr(self);
629        collector.correlated_indices.sort();
630        collector.correlated_indices.dedup();
631        collector.correlated_indices
632    }
633
634    pub fn only_literal_and_func(&self) -> bool {
635        {
636            struct HasOthers {
637                has_others: bool,
638            }
639
640            impl ExprVisitor for HasOthers {
641                fn visit_expr(&mut self, expr: &ExprImpl) {
642                    match expr {
643                        ExprImpl::CorrelatedInputRef(_)
644                        | ExprImpl::InputRef(_)
645                        | ExprImpl::AggCall(_)
646                        | ExprImpl::Subquery(_)
647                        | ExprImpl::TableFunction(_)
648                        | ExprImpl::WindowFunction(_)
649                        | ExprImpl::UserDefinedFunction(_)
650                        | ExprImpl::Parameter(_)
651                        | ExprImpl::Now(_) => self.has_others = true,
652                        ExprImpl::Literal(_inner) => {}
653                        ExprImpl::FunctionCall(inner) => {
654                            if !self.is_short_circuit(inner) {
655                                // only if the current `func_call` is *not* a short-circuit
656                                // expression, e.g., true or (...) | false and (...),
657                                // shall we proceed to visit it.
658                                self.visit_function_call(inner)
659                            }
660                        }
661                        ExprImpl::FunctionCallWithLambda(inner) => {
662                            self.visit_function_call_with_lambda(inner)
663                        }
664                    }
665                }
666            }
667
668            impl HasOthers {
669                fn is_short_circuit(&self, func_call: &FunctionCall) -> bool {
670                    /// evaluate the first parameter of `Or` or `And` function call
671                    fn eval_first(e: &ExprImpl, expect: bool) -> bool {
672                        if let ExprImpl::Literal(l) = e {
673                            *l.get_data() == Some(ScalarImpl::Bool(expect))
674                        } else {
675                            false
676                        }
677                    }
678
679                    match func_call.func_type {
680                        ExprType::Or => eval_first(&func_call.inputs()[0], true),
681                        ExprType::And => eval_first(&func_call.inputs()[0], false),
682                        _ => false,
683                    }
684                }
685            }
686
687            let mut visitor = HasOthers { has_others: false };
688            visitor.visit_expr(self);
689            !visitor.has_others
690        }
691    }
692
693    /// Checks whether this is a constant expr that can be evaluated over a dummy chunk.
694    ///
695    /// The expression tree should only consist of literals and **pure** function calls.
696    pub fn is_const(&self) -> bool {
697        self.only_literal_and_func() && self.is_pure()
698    }
699
700    /// Returns the `InputRefs` of an Equality predicate if it matches
701    /// ordered by the canonical ordering (lower, higher), else returns None
702    pub fn as_eq_cond(&self) -> Option<(InputRef, InputRef)> {
703        if let ExprImpl::FunctionCall(function_call) = self
704            && function_call.func_type() == ExprType::Equal
705            && let (_, ExprImpl::InputRef(x), ExprImpl::InputRef(y)) =
706                function_call.clone().decompose_as_binary()
707        {
708            if x.index() < y.index() {
709                Some((*x, *y))
710            } else {
711                Some((*y, *x))
712            }
713        } else {
714            None
715        }
716    }
717
718    pub fn as_is_not_distinct_from_cond(&self) -> Option<(InputRef, InputRef)> {
719        if let ExprImpl::FunctionCall(function_call) = self
720            && function_call.func_type() == ExprType::IsNotDistinctFrom
721            && let (_, ExprImpl::InputRef(x), ExprImpl::InputRef(y)) =
722                function_call.clone().decompose_as_binary()
723        {
724            if x.index() < y.index() {
725                Some((*x, *y))
726            } else {
727                Some((*y, *x))
728            }
729        } else {
730            None
731        }
732    }
733
734    pub fn reverse_comparison(comparison: ExprType) -> ExprType {
735        match comparison {
736            ExprType::LessThan => ExprType::GreaterThan,
737            ExprType::LessThanOrEqual => ExprType::GreaterThanOrEqual,
738            ExprType::GreaterThan => ExprType::LessThan,
739            ExprType::GreaterThanOrEqual => ExprType::LessThanOrEqual,
740            ExprType::Equal | ExprType::IsNotDistinctFrom => comparison,
741            _ => unreachable!(),
742        }
743    }
744
745    pub fn as_comparison_cond(&self) -> Option<(InputRef, ExprType, InputRef)> {
746        if let ExprImpl::FunctionCall(function_call) = self {
747            match function_call.func_type() {
748                ty @ (ExprType::LessThan
749                | ExprType::LessThanOrEqual
750                | ExprType::GreaterThan
751                | ExprType::GreaterThanOrEqual) => {
752                    let (_, op1, op2) = function_call.clone().decompose_as_binary();
753                    if let (ExprImpl::InputRef(x), ExprImpl::InputRef(y)) = (op1, op2) {
754                        if x.index < y.index {
755                            Some((*x, ty, *y))
756                        } else {
757                            Some((*y, Self::reverse_comparison(ty), *x))
758                        }
759                    } else {
760                        None
761                    }
762                }
763                _ => None,
764            }
765        } else {
766            None
767        }
768    }
769
770    /// Accepts expressions of the form `input_expr cmp now_expr` or `now_expr cmp input_expr`,
771    /// where `input_expr` contains an `InputRef` and contains no `now()`, and `now_expr`
772    /// contains a `now()` but no `InputRef`.
773    ///
774    /// Canonicalizes to the first ordering and returns `(input_expr, cmp, now_expr)`
775    pub fn as_now_comparison_cond(&self) -> Option<(ExprImpl, ExprType, ExprImpl)> {
776        if let ExprImpl::FunctionCall(function_call) = self {
777            match function_call.func_type() {
778                ty @ (ExprType::Equal
779                | ExprType::LessThan
780                | ExprType::LessThanOrEqual
781                | ExprType::GreaterThan
782                | ExprType::GreaterThanOrEqual) => {
783                    let (_, op1, op2) = function_call.clone().decompose_as_binary();
784                    if !op1.has_now()
785                        && op1.has_input_ref()
786                        && op2.has_now()
787                        && !op2.has_input_ref()
788                    {
789                        Some((op1, ty, op2))
790                    } else if op1.has_now()
791                        && !op1.has_input_ref()
792                        && !op2.has_now()
793                        && op2.has_input_ref()
794                    {
795                        Some((op2, Self::reverse_comparison(ty), op1))
796                    } else {
797                        None
798                    }
799                }
800                _ => None,
801            }
802        } else {
803            None
804        }
805    }
806
807    /// Returns the `InputRef` and offset of a predicate if it matches
808    /// the form `InputRef [+- const_expr]`, else returns None.
809    ///
810    /// Deprecated: Only used by `as_input_comparison_cond`.
811    #[allow(dead_code)]
812    fn as_input_offset(&self) -> Option<(usize, Option<(ExprType, ExprImpl)>)> {
813        match self {
814            ExprImpl::InputRef(input_ref) => Some((input_ref.index(), None)),
815            ExprImpl::FunctionCall(function_call) => {
816                let expr_type = function_call.func_type();
817                match expr_type {
818                    ExprType::Add | ExprType::Subtract => {
819                        let (_, lhs, rhs) = function_call.clone().decompose_as_binary();
820                        if let ExprImpl::InputRef(input_ref) = &lhs
821                            && rhs.is_const()
822                        {
823                            // Currently we will return `None` for non-literal because the result of the expression might be '1 day'. However, there will definitely exist false positives such as '1 second + 1 second'.
824                            // We will treat the expression as an input offset when rhs is `null`.
825                            if rhs.return_type() == DataType::Interval
826                                && rhs.as_literal().is_none_or(|literal| {
827                                    literal.get_data().as_ref().is_some_and(|scalar| {
828                                        let interval = scalar.as_interval();
829                                        interval.months() != 0 || interval.days() != 0
830                                    })
831                                })
832                            {
833                                None
834                            } else {
835                                Some((input_ref.index(), Some((expr_type, rhs))))
836                            }
837                        } else {
838                            None
839                        }
840                    }
841                    _ => None,
842                }
843            }
844            _ => None,
845        }
846    }
847
848    pub fn as_eq_const(&self) -> Option<(InputRef, ExprImpl)> {
849        if let ExprImpl::FunctionCall(function_call) = self
850            && function_call.func_type() == ExprType::Equal
851        {
852            match function_call.clone().decompose_as_binary() {
853                (_, ExprImpl::InputRef(x), y) if y.is_const() => Some((*x, y)),
854                (_, x, ExprImpl::InputRef(y)) if x.is_const() => Some((*y, x)),
855                _ => None,
856            }
857        } else {
858            None
859        }
860    }
861
862    pub fn as_eq_correlated_input_ref(&self) -> Option<(InputRef, CorrelatedInputRef)> {
863        if let ExprImpl::FunctionCall(function_call) = self
864            && function_call.func_type() == ExprType::Equal
865        {
866            match function_call.clone().decompose_as_binary() {
867                (_, ExprImpl::InputRef(x), ExprImpl::CorrelatedInputRef(y)) => Some((*x, *y)),
868                (_, ExprImpl::CorrelatedInputRef(x), ExprImpl::InputRef(y)) => Some((*y, *x)),
869                _ => None,
870            }
871        } else {
872            None
873        }
874    }
875
876    pub fn as_is_null(&self) -> Option<InputRef> {
877        if let ExprImpl::FunctionCall(function_call) = self
878            && function_call.func_type() == ExprType::IsNull
879        {
880            match function_call.clone().decompose_as_unary() {
881                (_, ExprImpl::InputRef(x)) => Some(*x),
882                _ => None,
883            }
884        } else {
885            None
886        }
887    }
888
889    pub fn as_comparison_const(&self) -> Option<(InputRef, ExprType, ExprImpl)> {
890        fn reverse_comparison(comparison: ExprType) -> ExprType {
891            match comparison {
892                ExprType::LessThan => ExprType::GreaterThan,
893                ExprType::LessThanOrEqual => ExprType::GreaterThanOrEqual,
894                ExprType::GreaterThan => ExprType::LessThan,
895                ExprType::GreaterThanOrEqual => ExprType::LessThanOrEqual,
896                _ => unreachable!(),
897            }
898        }
899
900        if let ExprImpl::FunctionCall(function_call) = self {
901            match function_call.func_type() {
902                ty @ (ExprType::LessThan
903                | ExprType::LessThanOrEqual
904                | ExprType::GreaterThan
905                | ExprType::GreaterThanOrEqual) => {
906                    let (_, op1, op2) = function_call.clone().decompose_as_binary();
907                    match (op1, op2) {
908                        (ExprImpl::InputRef(x), y) if y.is_const() => Some((*x, ty, y)),
909                        (x, ExprImpl::InputRef(y)) if x.is_const() => {
910                            Some((*y, reverse_comparison(ty), x))
911                        }
912                        _ => None,
913                    }
914                }
915                _ => None,
916            }
917        } else {
918            None
919        }
920    }
921
922    pub fn as_in_const_list(&self) -> Option<(InputRef, Vec<ExprImpl>)> {
923        if let ExprImpl::FunctionCall(function_call) = self
924            && function_call.func_type() == ExprType::In
925        {
926            let mut inputs = function_call.inputs().iter().cloned();
927            let input_ref = match inputs.next().unwrap() {
928                ExprImpl::InputRef(i) => *i,
929                _ => return None,
930            };
931            let list: Vec<_> = inputs
932                .inspect(|expr| {
933                    // Non constant IN will be bound to OR
934                    assert!(expr.is_const());
935                })
936                .collect();
937
938            Some((input_ref, list))
939        } else {
940            None
941        }
942    }
943
944    pub fn as_or_disjunctions(&self) -> Option<Vec<ExprImpl>> {
945        if let ExprImpl::FunctionCall(function_call) = self
946            && function_call.func_type() == ExprType::Or
947        {
948            Some(to_disjunctions(self.clone()))
949        } else {
950            None
951        }
952    }
953
954    pub fn to_project_set_select_item_proto(&self) -> ProjectSetSelectItem {
955        use risingwave_pb::expr::project_set_select_item::SelectItem::*;
956
957        ProjectSetSelectItem {
958            select_item: Some(match self {
959                ExprImpl::TableFunction(tf) => TableFunction(tf.to_protobuf()),
960                expr => Expr(expr.to_expr_proto()),
961            }),
962        }
963    }
964
965    /// Serialize the expression. Returns an error if this will result in an impure expression on a
966    /// retract stream, which may lead to inconsistent results.
967    pub fn to_project_set_select_item_proto_checked_pure(
968        &self,
969        retract: bool,
970    ) -> crate::error::Result<ProjectSetSelectItem> {
971        use risingwave_pb::expr::project_set_select_item::SelectItem::*;
972
973        Ok(ProjectSetSelectItem {
974            select_item: Some(match self {
975                ExprImpl::TableFunction(tf) => TableFunction(tf.to_protobuf_checked_pure(retract)?),
976                expr => Expr(expr.to_expr_proto_checked_pure(retract, "SELECT list")?),
977            }),
978        })
979    }
980
981    pub fn from_expr_proto(proto: &ExprNode) -> RwResult<Self> {
982        let rex_node = proto.get_rex_node()?;
983        let ret_type = proto.get_return_type()?.into();
984
985        Ok(match rex_node {
986            RexNode::InputRef(column_index) => Self::InputRef(Box::new(InputRef::from_expr_proto(
987                *column_index as _,
988                ret_type,
989            )?)),
990            RexNode::Constant(_) => Self::Literal(Box::new(Literal::from_expr_proto(proto)?)),
991            RexNode::Udf(udf) => Self::UserDefinedFunction(Box::new(
992                UserDefinedFunction::from_expr_proto(udf, ret_type)?,
993            )),
994            RexNode::FuncCall(function_call) => {
995                Self::FunctionCall(Box::new(FunctionCall::from_expr_proto(
996                    function_call,
997                    proto.get_function_type()?, // only interpret if it's a function call
998                    ret_type,
999                )?))
1000            }
1001            RexNode::Now(_) => Self::Now(Box::new(Now {})),
1002        })
1003    }
1004}
1005
1006impl From<Condition> for ExprImpl {
1007    fn from(c: Condition) -> Self {
1008        ExprImpl::and(c.conjunctions)
1009    }
1010}
1011
1012/// A custom Debug implementation that is more concise and suitable to use with
1013/// [`std::fmt::Formatter::debug_list`] in plan nodes. If the verbose output is preferred, it is
1014/// still available via `{:#?}`.
1015impl std::fmt::Debug for ExprImpl {
1016    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1017        if f.alternate() {
1018            return match self {
1019                Self::InputRef(arg0) => f.debug_tuple("InputRef").field(arg0).finish(),
1020                Self::Literal(arg0) => f.debug_tuple("Literal").field(arg0).finish(),
1021                Self::FunctionCall(arg0) => f.debug_tuple("FunctionCall").field(arg0).finish(),
1022                Self::FunctionCallWithLambda(arg0) => {
1023                    f.debug_tuple("FunctionCallWithLambda").field(arg0).finish()
1024                }
1025                Self::AggCall(arg0) => f.debug_tuple("AggCall").field(arg0).finish(),
1026                Self::Subquery(arg0) => f.debug_tuple("Subquery").field(arg0).finish(),
1027                Self::CorrelatedInputRef(arg0) => {
1028                    f.debug_tuple("CorrelatedInputRef").field(arg0).finish()
1029                }
1030                Self::TableFunction(arg0) => f.debug_tuple("TableFunction").field(arg0).finish(),
1031                Self::WindowFunction(arg0) => f.debug_tuple("WindowFunction").field(arg0).finish(),
1032                Self::UserDefinedFunction(arg0) => {
1033                    f.debug_tuple("UserDefinedFunction").field(arg0).finish()
1034                }
1035                Self::Parameter(arg0) => f.debug_tuple("Parameter").field(arg0).finish(),
1036                Self::Now(_) => f.debug_tuple("Now").finish(),
1037            };
1038        }
1039        match self {
1040            Self::InputRef(x) => write!(f, "{:?}", x),
1041            Self::Literal(x) => write!(f, "{:?}", x),
1042            Self::FunctionCall(x) => write!(f, "{:?}", x),
1043            Self::FunctionCallWithLambda(x) => write!(f, "{:?}", x),
1044            Self::AggCall(x) => write!(f, "{:?}", x),
1045            Self::Subquery(x) => write!(f, "{:?}", x),
1046            Self::CorrelatedInputRef(x) => write!(f, "{:?}", x),
1047            Self::TableFunction(x) => write!(f, "{:?}", x),
1048            Self::WindowFunction(x) => write!(f, "{:?}", x),
1049            Self::UserDefinedFunction(x) => write!(f, "{:?}", x),
1050            Self::Parameter(x) => write!(f, "{:?}", x),
1051            Self::Now(x) => write!(f, "{:?}", x),
1052        }
1053    }
1054}
1055
1056pub struct ExprDisplay<'a> {
1057    pub expr: &'a ExprImpl,
1058    pub input_schema: &'a Schema,
1059}
1060
1061impl std::fmt::Debug for ExprDisplay<'_> {
1062    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1063        let that = self.expr;
1064        match that {
1065            ExprImpl::InputRef(x) => write!(
1066                f,
1067                "{:?}",
1068                InputRefDisplay {
1069                    input_ref: x,
1070                    input_schema: self.input_schema
1071                }
1072            ),
1073            ExprImpl::Literal(x) => write!(f, "{:?}", x),
1074            ExprImpl::FunctionCall(x) => write!(
1075                f,
1076                "{:?}",
1077                FunctionCallDisplay {
1078                    function_call: x,
1079                    input_schema: self.input_schema
1080                }
1081            ),
1082            ExprImpl::FunctionCallWithLambda(x) => write!(
1083                f,
1084                "{:?}",
1085                FunctionCallDisplay {
1086                    function_call: &x.to_full_function_call(),
1087                    input_schema: self.input_schema
1088                }
1089            ),
1090            ExprImpl::AggCall(x) => write!(f, "{:?}", x),
1091            ExprImpl::Subquery(x) => write!(f, "{:?}", x),
1092            ExprImpl::CorrelatedInputRef(x) => write!(f, "{:?}", x),
1093            ExprImpl::TableFunction(x) => {
1094                // TODO: TableFunctionCallVerboseDisplay
1095                write!(f, "{:?}", x)
1096            }
1097            ExprImpl::WindowFunction(x) => {
1098                // TODO: WindowFunctionCallVerboseDisplay
1099                write!(f, "{:?}", x)
1100            }
1101            ExprImpl::UserDefinedFunction(x) => {
1102                write!(
1103                    f,
1104                    "{:?}",
1105                    UserDefinedFunctionDisplay {
1106                        func_call: x,
1107                        input_schema: self.input_schema
1108                    }
1109                )
1110            }
1111            ExprImpl::Parameter(x) => write!(f, "{:?}", x),
1112            ExprImpl::Now(x) => write!(f, "{:?}", x),
1113        }
1114    }
1115}
1116
1117impl std::fmt::Display for ExprDisplay<'_> {
1118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1119        (self as &dyn std::fmt::Debug).fmt(f)
1120    }
1121}
1122
1123#[cfg(test)]
1124/// Asserts that the expression is an [`InputRef`] with the given index.
1125macro_rules! assert_eq_input_ref {
1126    ($e:expr, $index:expr) => {
1127        match $e {
1128            ExprImpl::InputRef(i) => assert_eq!(i.index(), $index),
1129            _ => assert!(false, "Expected input ref, found {:?}", $e),
1130        }
1131    };
1132}
1133
1134#[cfg(test)]
1135pub(crate) use assert_eq_input_ref;
1136use risingwave_common::bail;
1137use risingwave_common::catalog::Schema;
1138use risingwave_common::row::OwnedRow;
1139
1140use crate::utils::Condition;
1141
1142#[cfg(test)]
1143mod tests {
1144    use super::*;
1145
1146    #[test]
1147    fn test_expr_debug_alternate() {
1148        let mut e = InputRef::new(1, DataType::Boolean).into();
1149        e = FunctionCall::new(ExprType::Not, vec![e]).unwrap().into();
1150        let s = format!("{:#?}", e);
1151        assert!(s.contains("return_type: Boolean"))
1152    }
1153}