risingwave_frontend/expr/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use enum_as_inner::EnumAsInner;
16use fixedbitset::FixedBitSet;
17use futures::FutureExt;
18use paste::paste;
19use risingwave_common::array::ListValue;
20use risingwave_common::types::{DataType, Datum, JsonbVal, MapType, Scalar, ScalarImpl};
21use risingwave_expr::aggregate::PbAggKind;
22use risingwave_expr::expr::build_from_prost;
23use risingwave_pb::expr::expr_node::RexNode;
24use risingwave_pb::expr::{ExprNode, ProjectSetSelectItem};
25use user_defined_function::UserDefinedFunctionDisplay;
26
27use crate::error::{ErrorCode, Result as RwResult};
28use crate::session::current;
29
30mod agg_call;
31mod correlated_input_ref;
32mod function_call;
33mod function_call_with_lambda;
34mod input_ref;
35mod literal;
36mod now;
37mod parameter;
38mod pure;
39mod subquery;
40mod table_function;
41mod user_defined_function;
42mod window_function;
43
44mod order_by_expr;
45pub use order_by_expr::{OrderBy, OrderByExpr};
46
47mod expr_mutator;
48mod expr_rewriter;
49mod expr_visitor;
50pub mod function_impl;
51mod secret_ref;
52mod session_timezone;
53mod type_inference;
54mod utils;
55
56pub use agg_call::AggCall;
57pub use correlated_input_ref::{CorrelatedId, CorrelatedInputRef, Depth, InputRefDepthRewriter};
58pub use expr_mutator::ExprMutator;
59pub use expr_rewriter::{ExprRewriter, default_rewrite_expr};
60pub use expr_visitor::{ExprVisitor, default_visit_expr};
61pub use function_call::{FunctionCall, FunctionCallDisplay, is_row_function};
62pub use function_call_with_lambda::FunctionCallWithLambda;
63pub use input_ref::{InputRef, InputRefDisplay, input_ref_to_column_indices};
64pub use literal::Literal;
65pub use now::{InlineNowProcTime, Now, NowProcTimeFinder};
66pub use parameter::Parameter;
67pub use pure::*;
68pub use risingwave_pb::expr::expr_node::Type as ExprType;
69pub use secret_ref::SecretRef;
70pub use session_timezone::{SessionTimezone, TimestamptzExprFinder};
71pub use subquery::{Subquery, SubqueryKind};
72pub use table_function::{TableFunction, TableFunctionType};
73pub use type_inference::*;
74pub use user_defined_function::UserDefinedFunction;
75pub use utils::*;
76pub use window_function::WindowFunction;
77
/// Depth threshold for expressions.
///
/// NOTE(review): presumably the nesting depth beyond which [`EXPR_TOO_DEEP_NOTICE`] is shown to
/// the user; the code that consumes this constant is not in this module — confirm at the use site.
pub(crate) const EXPR_DEPTH_THRESHOLD: usize = 30;
/// User-facing notice text for overly complicated (deeply nested) expressions.
pub(crate) const EXPR_TOO_DEEP_NOTICE: &str = "Some expression is too complicated. \
Consider simplifying or splitting the query if you encounter any issues.";
81
82pub(crate) fn reject_impure(expr: impl Into<ExprImpl>, context: &str) -> RwResult<()> {
83    if let Some(impure_expr_desc) = impure_expr_desc(&expr.into()) {
84        let msg = format!(
85            "using an impure expression ({impure_expr_desc}) in {context} \
86             on a retract stream may lead to inconsistent results"
87        );
88        if current::config()
89            .is_some_and(|c| c.read().streaming_unsafe_allow_unmaterialized_impure_expr())
90        {
91            current::notice_to_user(msg);
92        } else {
93            return Err(ErrorCode::NotSupported(
94                msg,
95                "rewrite the query to extract the impure expression into the select list, \
96                 or set `streaming_unsafe_allow_unmaterialized_impure_expr` to allow \
97                 the behavior at the risk of inconsistent results or panics during execution"
98                    .into(),
99            )
100            .into());
101        }
102    }
103    Ok(())
104}
105
106/// the trait of bound expressions
107pub trait Expr: Into<ExprImpl> {
108    /// Get the return type of the expr
109    fn return_type(&self) -> DataType;
110
111    /// Try to serialize the expression, returning an error if it's impossible.
112    fn try_to_expr_proto(&self) -> Result<ExprNode, String>;
113
114    /// Serialize the expression. Panic if it's impossible.
115    fn to_expr_proto(&self) -> ExprNode {
116        self.try_to_expr_proto()
117            .expect("failed to serialize expression to protobuf")
118    }
119
120    /// Serialize the expression. Returns an error if this will result in an impure expression on a
121    /// retract stream, which may lead to inconsistent results.
122    fn to_expr_proto_checked_pure(
123        &self,
124        retract: bool,
125        context: &str,
126    ) -> crate::error::Result<ExprNode>
127    where
128        Self: Clone,
129    {
130        if retract {
131            reject_impure(self.clone(), context)?;
132        }
133        self.try_to_expr_proto()
134            .map_err(|e| ErrorCode::InternalError(e).into())
135    }
136}
137
// Generates the `ExprImpl` enum and its basic plumbing from a list of expression type names.
//
// For every listed type `T` the macro emits:
// - a variant `ExprImpl::T(Box<T>)`,
// - a `From<T> for ExprImpl` conversion that boxes the value,
// - dispatching arms in `ExprImpl::variant_name`, `Expr::return_type` and
//   `Expr::try_to_expr_proto`.
macro_rules! impl_expr_impl {
    ($($t:ident,)*) => {
        #[derive(Clone, Eq, PartialEq, Hash, EnumAsInner)]
        pub enum ExprImpl {
            // Each variant boxes its payload.
            $($t(Box<$t>),)*
        }

        impl ExprImpl {
            // Returns the variant's type name as written in the macro invocation.
            pub fn variant_name(&self) -> &'static str {
                match self {
                    $(ExprImpl::$t(_) => stringify!($t),)*
                }
            }
        }

        $(
        impl From<$t> for ExprImpl {
            fn from(o: $t) -> ExprImpl {
                ExprImpl::$t(Box::new(o))
            }
        })*

        // `ExprImpl` itself is an `Expr`: every method delegates to the wrapped expression.
        impl Expr for ExprImpl {
            fn return_type(&self) -> DataType {
                match self {
                    $(ExprImpl::$t(expr) => expr.return_type(),)*
                }
            }

            fn try_to_expr_proto(&self) -> Result<ExprNode, String> {
                match self {
                    $(ExprImpl::$t(expr) => expr.try_to_expr_proto(),)*
                }
            }
        }
    };
}
175
// The complete list of expression kinds; each entry becomes a variant of `ExprImpl`.
impl_expr_impl!(
    // BoundColumnRef, might be used in binder.
    CorrelatedInputRef,
    InputRef,
    Literal,
    FunctionCall,
    FunctionCallWithLambda,
    AggCall,
    Subquery,
    TableFunction,
    WindowFunction,
    UserDefinedFunction,
    Parameter,
    Now,
    SecretRef,
);
192
193impl ExprImpl {
194    /// A literal int value.
195    #[inline(always)]
196    pub fn literal_int(v: i32) -> Self {
197        Literal::new(Some(v.to_scalar_value()), DataType::Int32).into()
198    }
199
200    /// A literal bigint value
201    #[inline(always)]
202    pub fn literal_bigint(v: i64) -> Self {
203        Literal::new(Some(v.to_scalar_value()), DataType::Int64).into()
204    }
205
206    /// A literal float64 value.
207    #[inline(always)]
208    pub fn literal_f64(v: f64) -> Self {
209        Literal::new(Some(v.into()), DataType::Float64).into()
210    }
211
212    /// A literal boolean value.
213    #[inline(always)]
214    pub fn literal_bool(v: bool) -> Self {
215        Literal::new(Some(v.to_scalar_value()), DataType::Boolean).into()
216    }
217
218    /// A literal varchar value.
219    #[inline(always)]
220    pub fn literal_varchar(v: String) -> Self {
221        Literal::new(Some(v.into()), DataType::Varchar).into()
222    }
223
224    /// A literal null value.
225    #[inline(always)]
226    pub fn literal_null(element_type: DataType) -> Self {
227        Literal::new(None, element_type).into()
228    }
229
230    /// A literal jsonb value.
231    #[inline(always)]
232    pub fn literal_jsonb(v: JsonbVal) -> Self {
233        Literal::new(Some(v.into()), DataType::Jsonb).into()
234    }
235
236    /// A literal list value.
237    #[inline(always)]
238    pub fn literal_list(v: ListValue, element_type: DataType) -> Self {
239        Literal::new(Some(v.to_scalar_value()), DataType::list(element_type)).into()
240    }
241
242    /// Takes the expression, leaving a literal null of the same type in its place.
243    pub fn take(&mut self) -> Self {
244        std::mem::replace(self, Self::literal_null(self.return_type()))
245    }
246
247    /// A `count(*)` aggregate function.
248    #[inline(always)]
249    pub fn count_star() -> Self {
250        AggCall::new(
251            PbAggKind::Count.into(),
252            vec![],
253            false,
254            OrderBy::any(),
255            Condition::true_cond(),
256            vec![],
257        )
258        .unwrap()
259        .into()
260    }
261
262    /// Create a new expression by merging the given expressions by `And`.
263    ///
264    /// If `exprs` is empty, return a literal `true`.
265    pub fn and(exprs: impl IntoIterator<Item = ExprImpl>) -> Self {
266        merge_expr_by_logical(exprs, ExprType::And, ExprImpl::literal_bool(true))
267    }
268
269    /// Create a new expression by merging the given expressions by `Or`.
270    ///
271    /// If `exprs` is empty, return a literal `false`.
272    pub fn or(exprs: impl IntoIterator<Item = ExprImpl>) -> Self {
273        merge_expr_by_logical(exprs, ExprType::Or, ExprImpl::literal_bool(false))
274    }
275
276    /// Collect all `InputRef`s' indexes in the expression.
277    ///
278    /// # Panics
279    /// Panics if `input_ref >= input_col_num`.
280    pub fn collect_input_refs(&self, input_col_num: usize) -> FixedBitSet {
281        collect_input_refs(input_col_num, [self])
282    }
283
284    /// Check if the expression has no side effects and output is deterministic
285    pub fn is_pure(&self) -> bool {
286        is_pure(self)
287    }
288
289    pub fn is_impure(&self) -> bool {
290        is_impure(self)
291    }
292
293    /// Count `Now`s in the expression.
294    pub fn count_nows(&self) -> usize {
295        let mut visitor = CountNow::default();
296        visitor.visit_expr(self);
297        visitor.count()
298    }
299
300    /// Check whether self is literal NULL.
301    pub fn is_null(&self) -> bool {
302        matches!(self, ExprImpl::Literal(literal) if literal.get_data().is_none())
303    }
304
305    /// Check whether self is a literal NULL or literal string.
306    pub fn is_untyped(&self) -> bool {
307        matches!(self, ExprImpl::Literal(literal) if literal.is_untyped())
308            || matches!(self, ExprImpl::Parameter(parameter) if !parameter.has_infer())
309    }
310
311    /// Shorthand to create cast expr to `target` type in implicit context.
312    pub fn cast_implicit(mut self, target: &DataType) -> Result<ExprImpl, CastError> {
313        FunctionCall::cast_mut(&mut self, target, CastContext::Implicit)?;
314        Ok(self)
315    }
316
317    /// Shorthand to create cast expr to `target` type in assign context.
318    pub fn cast_assign(mut self, target: &DataType) -> Result<ExprImpl, CastError> {
319        FunctionCall::cast_mut(&mut self, target, CastContext::Assign)?;
320        Ok(self)
321    }
322
323    /// Shorthand to create cast expr to `target` type in explicit context.
324    pub fn cast_explicit(mut self, target: &DataType) -> Result<ExprImpl, CastError> {
325        FunctionCall::cast_mut(&mut self, target, CastContext::Explicit)?;
326        Ok(self)
327    }
328
329    /// Shorthand to inplace cast expr to `target` type in implicit context.
330    pub fn cast_implicit_mut(&mut self, target: &DataType) -> Result<(), CastError> {
331        FunctionCall::cast_mut(self, target, CastContext::Implicit)
332    }
333
334    /// Shorthand to inplace cast expr to `target` type in explicit context.
335    pub fn cast_explicit_mut(&mut self, target: &DataType) -> Result<(), CastError> {
336        FunctionCall::cast_mut(self, target, CastContext::Explicit)
337    }
338
339    /// Casting to Regclass type means getting the oid of expr.
340    /// See <https://www.postgresql.org/docs/current/datatype-oid.html>
341    pub fn cast_to_regclass(self) -> Result<ExprImpl, CastError> {
342        match self.return_type() {
343            DataType::Varchar => Ok(ExprImpl::FunctionCall(Box::new(
344                FunctionCall::new_unchecked(ExprType::CastRegclass, vec![self], DataType::Int32),
345            ))),
346            DataType::Int32 => Ok(self),
347            dt if dt.is_int() => Ok(self.cast_explicit(&DataType::Int32)?),
348            _ => bail_cast_error!("unsupported input type"),
349        }
350    }
351
352    /// Shorthand to inplace cast expr to `regclass` type.
353    pub fn cast_to_regclass_mut(&mut self) -> Result<(), CastError> {
354        let owned = std::mem::replace(self, ExprImpl::literal_bool(false));
355        *self = owned.cast_to_regclass()?;
356        Ok(())
357    }
358
359    /// Ensure the return type of this expression is an array of some type.
360    pub fn ensure_array_type(&self) -> Result<(), ErrorCode> {
361        if self.is_untyped() {
362            return Err(ErrorCode::BindError(
363                "could not determine polymorphic type because input has type unknown".into(),
364            ));
365        }
366        match self.return_type() {
367            DataType::List(_) => Ok(()),
368            t => Err(ErrorCode::BindError(format!("expects array but got {t}"))),
369        }
370    }
371
372    /// Ensure the return type of this expression is a map of some type.
373    pub fn try_into_map_type(&self) -> Result<MapType, ErrorCode> {
374        if self.is_untyped() {
375            return Err(ErrorCode::BindError(
376                "could not determine polymorphic type because input has type unknown".into(),
377            ));
378        }
379        match self.return_type() {
380            DataType::Map(m) => Ok(m),
381            t => Err(ErrorCode::BindError(format!("expects map but got {t}"))),
382        }
383    }
384
385    /// Shorthand to enforce implicit cast to boolean
386    pub fn enforce_bool_clause(self, clause: &str) -> RwResult<ExprImpl> {
387        if self.is_untyped() {
388            let inner = self.cast_implicit(&DataType::Boolean)?;
389            return Ok(inner);
390        }
391        let return_type = self.return_type();
392        if return_type != DataType::Boolean {
393            bail!(
394                "argument of {} must be boolean, not type {:?}",
395                clause,
396                return_type
397            )
398        }
399        Ok(self)
400    }
401
402    /// Create "cast" expr to string (`varchar`) type. This is different from a real cast, as
403    /// boolean is converted to a single char rather than full word.
404    ///
405    /// Choose between `cast_output` and `cast_{assign,explicit}(Varchar)` based on `PostgreSQL`'s
406    /// behavior on bools. For example, `concat(':', true)` is `:t` but `':' || true` is `:true`.
407    /// All other types have the same behavior when formatting to output and casting to string.
408    ///
409    /// References in `PostgreSQL`:
410    /// * [cast](https://github.com/postgres/postgres/blob/a3ff08e0b08dbfeb777ccfa8f13ebaa95d064c04/src/include/catalog/pg_cast.dat#L437-L444)
411    /// * [impl](https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/backend/utils/adt/bool.c#L204-L209)
412    pub fn cast_output(self) -> RwResult<ExprImpl> {
413        if self.return_type() == DataType::Boolean {
414            return Ok(FunctionCall::new(ExprType::BoolOut, vec![self])?.into());
415        }
416        // Use normal cast for other types. Both `assign` and `explicit` can pass the castability
417        // check and there is no difference.
418        self.cast_assign(&DataType::Varchar)
419            .map_err(|err| err.into())
420    }
421
422    /// Evaluate the expression on the given input.
423    ///
424    /// TODO: This is a naive implementation. We should avoid proto ser/de.
425    /// Tracking issue: <https://github.com/risingwavelabs/risingwave/issues/3479>
426    pub async fn eval_row(&self, input: &OwnedRow) -> RwResult<Datum> {
427        let backend_expr = build_from_prost(&self.to_expr_proto())?;
428        Ok(backend_expr.eval_row(input).await?)
429    }
430
431    /// Try to evaluate an expression if it's a constant expression by `ExprImpl::is_const`.
432    ///
433    /// Returns...
434    /// - `None` if it's not a constant expression,
435    /// - `Some(Ok(_))` if constant evaluation succeeds,
436    /// - `Some(Err(_))` if there's an error while evaluating a constant expression.
437    pub fn try_fold_const(&self) -> Option<RwResult<Datum>> {
438        if self.is_const() {
439            self.eval_row(&OwnedRow::empty())
440                .now_or_never()
441                .expect("constant expression should not be async")
442                .into()
443        } else {
444            None
445        }
446    }
447
448    /// Similar to `ExprImpl::try_fold_const`, but panics if the expression is not constant.
449    pub fn fold_const(&self) -> RwResult<Datum> {
450        self.try_fold_const().expect("expression is not constant")
451    }
452}
453
/// Implement helper functions which recursively check whether a variant is included in the
/// expression. e.g., `has_subquery(&self) -> bool`
///
/// For each listed type `T`, this generates `ExprImpl::has_<t_in_snake_case>`, which runs an
/// `ExprVisitor` over the expression and reports whether the matching `visit_<t_in_snake_case>`
/// hook was invoked.
///
/// It will not traverse inside subqueries.
macro_rules! impl_has_variant {
    ( $($variant:ty),* ) => {
        paste! {
            impl ExprImpl {
                $(
                    pub fn [<has_ $variant:snake>](&self) -> bool {
                        // One-off visitor that only records that the hook was hit.
                        struct Has { has: bool }

                        impl ExprVisitor for Has {
                            fn [<visit_ $variant:snake>](&mut self, _: &$variant) {
                                self.has = true;
                            }
                        }

                        let mut visitor = Has { has: false };
                        visitor.visit_expr(self);
                        visitor.has
                    }
                )*
            }
        }
    };
}
481
// Generates `has_input_ref`, `has_literal`, `has_function_call`, ..., `has_now`.
// `CorrelatedInputRef`, `Parameter` and `SecretRef` are not listed here, so no `has_*` helper is
// generated for them.
impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction, UserDefinedFunction, Now}
483
/// Inequality condition between two input columns with clearer semantics.
/// Represents: `left_col <op> right_col` where op is one of `<`, `<=`, `>`, `>=`.
///
/// The operator restriction is only checked by a `debug_assert!` in
/// [`InequalityInputPair::new`]; the fields are public and can be set directly.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct InequalityInputPair {
    /// Index of the left side column (from left input).
    pub left_idx: usize,
    /// Index of the right side column (from right input, NOT offset by `left_cols_num`).
    pub right_idx: usize,
    /// Comparison operator: `left_col <op> right_col`.
    pub op: ExprType,
}
495
496impl InequalityInputPair {
497    pub fn new(left_idx: usize, right_idx: usize, op: ExprType) -> Self {
498        debug_assert!(matches!(
499            op,
500            ExprType::LessThan
501                | ExprType::LessThanOrEqual
502                | ExprType::GreaterThan
503                | ExprType::GreaterThanOrEqual
504        ));
505        Self {
506            left_idx,
507            right_idx,
508            op,
509        }
510    }
511
512    /// Returns true if the left side has larger values based on the operator.
513    /// For `>` and `>=`, left side is larger.
514    /// State cleanup applies to the side with larger values.
515    pub fn left_side_is_larger(&self) -> bool {
516        matches!(
517            self.op,
518            ExprType::GreaterThan | ExprType::GreaterThanOrEqual
519        )
520    }
521}
522
523impl ExprImpl {
    /// This function is not meant to be called. In most cases you would want
    /// [`ExprImpl::has_correlated_input_ref_by_depth`].
    ///
    /// When an expr contains a [`CorrelatedInputRef`] with lower depth, the whole expr is still
    /// considered to be uncorrelated, and can be checked with [`ExprImpl::has_subquery`] as well.
    /// See examples on [`crate::binder::BoundQuery::is_correlated_by_depth`] for details.
    ///
    /// This is a placeholder to trigger a compiler error when a trivial implementation checking for
    /// enum variant is generated by accident. It cannot be called either because you cannot pass
    /// `Infallible` to it.
    pub fn has_correlated_input_ref(&self, _: std::convert::Infallible) -> bool {
        // `Infallible` has no values, so no caller can ever reach this body.
        unreachable!()
    }
537
538    /// Used to check whether the expression has [`CorrelatedInputRef`].
539    ///
540    /// This is the core logic that supports [`crate::binder::BoundQuery::is_correlated_by_depth`]. Check the
541    /// doc of it for examples of `depth` being equal, less or greater.
542    // We need to traverse inside subqueries.
543    pub fn has_correlated_input_ref_by_depth(&self, depth: Depth) -> bool {
544        struct Has {
545            depth: usize,
546            has: bool,
547        }
548
549        impl ExprVisitor for Has {
550            fn visit_correlated_input_ref(&mut self, correlated_input_ref: &CorrelatedInputRef) {
551                if correlated_input_ref.depth() == self.depth {
552                    self.has = true;
553                }
554            }
555
556            fn visit_subquery(&mut self, subquery: &Subquery) {
557                self.has |= subquery.is_correlated_by_depth(self.depth);
558            }
559        }
560
561        let mut visitor = Has { depth, has: false };
562        visitor.visit_expr(self);
563        visitor.has
564    }
565
566    pub fn has_correlated_input_ref_by_correlated_id(&self, correlated_id: CorrelatedId) -> bool {
567        struct Has {
568            correlated_id: CorrelatedId,
569            has: bool,
570        }
571
572        impl ExprVisitor for Has {
573            fn visit_correlated_input_ref(&mut self, correlated_input_ref: &CorrelatedInputRef) {
574                if correlated_input_ref.correlated_id() == self.correlated_id {
575                    self.has = true;
576                }
577            }
578
579            fn visit_subquery(&mut self, subquery: &Subquery) {
580                self.has |= subquery.is_correlated_by_correlated_id(self.correlated_id);
581            }
582        }
583
584        let mut visitor = Has {
585            correlated_id,
586            has: false,
587        };
588        visitor.visit_expr(self);
589        visitor.has
590    }
591
592    /// Collect `CorrelatedInputRef`s in `ExprImpl` by relative `depth`, return their indices, and
593    /// assign absolute `correlated_id` for them.
594    pub fn collect_correlated_indices_by_depth_and_assign_id(
595        &mut self,
596        depth: Depth,
597        correlated_id: CorrelatedId,
598    ) -> Vec<usize> {
599        struct Collector {
600            depth: Depth,
601            correlated_indices: Vec<usize>,
602            correlated_id: CorrelatedId,
603        }
604
605        impl ExprMutator for Collector {
606            fn visit_correlated_input_ref(
607                &mut self,
608                correlated_input_ref: &mut CorrelatedInputRef,
609            ) {
610                if correlated_input_ref.depth() == self.depth {
611                    self.correlated_indices.push(correlated_input_ref.index());
612                    correlated_input_ref.set_correlated_id(self.correlated_id);
613                }
614            }
615
616            fn visit_subquery(&mut self, subquery: &mut Subquery) {
617                self.correlated_indices.extend(
618                    subquery.collect_correlated_indices_by_depth_and_assign_id(
619                        self.depth,
620                        self.correlated_id,
621                    ),
622                );
623            }
624        }
625
626        let mut collector = Collector {
627            depth,
628            correlated_indices: vec![],
629            correlated_id,
630        };
631        collector.visit_expr(self);
632        collector.correlated_indices.sort();
633        collector.correlated_indices.dedup();
634        collector.correlated_indices
635    }
636
637    pub fn only_literal_and_func(&self) -> bool {
638        {
639            struct HasOthers {
640                has_others: bool,
641            }
642
643            impl ExprVisitor for HasOthers {
644                fn visit_expr(&mut self, expr: &ExprImpl) {
645                    match expr {
646                        ExprImpl::CorrelatedInputRef(_)
647                        | ExprImpl::InputRef(_)
648                        | ExprImpl::AggCall(_)
649                        | ExprImpl::Subquery(_)
650                        | ExprImpl::TableFunction(_)
651                        | ExprImpl::WindowFunction(_)
652                        | ExprImpl::UserDefinedFunction(_)
653                        | ExprImpl::Parameter(_)
654                        | ExprImpl::Now(_)
655                        | ExprImpl::SecretRef(_) => self.has_others = true,
656                        ExprImpl::Literal(_inner) => {}
657                        ExprImpl::FunctionCall(inner) => {
658                            if !self.is_short_circuit(inner) {
659                                // only if the current `func_call` is *not* a short-circuit
660                                // expression, e.g., true or (...) | false and (...),
661                                // shall we proceed to visit it.
662                                self.visit_function_call(inner)
663                            }
664                        }
665                        ExprImpl::FunctionCallWithLambda(inner) => {
666                            self.visit_function_call_with_lambda(inner)
667                        }
668                    }
669                }
670            }
671
672            impl HasOthers {
673                fn is_short_circuit(&self, func_call: &FunctionCall) -> bool {
674                    /// evaluate the first parameter of `Or` or `And` function call
675                    fn eval_first(e: &ExprImpl, expect: bool) -> bool {
676                        if let ExprImpl::Literal(l) = e {
677                            *l.get_data() == Some(ScalarImpl::Bool(expect))
678                        } else {
679                            false
680                        }
681                    }
682
683                    match func_call.func_type {
684                        ExprType::Or => eval_first(&func_call.inputs()[0], true),
685                        ExprType::And => eval_first(&func_call.inputs()[0], false),
686                        _ => false,
687                    }
688                }
689            }
690
691            let mut visitor = HasOthers { has_others: false };
692            visitor.visit_expr(self);
693            !visitor.has_others
694        }
695    }
696
697    /// Checks whether this is a constant expr that can be evaluated over a dummy chunk.
698    ///
699    /// The expression tree should only consist of literals and **pure** function calls.
700    pub fn is_const(&self) -> bool {
701        self.only_literal_and_func() && self.is_pure()
702    }
703
704    /// Returns the `InputRefs` of an Equality predicate if it matches
705    /// ordered by the canonical ordering (lower, higher), else returns None
706    pub fn as_eq_cond(&self) -> Option<(InputRef, InputRef)> {
707        if let ExprImpl::FunctionCall(function_call) = self
708            && function_call.func_type() == ExprType::Equal
709            && let (_, ExprImpl::InputRef(x), ExprImpl::InputRef(y)) =
710                function_call.clone().decompose_as_binary()
711        {
712            if x.index() < y.index() {
713                Some((*x, *y))
714            } else {
715                Some((*y, *x))
716            }
717        } else {
718            None
719        }
720    }
721
722    pub fn as_is_not_distinct_from_cond(&self) -> Option<(InputRef, InputRef)> {
723        if let ExprImpl::FunctionCall(function_call) = self
724            && function_call.func_type() == ExprType::IsNotDistinctFrom
725            && let (_, ExprImpl::InputRef(x), ExprImpl::InputRef(y)) =
726                function_call.clone().decompose_as_binary()
727        {
728            if x.index() < y.index() {
729                Some((*x, *y))
730            } else {
731                Some((*y, *x))
732            }
733        } else {
734            None
735        }
736    }
737
738    pub fn reverse_comparison(comparison: ExprType) -> ExprType {
739        match comparison {
740            ExprType::LessThan => ExprType::GreaterThan,
741            ExprType::LessThanOrEqual => ExprType::GreaterThanOrEqual,
742            ExprType::GreaterThan => ExprType::LessThan,
743            ExprType::GreaterThanOrEqual => ExprType::LessThanOrEqual,
744            ExprType::Equal | ExprType::IsNotDistinctFrom => comparison,
745            _ => unreachable!(),
746        }
747    }
748
749    pub fn as_comparison_cond(&self) -> Option<(InputRef, ExprType, InputRef)> {
750        if let ExprImpl::FunctionCall(function_call) = self {
751            match function_call.func_type() {
752                ty @ (ExprType::LessThan
753                | ExprType::LessThanOrEqual
754                | ExprType::GreaterThan
755                | ExprType::GreaterThanOrEqual) => {
756                    let (_, op1, op2) = function_call.clone().decompose_as_binary();
757                    if let (ExprImpl::InputRef(x), ExprImpl::InputRef(y)) = (op1, op2) {
758                        if x.index < y.index {
759                            Some((*x, ty, *y))
760                        } else {
761                            Some((*y, Self::reverse_comparison(ty), *x))
762                        }
763                    } else {
764                        None
765                    }
766                }
767                _ => None,
768            }
769        } else {
770            None
771        }
772    }
773
774    /// Accepts expressions of the form `input_expr cmp now_expr` or `now_expr cmp input_expr`,
775    /// where `input_expr` contains an `InputRef` and contains no `now()`, and `now_expr`
776    /// contains a `now()` but no `InputRef`.
777    ///
778    /// Canonicalizes to the first ordering and returns `(input_expr, cmp, now_expr)`
779    pub fn as_now_comparison_cond(&self) -> Option<(ExprImpl, ExprType, ExprImpl)> {
780        if let ExprImpl::FunctionCall(function_call) = self {
781            match function_call.func_type() {
782                ty @ (ExprType::Equal
783                | ExprType::LessThan
784                | ExprType::LessThanOrEqual
785                | ExprType::GreaterThan
786                | ExprType::GreaterThanOrEqual) => {
787                    let (_, op1, op2) = function_call.clone().decompose_as_binary();
788                    if !op1.has_now()
789                        && op1.has_input_ref()
790                        && op2.has_now()
791                        && !op2.has_input_ref()
792                    {
793                        Some((op1, ty, op2))
794                    } else if op1.has_now()
795                        && !op1.has_input_ref()
796                        && !op2.has_now()
797                        && op2.has_input_ref()
798                    {
799                        Some((op2, Self::reverse_comparison(ty), op1))
800                    } else {
801                        None
802                    }
803                }
804                _ => None,
805            }
806        } else {
807            None
808        }
809    }
810
    /// Returns the `InputRef` and offset of a predicate if it matches
    /// the form `InputRef [+- const_expr]`, else returns None.
    ///
    /// On a match the result is `(input_ref_index, offset)`, where `offset` is `None` for a bare
    /// `InputRef` and `Some((Add | Subtract, rhs))` otherwise. Only the left operand is checked
    /// for being an `InputRef`, so `const_expr + InputRef` does not match.
    ///
    /// Deprecated: Only used by `as_input_comparison_cond`.
    #[allow(dead_code)]
    fn as_input_offset(&self) -> Option<(usize, Option<(ExprType, ExprImpl)>)> {
        match self {
            // A bare column reference: no offset.
            ExprImpl::InputRef(input_ref) => Some((input_ref.index(), None)),
            ExprImpl::FunctionCall(function_call) => {
                let expr_type = function_call.func_type();
                match expr_type {
                    ExprType::Add | ExprType::Subtract => {
                        let (_, lhs, rhs) = function_call.clone().decompose_as_binary();
                        if let ExprImpl::InputRef(input_ref) = &lhs
                            && rhs.is_const()
                        {
                            // Currently we will return `None` for non-literal because the result of the expression might be '1 day'. However, there will definitely exist false positives such as '1 second + 1 second'.
                            // We will treat the expression as an input offset when rhs is `null`.
                            //
                            // So an interval-typed `rhs` is rejected when it is either not a
                            // literal at all, or a non-null literal with a non-zero month or day
                            // component.
                            if rhs.return_type() == DataType::Interval
                                && rhs.as_literal().is_none_or(|literal| {
                                    literal.get_data().as_ref().is_some_and(|scalar| {
                                        let interval = scalar.as_interval();
                                        interval.months() != 0 || interval.days() != 0
                                    })
                                })
                            {
                                None
                            } else {
                                Some((input_ref.index(), Some((expr_type, rhs))))
                            }
                        } else {
                            None
                        }
                    }
                    // Any other function call is not an offset expression.
                    _ => None,
                }
            }
            _ => None,
        }
    }
851
852    pub fn as_eq_const(&self) -> Option<(InputRef, ExprImpl)> {
853        if let ExprImpl::FunctionCall(function_call) = self
854            && function_call.func_type() == ExprType::Equal
855        {
856            match function_call.clone().decompose_as_binary() {
857                (_, ExprImpl::InputRef(x), y) if y.is_const() => Some((*x, y)),
858                (_, x, ExprImpl::InputRef(y)) if x.is_const() => Some((*y, x)),
859                _ => None,
860            }
861        } else {
862            None
863        }
864    }
865
866    pub fn as_eq_correlated_input_ref(&self) -> Option<(InputRef, CorrelatedInputRef)> {
867        if let ExprImpl::FunctionCall(function_call) = self
868            && function_call.func_type() == ExprType::Equal
869        {
870            match function_call.clone().decompose_as_binary() {
871                (_, ExprImpl::InputRef(x), ExprImpl::CorrelatedInputRef(y)) => Some((*x, *y)),
872                (_, ExprImpl::CorrelatedInputRef(x), ExprImpl::InputRef(y)) => Some((*y, *x)),
873                _ => None,
874            }
875        } else {
876            None
877        }
878    }
879
880    pub fn as_is_null(&self) -> Option<InputRef> {
881        if let ExprImpl::FunctionCall(function_call) = self
882            && function_call.func_type() == ExprType::IsNull
883        {
884            match function_call.clone().decompose_as_unary() {
885                (_, ExprImpl::InputRef(x)) => Some(*x),
886                _ => None,
887            }
888        } else {
889            None
890        }
891    }
892
893    pub fn as_comparison_const(&self) -> Option<(InputRef, ExprType, ExprImpl)> {
894        fn reverse_comparison(comparison: ExprType) -> ExprType {
895            match comparison {
896                ExprType::LessThan => ExprType::GreaterThan,
897                ExprType::LessThanOrEqual => ExprType::GreaterThanOrEqual,
898                ExprType::GreaterThan => ExprType::LessThan,
899                ExprType::GreaterThanOrEqual => ExprType::LessThanOrEqual,
900                _ => unreachable!(),
901            }
902        }
903
904        if let ExprImpl::FunctionCall(function_call) = self {
905            match function_call.func_type() {
906                ty @ (ExprType::LessThan
907                | ExprType::LessThanOrEqual
908                | ExprType::GreaterThan
909                | ExprType::GreaterThanOrEqual) => {
910                    let (_, op1, op2) = function_call.clone().decompose_as_binary();
911                    match (op1, op2) {
912                        (ExprImpl::InputRef(x), y) if y.is_const() => Some((*x, ty, y)),
913                        (x, ExprImpl::InputRef(y)) if x.is_const() => {
914                            Some((*y, reverse_comparison(ty), x))
915                        }
916                        _ => None,
917                    }
918                }
919                _ => None,
920            }
921        } else {
922            None
923        }
924    }
925
926    pub fn as_in_const_list(&self) -> Option<(InputRef, Vec<ExprImpl>)> {
927        if let ExprImpl::FunctionCall(function_call) = self
928            && function_call.func_type() == ExprType::In
929        {
930            let mut inputs = function_call.inputs().iter().cloned();
931            let input_ref = match inputs.next().unwrap() {
932                ExprImpl::InputRef(i) => *i,
933                _ => return None,
934            };
935            let list: Vec<_> = inputs
936                .inspect(|expr| {
937                    // Non constant IN will be bound to OR
938                    assert!(expr.is_const());
939                })
940                .collect();
941
942            Some((input_ref, list))
943        } else {
944            None
945        }
946    }
947
948    pub fn as_or_disjunctions(&self) -> Option<Vec<ExprImpl>> {
949        if let ExprImpl::FunctionCall(function_call) = self
950            && function_call.func_type() == ExprType::Or
951        {
952            Some(to_disjunctions(self.clone()))
953        } else {
954            None
955        }
956    }
957
958    pub fn to_project_set_select_item_proto(&self) -> ProjectSetSelectItem {
959        use risingwave_pb::expr::project_set_select_item::SelectItem::*;
960
961        ProjectSetSelectItem {
962            select_item: Some(match self {
963                ExprImpl::TableFunction(tf) => TableFunction(tf.to_protobuf()),
964                expr => Expr(expr.to_expr_proto()),
965            }),
966        }
967    }
968
969    /// Serialize the expression. Returns an error if this will result in an impure expression on a
970    /// retract stream, which may lead to inconsistent results.
971    pub fn to_project_set_select_item_proto_checked_pure(
972        &self,
973        retract: bool,
974    ) -> crate::error::Result<ProjectSetSelectItem> {
975        use risingwave_pb::expr::project_set_select_item::SelectItem::*;
976
977        Ok(ProjectSetSelectItem {
978            select_item: Some(match self {
979                ExprImpl::TableFunction(tf) => TableFunction(tf.to_protobuf_checked_pure(retract)?),
980                expr => Expr(expr.to_expr_proto_checked_pure(retract, "SELECT list")?),
981            }),
982        })
983    }
984
985    pub fn from_expr_proto(proto: &ExprNode) -> RwResult<Self> {
986        let rex_node = proto.get_rex_node()?;
987        let ret_type = proto.get_return_type()?.into();
988
989        Ok(match rex_node {
990            RexNode::InputRef(column_index) => Self::InputRef(Box::new(InputRef::from_expr_proto(
991                *column_index as _,
992                ret_type,
993            )?)),
994            RexNode::Constant(_) => Self::Literal(Box::new(Literal::from_expr_proto(proto)?)),
995            RexNode::Udf(udf) => Self::UserDefinedFunction(Box::new(
996                UserDefinedFunction::from_expr_proto(udf, ret_type)?,
997            )),
998            RexNode::FuncCall(function_call) => {
999                Self::FunctionCall(Box::new(FunctionCall::from_expr_proto(
1000                    function_call,
1001                    proto.get_function_type()?, // only interpret if it's a function call
1002                    ret_type,
1003                )?))
1004            }
1005            RexNode::Now(_) => Self::Now(Box::new(Now {})),
1006            RexNode::SecretRef(sr) => Self::SecretRef(Box::new(SecretRef {
1007                secret_id: sr.secret_id.into(),
1008                ref_as: risingwave_pb::secret::secret_ref::RefAsType::try_from(sr.ref_as)
1009                    .unwrap_or(risingwave_pb::secret::secret_ref::RefAsType::Text),
1010                secret_name: format!("<secret:{}>", sr.secret_id),
1011            })),
1012        })
1013    }
1014}
1015
1016impl From<Condition> for ExprImpl {
1017    fn from(c: Condition) -> Self {
1018        ExprImpl::and(c.conjunctions)
1019    }
1020}
1021
1022/// A custom Debug implementation that is more concise and suitable to use with
1023/// [`std::fmt::Formatter::debug_list`] in plan nodes. If the verbose output is preferred, it is
1024/// still available via `{:#?}`.
1025impl std::fmt::Debug for ExprImpl {
1026    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1027        if f.alternate() {
1028            return match self {
1029                Self::InputRef(arg0) => f.debug_tuple("InputRef").field(arg0).finish(),
1030                Self::Literal(arg0) => f.debug_tuple("Literal").field(arg0).finish(),
1031                Self::FunctionCall(arg0) => f.debug_tuple("FunctionCall").field(arg0).finish(),
1032                Self::FunctionCallWithLambda(arg0) => {
1033                    f.debug_tuple("FunctionCallWithLambda").field(arg0).finish()
1034                }
1035                Self::AggCall(arg0) => f.debug_tuple("AggCall").field(arg0).finish(),
1036                Self::Subquery(arg0) => f.debug_tuple("Subquery").field(arg0).finish(),
1037                Self::CorrelatedInputRef(arg0) => {
1038                    f.debug_tuple("CorrelatedInputRef").field(arg0).finish()
1039                }
1040                Self::TableFunction(arg0) => f.debug_tuple("TableFunction").field(arg0).finish(),
1041                Self::WindowFunction(arg0) => f.debug_tuple("WindowFunction").field(arg0).finish(),
1042                Self::UserDefinedFunction(arg0) => {
1043                    f.debug_tuple("UserDefinedFunction").field(arg0).finish()
1044                }
1045                Self::Parameter(arg0) => f.debug_tuple("Parameter").field(arg0).finish(),
1046                Self::Now(_) => f.debug_tuple("Now").finish(),
1047                Self::SecretRef(arg0) => f.debug_tuple("SecretRef").field(arg0).finish(),
1048            };
1049        }
1050        match self {
1051            Self::InputRef(x) => write!(f, "{:?}", x),
1052            Self::Literal(x) => write!(f, "{:?}", x),
1053            Self::FunctionCall(x) => write!(f, "{:?}", x),
1054            Self::FunctionCallWithLambda(x) => write!(f, "{:?}", x),
1055            Self::AggCall(x) => write!(f, "{:?}", x),
1056            Self::Subquery(x) => write!(f, "{:?}", x),
1057            Self::CorrelatedInputRef(x) => write!(f, "{:?}", x),
1058            Self::TableFunction(x) => write!(f, "{:?}", x),
1059            Self::WindowFunction(x) => write!(f, "{:?}", x),
1060            Self::UserDefinedFunction(x) => write!(f, "{:?}", x),
1061            Self::Parameter(x) => write!(f, "{:?}", x),
1062            Self::Now(x) => write!(f, "{:?}", x),
1063            Self::SecretRef(x) => write!(f, "Secret({})", x.secret_name),
1064        }
1065    }
1066}
1067
/// Formatting wrapper that pairs an expression with an input schema.
///
/// Its `Debug` implementation renders input references, function calls and user-defined
/// function calls through schema-aware display wrappers; the other variants are printed
/// with their own `Debug` output.
pub struct ExprDisplay<'a> {
    /// The expression to render.
    pub expr: &'a ExprImpl,
    /// Schema handed to the schema-aware display wrappers
    /// (presumably used to resolve input references to column names — confirm).
    pub input_schema: &'a Schema,
}
1072
1073impl std::fmt::Debug for ExprDisplay<'_> {
1074    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1075        let that = self.expr;
1076        match that {
1077            ExprImpl::InputRef(x) => write!(
1078                f,
1079                "{:?}",
1080                InputRefDisplay {
1081                    input_ref: x,
1082                    input_schema: self.input_schema
1083                }
1084            ),
1085            ExprImpl::Literal(x) => write!(f, "{:?}", x),
1086            ExprImpl::FunctionCall(x) => write!(
1087                f,
1088                "{:?}",
1089                FunctionCallDisplay {
1090                    function_call: x,
1091                    input_schema: self.input_schema
1092                }
1093            ),
1094            ExprImpl::FunctionCallWithLambda(x) => write!(
1095                f,
1096                "{:?}",
1097                FunctionCallDisplay {
1098                    function_call: &x.to_full_function_call(),
1099                    input_schema: self.input_schema
1100                }
1101            ),
1102            ExprImpl::AggCall(x) => write!(f, "{:?}", x),
1103            ExprImpl::Subquery(x) => write!(f, "{:?}", x),
1104            ExprImpl::CorrelatedInputRef(x) => write!(f, "{:?}", x),
1105            ExprImpl::TableFunction(x) => {
1106                // TODO: TableFunctionCallVerboseDisplay
1107                write!(f, "{:?}", x)
1108            }
1109            ExprImpl::WindowFunction(x) => {
1110                // TODO: WindowFunctionCallVerboseDisplay
1111                write!(f, "{:?}", x)
1112            }
1113            ExprImpl::UserDefinedFunction(x) => {
1114                write!(
1115                    f,
1116                    "{:?}",
1117                    UserDefinedFunctionDisplay {
1118                        func_call: x,
1119                        input_schema: self.input_schema
1120                    }
1121                )
1122            }
1123            ExprImpl::Parameter(x) => write!(f, "{:?}", x),
1124            ExprImpl::Now(x) => write!(f, "{:?}", x),
1125            ExprImpl::SecretRef(x) => write!(f, "Secret({})", x.secret_name),
1126        }
1127    }
1128}
1129
1130impl std::fmt::Display for ExprDisplay<'_> {
1131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1132        (self as &dyn std::fmt::Debug).fmt(f)
1133    }
1134}
1135
#[cfg(test)]
/// Asserts that the expression is an [`InputRef`] with the given index.
///
/// The expression is evaluated exactly once and only borrowed, so an owned `ExprImpl`
/// passed to the macro remains usable afterwards. Both owned values and references are
/// accepted.
///
/// # Panics
///
/// Panics if the expression is not an `ExprImpl::InputRef`, or if its index differs from
/// `$index`.
macro_rules! assert_eq_input_ref {
    ($e:expr, $index:expr) => {
        // Match on a borrow: avoids moving out of `$e` and lets the fallthrough arm reuse
        // the already-evaluated value instead of expanding `$e` a second time.
        match &$e {
            ExprImpl::InputRef(i) => assert_eq!(i.index(), $index),
            other => panic!("Expected input ref, found {:?}", other),
        }
    };
}
1146
1147#[cfg(test)]
1148pub(crate) use assert_eq_input_ref;
1149use risingwave_common::bail;
1150use risingwave_common::catalog::Schema;
1151use risingwave_common::row::OwnedRow;
1152
1153use crate::utils::Condition;
1154
#[cfg(test)]
mod tests {
    use risingwave_pb::secret::secret_ref::RefAsType;

    use super::*;

    /// The alternate (`{:#?}`) form must expose the verbose, field-by-field output.
    #[test]
    fn test_expr_debug_alternate() {
        let operand: ExprImpl = InputRef::new(1, DataType::Boolean).into();
        let negated: ExprImpl = FunctionCall::new(ExprType::Not, vec![operand])
            .unwrap()
            .into();

        let verbose = format!("{negated:#?}");
        assert!(verbose.contains("return_type: Boolean"));
    }

    /// A secret reference is rendered by name in the concise, schema-aware and verbose forms.
    #[test]
    fn test_secret_ref_display_contains_name() {
        let secret = SecretRef {
            secret_id: 42.into(),
            ref_as: RefAsType::Text,
            secret_name: "test_secret".to_owned(),
        };
        let expr = ExprImpl::SecretRef(Box::new(secret));

        let concise = format!("{expr:?}");
        assert_eq!(concise, "Secret(test_secret)");

        let with_schema = ExprDisplay {
            expr: &expr,
            input_schema: Schema::empty(),
        };
        assert_eq!(format!("{with_schema:?}"), "Secret(test_secret)");

        let verbose = format!("{expr:#?}");
        assert!(verbose.contains("test_secret"));
    }
}