1use enum_as_inner::EnumAsInner;
16use fixedbitset::FixedBitSet;
17use futures::FutureExt;
18use paste::paste;
19use risingwave_common::array::ListValue;
20use risingwave_common::types::{DataType, Datum, JsonbVal, MapType, Scalar, ScalarImpl};
21use risingwave_expr::aggregate::PbAggKind;
22use risingwave_expr::expr::build_from_prost;
23use risingwave_pb::expr::expr_node::RexNode;
24use risingwave_pb::expr::{ExprNode, ProjectSetSelectItem};
25use user_defined_function::UserDefinedFunctionDisplay;
26
27use crate::error::{ErrorCode, Result as RwResult};
28use crate::session::current;
29
30mod agg_call;
31mod correlated_input_ref;
32mod function_call;
33mod function_call_with_lambda;
34mod input_ref;
35mod literal;
36mod now;
37mod parameter;
38mod pure;
39mod subquery;
40mod table_function;
41mod user_defined_function;
42mod window_function;
43
44mod order_by_expr;
45pub use order_by_expr::{OrderBy, OrderByExpr};
46
47mod expr_mutator;
48mod expr_rewriter;
49mod expr_visitor;
50pub mod function_impl;
51mod session_timezone;
52mod type_inference;
53mod utils;
54
55pub use agg_call::AggCall;
56pub use correlated_input_ref::{CorrelatedId, CorrelatedInputRef, Depth, InputRefDepthRewriter};
57pub use expr_mutator::ExprMutator;
58pub use expr_rewriter::{ExprRewriter, default_rewrite_expr};
59pub use expr_visitor::{ExprVisitor, default_visit_expr};
60pub use function_call::{FunctionCall, FunctionCallDisplay, is_row_function};
61pub use function_call_with_lambda::FunctionCallWithLambda;
62pub use input_ref::{InputRef, InputRefDisplay, input_ref_to_column_indices};
63pub use literal::Literal;
64pub use now::{InlineNowProcTime, Now, NowProcTimeFinder};
65pub use parameter::Parameter;
66pub use pure::*;
67pub use risingwave_pb::expr::expr_node::Type as ExprType;
68pub use session_timezone::{SessionTimezone, TimestamptzExprFinder};
69pub use subquery::{Subquery, SubqueryKind};
70pub use table_function::{TableFunction, TableFunctionType};
71pub use type_inference::*;
72pub use user_defined_function::UserDefinedFunction;
73pub use utils::*;
74pub use window_function::WindowFunction;
75
76pub(crate) const EXPR_DEPTH_THRESHOLD: usize = 30;
77pub(crate) const EXPR_TOO_DEEP_NOTICE: &str = "Some expression is too complicated. \
78Consider simplifying or splitting the query if you encounter any issues.";
79
80pub(crate) fn reject_impure(expr: impl Into<ExprImpl>, context: &str) -> RwResult<()> {
81 if let Some(impure_expr_desc) = impure_expr_desc(&expr.into()) {
82 let msg = format!(
83 "using an impure expression ({impure_expr_desc}) in {context} \
84 on a retract stream may lead to inconsistent results"
85 );
86 if current::config()
87 .is_some_and(|c| c.read().streaming_unsafe_allow_unmaterialized_impure_expr())
88 {
89 current::notice_to_user(msg);
90 } else {
91 return Err(ErrorCode::NotSupported(
92 msg,
93 "rewrite the query to extract the impure expression into the select list, \
94 or set `streaming_unsafe_allow_unmaterialized_impure_expr` to allow \
95 the behavior at the risk of inconsistent results or panics during execution"
96 .into(),
97 )
98 .into());
99 }
100 }
101 Ok(())
102}
103
104pub trait Expr: Into<ExprImpl> {
106 fn return_type(&self) -> DataType;
108
109 fn try_to_expr_proto(&self) -> Result<ExprNode, String>;
111
112 fn to_expr_proto(&self) -> ExprNode {
114 self.try_to_expr_proto()
115 .expect("failed to serialize expression to protobuf")
116 }
117
118 fn to_expr_proto_checked_pure(
121 &self,
122 retract: bool,
123 context: &str,
124 ) -> crate::error::Result<ExprNode>
125 where
126 Self: Clone,
127 {
128 if retract {
129 reject_impure(self.clone(), context)?;
130 }
131 self.try_to_expr_proto()
132 .map_err(|e| ErrorCode::InternalError(e).into())
133 }
134}
135
136macro_rules! impl_expr_impl {
137 ($($t:ident,)*) => {
138 #[derive(Clone, Eq, PartialEq, Hash, EnumAsInner)]
139 pub enum ExprImpl {
140 $($t(Box<$t>),)*
141 }
142
143 impl ExprImpl {
144 pub fn variant_name(&self) -> &'static str {
145 match self {
146 $(ExprImpl::$t(_) => stringify!($t),)*
147 }
148 }
149 }
150
151 $(
152 impl From<$t> for ExprImpl {
153 fn from(o: $t) -> ExprImpl {
154 ExprImpl::$t(Box::new(o))
155 }
156 })*
157
158 impl Expr for ExprImpl {
159 fn return_type(&self) -> DataType {
160 match self {
161 $(ExprImpl::$t(expr) => expr.return_type(),)*
162 }
163 }
164
165 fn try_to_expr_proto(&self) -> Result<ExprNode, String> {
166 match self {
167 $(ExprImpl::$t(expr) => expr.try_to_expr_proto(),)*
168 }
169 }
170 }
171 };
172}
173
174impl_expr_impl!(
175 CorrelatedInputRef,
177 InputRef,
178 Literal,
179 FunctionCall,
180 FunctionCallWithLambda,
181 AggCall,
182 Subquery,
183 TableFunction,
184 WindowFunction,
185 UserDefinedFunction,
186 Parameter,
187 Now,
188);
189
190impl ExprImpl {
191 #[inline(always)]
193 pub fn literal_int(v: i32) -> Self {
194 Literal::new(Some(v.to_scalar_value()), DataType::Int32).into()
195 }
196
197 #[inline(always)]
199 pub fn literal_bigint(v: i64) -> Self {
200 Literal::new(Some(v.to_scalar_value()), DataType::Int64).into()
201 }
202
203 #[inline(always)]
205 pub fn literal_f64(v: f64) -> Self {
206 Literal::new(Some(v.into()), DataType::Float64).into()
207 }
208
209 #[inline(always)]
211 pub fn literal_bool(v: bool) -> Self {
212 Literal::new(Some(v.to_scalar_value()), DataType::Boolean).into()
213 }
214
215 #[inline(always)]
217 pub fn literal_varchar(v: String) -> Self {
218 Literal::new(Some(v.into()), DataType::Varchar).into()
219 }
220
221 #[inline(always)]
223 pub fn literal_null(element_type: DataType) -> Self {
224 Literal::new(None, element_type).into()
225 }
226
227 #[inline(always)]
229 pub fn literal_jsonb(v: JsonbVal) -> Self {
230 Literal::new(Some(v.into()), DataType::Jsonb).into()
231 }
232
233 #[inline(always)]
235 pub fn literal_list(v: ListValue, element_type: DataType) -> Self {
236 Literal::new(Some(v.to_scalar_value()), DataType::list(element_type)).into()
237 }
238
239 pub fn take(&mut self) -> Self {
241 std::mem::replace(self, Self::literal_null(self.return_type()))
242 }
243
244 #[inline(always)]
246 pub fn count_star() -> Self {
247 AggCall::new(
248 PbAggKind::Count.into(),
249 vec![],
250 false,
251 OrderBy::any(),
252 Condition::true_cond(),
253 vec![],
254 )
255 .unwrap()
256 .into()
257 }
258
259 pub fn and(exprs: impl IntoIterator<Item = ExprImpl>) -> Self {
263 merge_expr_by_logical(exprs, ExprType::And, ExprImpl::literal_bool(true))
264 }
265
266 pub fn or(exprs: impl IntoIterator<Item = ExprImpl>) -> Self {
270 merge_expr_by_logical(exprs, ExprType::Or, ExprImpl::literal_bool(false))
271 }
272
273 pub fn collect_input_refs(&self, input_col_num: usize) -> FixedBitSet {
278 collect_input_refs(input_col_num, [self])
279 }
280
281 pub fn is_pure(&self) -> bool {
283 is_pure(self)
284 }
285
286 pub fn is_impure(&self) -> bool {
287 is_impure(self)
288 }
289
290 pub fn count_nows(&self) -> usize {
292 let mut visitor = CountNow::default();
293 visitor.visit_expr(self);
294 visitor.count()
295 }
296
297 pub fn is_null(&self) -> bool {
299 matches!(self, ExprImpl::Literal(literal) if literal.get_data().is_none())
300 }
301
302 pub fn is_untyped(&self) -> bool {
304 matches!(self, ExprImpl::Literal(literal) if literal.is_untyped())
305 || matches!(self, ExprImpl::Parameter(parameter) if !parameter.has_infer())
306 }
307
308 pub fn cast_implicit(mut self, target: &DataType) -> Result<ExprImpl, CastError> {
310 FunctionCall::cast_mut(&mut self, target, CastContext::Implicit)?;
311 Ok(self)
312 }
313
314 pub fn cast_assign(mut self, target: &DataType) -> Result<ExprImpl, CastError> {
316 FunctionCall::cast_mut(&mut self, target, CastContext::Assign)?;
317 Ok(self)
318 }
319
320 pub fn cast_explicit(mut self, target: &DataType) -> Result<ExprImpl, CastError> {
322 FunctionCall::cast_mut(&mut self, target, CastContext::Explicit)?;
323 Ok(self)
324 }
325
326 pub fn cast_implicit_mut(&mut self, target: &DataType) -> Result<(), CastError> {
328 FunctionCall::cast_mut(self, target, CastContext::Implicit)
329 }
330
331 pub fn cast_explicit_mut(&mut self, target: &DataType) -> Result<(), CastError> {
333 FunctionCall::cast_mut(self, target, CastContext::Explicit)
334 }
335
336 pub fn cast_to_regclass(self) -> Result<ExprImpl, CastError> {
339 match self.return_type() {
340 DataType::Varchar => Ok(ExprImpl::FunctionCall(Box::new(
341 FunctionCall::new_unchecked(ExprType::CastRegclass, vec![self], DataType::Int32),
342 ))),
343 DataType::Int32 => Ok(self),
344 dt if dt.is_int() => Ok(self.cast_explicit(&DataType::Int32)?),
345 _ => bail_cast_error!("unsupported input type"),
346 }
347 }
348
349 pub fn cast_to_regclass_mut(&mut self) -> Result<(), CastError> {
351 let owned = std::mem::replace(self, ExprImpl::literal_bool(false));
352 *self = owned.cast_to_regclass()?;
353 Ok(())
354 }
355
356 pub fn ensure_array_type(&self) -> Result<(), ErrorCode> {
358 if self.is_untyped() {
359 return Err(ErrorCode::BindError(
360 "could not determine polymorphic type because input has type unknown".into(),
361 ));
362 }
363 match self.return_type() {
364 DataType::List(_) => Ok(()),
365 t => Err(ErrorCode::BindError(format!("expects array but got {t}"))),
366 }
367 }
368
369 pub fn try_into_map_type(&self) -> Result<MapType, ErrorCode> {
371 if self.is_untyped() {
372 return Err(ErrorCode::BindError(
373 "could not determine polymorphic type because input has type unknown".into(),
374 ));
375 }
376 match self.return_type() {
377 DataType::Map(m) => Ok(m),
378 t => Err(ErrorCode::BindError(format!("expects map but got {t}"))),
379 }
380 }
381
382 pub fn enforce_bool_clause(self, clause: &str) -> RwResult<ExprImpl> {
384 if self.is_untyped() {
385 let inner = self.cast_implicit(&DataType::Boolean)?;
386 return Ok(inner);
387 }
388 let return_type = self.return_type();
389 if return_type != DataType::Boolean {
390 bail!(
391 "argument of {} must be boolean, not type {:?}",
392 clause,
393 return_type
394 )
395 }
396 Ok(self)
397 }
398
399 pub fn cast_output(self) -> RwResult<ExprImpl> {
410 if self.return_type() == DataType::Boolean {
411 return Ok(FunctionCall::new(ExprType::BoolOut, vec![self])?.into());
412 }
413 self.cast_assign(&DataType::Varchar)
416 .map_err(|err| err.into())
417 }
418
419 pub async fn eval_row(&self, input: &OwnedRow) -> RwResult<Datum> {
424 let backend_expr = build_from_prost(&self.to_expr_proto())?;
425 Ok(backend_expr.eval_row(input).await?)
426 }
427
428 pub fn try_fold_const(&self) -> Option<RwResult<Datum>> {
435 if self.is_const() {
436 self.eval_row(&OwnedRow::empty())
437 .now_or_never()
438 .expect("constant expression should not be async")
439 .into()
440 } else {
441 None
442 }
443 }
444
445 pub fn fold_const(&self) -> RwResult<Datum> {
447 self.try_fold_const().expect("expression is not constant")
448 }
449}
450
451macro_rules! impl_has_variant {
456 ( $($variant:ty),* ) => {
457 paste! {
458 impl ExprImpl {
459 $(
460 pub fn [<has_ $variant:snake>](&self) -> bool {
461 struct Has { has: bool }
462
463 impl ExprVisitor for Has {
464 fn [<visit_ $variant:snake>](&mut self, _: &$variant) {
465 self.has = true;
466 }
467 }
468
469 let mut visitor = Has { has: false };
470 visitor.visit_expr(self);
471 visitor.has
472 }
473 )*
474 }
475 }
476 };
477}
478
479impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction, UserDefinedFunction, Now}
480
481#[derive(Debug, Clone, PartialEq, Eq, Hash)]
484pub struct InequalityInputPair {
485 pub left_idx: usize,
487 pub right_idx: usize,
489 pub op: ExprType,
491}
492
493impl InequalityInputPair {
494 pub fn new(left_idx: usize, right_idx: usize, op: ExprType) -> Self {
495 debug_assert!(matches!(
496 op,
497 ExprType::LessThan
498 | ExprType::LessThanOrEqual
499 | ExprType::GreaterThan
500 | ExprType::GreaterThanOrEqual
501 ));
502 Self {
503 left_idx,
504 right_idx,
505 op,
506 }
507 }
508
509 pub fn left_side_is_larger(&self) -> bool {
513 matches!(
514 self.op,
515 ExprType::GreaterThan | ExprType::GreaterThanOrEqual
516 )
517 }
518}
519
520impl ExprImpl {
521 pub fn has_correlated_input_ref(&self, _: std::convert::Infallible) -> bool {
532 unreachable!()
533 }
534
535 pub fn has_correlated_input_ref_by_depth(&self, depth: Depth) -> bool {
541 struct Has {
542 depth: usize,
543 has: bool,
544 }
545
546 impl ExprVisitor for Has {
547 fn visit_correlated_input_ref(&mut self, correlated_input_ref: &CorrelatedInputRef) {
548 if correlated_input_ref.depth() == self.depth {
549 self.has = true;
550 }
551 }
552
553 fn visit_subquery(&mut self, subquery: &Subquery) {
554 self.has |= subquery.is_correlated_by_depth(self.depth);
555 }
556 }
557
558 let mut visitor = Has { depth, has: false };
559 visitor.visit_expr(self);
560 visitor.has
561 }
562
563 pub fn has_correlated_input_ref_by_correlated_id(&self, correlated_id: CorrelatedId) -> bool {
564 struct Has {
565 correlated_id: CorrelatedId,
566 has: bool,
567 }
568
569 impl ExprVisitor for Has {
570 fn visit_correlated_input_ref(&mut self, correlated_input_ref: &CorrelatedInputRef) {
571 if correlated_input_ref.correlated_id() == self.correlated_id {
572 self.has = true;
573 }
574 }
575
576 fn visit_subquery(&mut self, subquery: &Subquery) {
577 self.has |= subquery.is_correlated_by_correlated_id(self.correlated_id);
578 }
579 }
580
581 let mut visitor = Has {
582 correlated_id,
583 has: false,
584 };
585 visitor.visit_expr(self);
586 visitor.has
587 }
588
589 pub fn collect_correlated_indices_by_depth_and_assign_id(
592 &mut self,
593 depth: Depth,
594 correlated_id: CorrelatedId,
595 ) -> Vec<usize> {
596 struct Collector {
597 depth: Depth,
598 correlated_indices: Vec<usize>,
599 correlated_id: CorrelatedId,
600 }
601
602 impl ExprMutator for Collector {
603 fn visit_correlated_input_ref(
604 &mut self,
605 correlated_input_ref: &mut CorrelatedInputRef,
606 ) {
607 if correlated_input_ref.depth() == self.depth {
608 self.correlated_indices.push(correlated_input_ref.index());
609 correlated_input_ref.set_correlated_id(self.correlated_id);
610 }
611 }
612
613 fn visit_subquery(&mut self, subquery: &mut Subquery) {
614 self.correlated_indices.extend(
615 subquery.collect_correlated_indices_by_depth_and_assign_id(
616 self.depth,
617 self.correlated_id,
618 ),
619 );
620 }
621 }
622
623 let mut collector = Collector {
624 depth,
625 correlated_indices: vec![],
626 correlated_id,
627 };
628 collector.visit_expr(self);
629 collector.correlated_indices.sort();
630 collector.correlated_indices.dedup();
631 collector.correlated_indices
632 }
633
634 pub fn only_literal_and_func(&self) -> bool {
635 {
636 struct HasOthers {
637 has_others: bool,
638 }
639
640 impl ExprVisitor for HasOthers {
641 fn visit_expr(&mut self, expr: &ExprImpl) {
642 match expr {
643 ExprImpl::CorrelatedInputRef(_)
644 | ExprImpl::InputRef(_)
645 | ExprImpl::AggCall(_)
646 | ExprImpl::Subquery(_)
647 | ExprImpl::TableFunction(_)
648 | ExprImpl::WindowFunction(_)
649 | ExprImpl::UserDefinedFunction(_)
650 | ExprImpl::Parameter(_)
651 | ExprImpl::Now(_) => self.has_others = true,
652 ExprImpl::Literal(_inner) => {}
653 ExprImpl::FunctionCall(inner) => {
654 if !self.is_short_circuit(inner) {
655 self.visit_function_call(inner)
659 }
660 }
661 ExprImpl::FunctionCallWithLambda(inner) => {
662 self.visit_function_call_with_lambda(inner)
663 }
664 }
665 }
666 }
667
668 impl HasOthers {
669 fn is_short_circuit(&self, func_call: &FunctionCall) -> bool {
670 fn eval_first(e: &ExprImpl, expect: bool) -> bool {
672 if let ExprImpl::Literal(l) = e {
673 *l.get_data() == Some(ScalarImpl::Bool(expect))
674 } else {
675 false
676 }
677 }
678
679 match func_call.func_type {
680 ExprType::Or => eval_first(&func_call.inputs()[0], true),
681 ExprType::And => eval_first(&func_call.inputs()[0], false),
682 _ => false,
683 }
684 }
685 }
686
687 let mut visitor = HasOthers { has_others: false };
688 visitor.visit_expr(self);
689 !visitor.has_others
690 }
691 }
692
693 pub fn is_const(&self) -> bool {
697 self.only_literal_and_func() && self.is_pure()
698 }
699
700 pub fn as_eq_cond(&self) -> Option<(InputRef, InputRef)> {
703 if let ExprImpl::FunctionCall(function_call) = self
704 && function_call.func_type() == ExprType::Equal
705 && let (_, ExprImpl::InputRef(x), ExprImpl::InputRef(y)) =
706 function_call.clone().decompose_as_binary()
707 {
708 if x.index() < y.index() {
709 Some((*x, *y))
710 } else {
711 Some((*y, *x))
712 }
713 } else {
714 None
715 }
716 }
717
718 pub fn as_is_not_distinct_from_cond(&self) -> Option<(InputRef, InputRef)> {
719 if let ExprImpl::FunctionCall(function_call) = self
720 && function_call.func_type() == ExprType::IsNotDistinctFrom
721 && let (_, ExprImpl::InputRef(x), ExprImpl::InputRef(y)) =
722 function_call.clone().decompose_as_binary()
723 {
724 if x.index() < y.index() {
725 Some((*x, *y))
726 } else {
727 Some((*y, *x))
728 }
729 } else {
730 None
731 }
732 }
733
734 pub fn reverse_comparison(comparison: ExprType) -> ExprType {
735 match comparison {
736 ExprType::LessThan => ExprType::GreaterThan,
737 ExprType::LessThanOrEqual => ExprType::GreaterThanOrEqual,
738 ExprType::GreaterThan => ExprType::LessThan,
739 ExprType::GreaterThanOrEqual => ExprType::LessThanOrEqual,
740 ExprType::Equal | ExprType::IsNotDistinctFrom => comparison,
741 _ => unreachable!(),
742 }
743 }
744
745 pub fn as_comparison_cond(&self) -> Option<(InputRef, ExprType, InputRef)> {
746 if let ExprImpl::FunctionCall(function_call) = self {
747 match function_call.func_type() {
748 ty @ (ExprType::LessThan
749 | ExprType::LessThanOrEqual
750 | ExprType::GreaterThan
751 | ExprType::GreaterThanOrEqual) => {
752 let (_, op1, op2) = function_call.clone().decompose_as_binary();
753 if let (ExprImpl::InputRef(x), ExprImpl::InputRef(y)) = (op1, op2) {
754 if x.index < y.index {
755 Some((*x, ty, *y))
756 } else {
757 Some((*y, Self::reverse_comparison(ty), *x))
758 }
759 } else {
760 None
761 }
762 }
763 _ => None,
764 }
765 } else {
766 None
767 }
768 }
769
770 pub fn as_now_comparison_cond(&self) -> Option<(ExprImpl, ExprType, ExprImpl)> {
776 if let ExprImpl::FunctionCall(function_call) = self {
777 match function_call.func_type() {
778 ty @ (ExprType::Equal
779 | ExprType::LessThan
780 | ExprType::LessThanOrEqual
781 | ExprType::GreaterThan
782 | ExprType::GreaterThanOrEqual) => {
783 let (_, op1, op2) = function_call.clone().decompose_as_binary();
784 if !op1.has_now()
785 && op1.has_input_ref()
786 && op2.has_now()
787 && !op2.has_input_ref()
788 {
789 Some((op1, ty, op2))
790 } else if op1.has_now()
791 && !op1.has_input_ref()
792 && !op2.has_now()
793 && op2.has_input_ref()
794 {
795 Some((op2, Self::reverse_comparison(ty), op1))
796 } else {
797 None
798 }
799 }
800 _ => None,
801 }
802 } else {
803 None
804 }
805 }
806
807 #[allow(dead_code)]
812 fn as_input_offset(&self) -> Option<(usize, Option<(ExprType, ExprImpl)>)> {
813 match self {
814 ExprImpl::InputRef(input_ref) => Some((input_ref.index(), None)),
815 ExprImpl::FunctionCall(function_call) => {
816 let expr_type = function_call.func_type();
817 match expr_type {
818 ExprType::Add | ExprType::Subtract => {
819 let (_, lhs, rhs) = function_call.clone().decompose_as_binary();
820 if let ExprImpl::InputRef(input_ref) = &lhs
821 && rhs.is_const()
822 {
823 if rhs.return_type() == DataType::Interval
826 && rhs.as_literal().is_none_or(|literal| {
827 literal.get_data().as_ref().is_some_and(|scalar| {
828 let interval = scalar.as_interval();
829 interval.months() != 0 || interval.days() != 0
830 })
831 })
832 {
833 None
834 } else {
835 Some((input_ref.index(), Some((expr_type, rhs))))
836 }
837 } else {
838 None
839 }
840 }
841 _ => None,
842 }
843 }
844 _ => None,
845 }
846 }
847
848 pub fn as_eq_const(&self) -> Option<(InputRef, ExprImpl)> {
849 if let ExprImpl::FunctionCall(function_call) = self
850 && function_call.func_type() == ExprType::Equal
851 {
852 match function_call.clone().decompose_as_binary() {
853 (_, ExprImpl::InputRef(x), y) if y.is_const() => Some((*x, y)),
854 (_, x, ExprImpl::InputRef(y)) if x.is_const() => Some((*y, x)),
855 _ => None,
856 }
857 } else {
858 None
859 }
860 }
861
862 pub fn as_eq_correlated_input_ref(&self) -> Option<(InputRef, CorrelatedInputRef)> {
863 if let ExprImpl::FunctionCall(function_call) = self
864 && function_call.func_type() == ExprType::Equal
865 {
866 match function_call.clone().decompose_as_binary() {
867 (_, ExprImpl::InputRef(x), ExprImpl::CorrelatedInputRef(y)) => Some((*x, *y)),
868 (_, ExprImpl::CorrelatedInputRef(x), ExprImpl::InputRef(y)) => Some((*y, *x)),
869 _ => None,
870 }
871 } else {
872 None
873 }
874 }
875
876 pub fn as_is_null(&self) -> Option<InputRef> {
877 if let ExprImpl::FunctionCall(function_call) = self
878 && function_call.func_type() == ExprType::IsNull
879 {
880 match function_call.clone().decompose_as_unary() {
881 (_, ExprImpl::InputRef(x)) => Some(*x),
882 _ => None,
883 }
884 } else {
885 None
886 }
887 }
888
889 pub fn as_comparison_const(&self) -> Option<(InputRef, ExprType, ExprImpl)> {
890 fn reverse_comparison(comparison: ExprType) -> ExprType {
891 match comparison {
892 ExprType::LessThan => ExprType::GreaterThan,
893 ExprType::LessThanOrEqual => ExprType::GreaterThanOrEqual,
894 ExprType::GreaterThan => ExprType::LessThan,
895 ExprType::GreaterThanOrEqual => ExprType::LessThanOrEqual,
896 _ => unreachable!(),
897 }
898 }
899
900 if let ExprImpl::FunctionCall(function_call) = self {
901 match function_call.func_type() {
902 ty @ (ExprType::LessThan
903 | ExprType::LessThanOrEqual
904 | ExprType::GreaterThan
905 | ExprType::GreaterThanOrEqual) => {
906 let (_, op1, op2) = function_call.clone().decompose_as_binary();
907 match (op1, op2) {
908 (ExprImpl::InputRef(x), y) if y.is_const() => Some((*x, ty, y)),
909 (x, ExprImpl::InputRef(y)) if x.is_const() => {
910 Some((*y, reverse_comparison(ty), x))
911 }
912 _ => None,
913 }
914 }
915 _ => None,
916 }
917 } else {
918 None
919 }
920 }
921
922 pub fn as_in_const_list(&self) -> Option<(InputRef, Vec<ExprImpl>)> {
923 if let ExprImpl::FunctionCall(function_call) = self
924 && function_call.func_type() == ExprType::In
925 {
926 let mut inputs = function_call.inputs().iter().cloned();
927 let input_ref = match inputs.next().unwrap() {
928 ExprImpl::InputRef(i) => *i,
929 _ => return None,
930 };
931 let list: Vec<_> = inputs
932 .inspect(|expr| {
933 assert!(expr.is_const());
935 })
936 .collect();
937
938 Some((input_ref, list))
939 } else {
940 None
941 }
942 }
943
944 pub fn as_or_disjunctions(&self) -> Option<Vec<ExprImpl>> {
945 if let ExprImpl::FunctionCall(function_call) = self
946 && function_call.func_type() == ExprType::Or
947 {
948 Some(to_disjunctions(self.clone()))
949 } else {
950 None
951 }
952 }
953
954 pub fn to_project_set_select_item_proto(&self) -> ProjectSetSelectItem {
955 use risingwave_pb::expr::project_set_select_item::SelectItem::*;
956
957 ProjectSetSelectItem {
958 select_item: Some(match self {
959 ExprImpl::TableFunction(tf) => TableFunction(tf.to_protobuf()),
960 expr => Expr(expr.to_expr_proto()),
961 }),
962 }
963 }
964
965 pub fn to_project_set_select_item_proto_checked_pure(
968 &self,
969 retract: bool,
970 ) -> crate::error::Result<ProjectSetSelectItem> {
971 use risingwave_pb::expr::project_set_select_item::SelectItem::*;
972
973 Ok(ProjectSetSelectItem {
974 select_item: Some(match self {
975 ExprImpl::TableFunction(tf) => TableFunction(tf.to_protobuf_checked_pure(retract)?),
976 expr => Expr(expr.to_expr_proto_checked_pure(retract, "SELECT list")?),
977 }),
978 })
979 }
980
981 pub fn from_expr_proto(proto: &ExprNode) -> RwResult<Self> {
982 let rex_node = proto.get_rex_node()?;
983 let ret_type = proto.get_return_type()?.into();
984
985 Ok(match rex_node {
986 RexNode::InputRef(column_index) => Self::InputRef(Box::new(InputRef::from_expr_proto(
987 *column_index as _,
988 ret_type,
989 )?)),
990 RexNode::Constant(_) => Self::Literal(Box::new(Literal::from_expr_proto(proto)?)),
991 RexNode::Udf(udf) => Self::UserDefinedFunction(Box::new(
992 UserDefinedFunction::from_expr_proto(udf, ret_type)?,
993 )),
994 RexNode::FuncCall(function_call) => {
995 Self::FunctionCall(Box::new(FunctionCall::from_expr_proto(
996 function_call,
997 proto.get_function_type()?, ret_type,
999 )?))
1000 }
1001 RexNode::Now(_) => Self::Now(Box::new(Now {})),
1002 })
1003 }
1004}
1005
1006impl From<Condition> for ExprImpl {
1007 fn from(c: Condition) -> Self {
1008 ExprImpl::and(c.conjunctions)
1009 }
1010}
1011
1012impl std::fmt::Debug for ExprImpl {
1016 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1017 if f.alternate() {
1018 return match self {
1019 Self::InputRef(arg0) => f.debug_tuple("InputRef").field(arg0).finish(),
1020 Self::Literal(arg0) => f.debug_tuple("Literal").field(arg0).finish(),
1021 Self::FunctionCall(arg0) => f.debug_tuple("FunctionCall").field(arg0).finish(),
1022 Self::FunctionCallWithLambda(arg0) => {
1023 f.debug_tuple("FunctionCallWithLambda").field(arg0).finish()
1024 }
1025 Self::AggCall(arg0) => f.debug_tuple("AggCall").field(arg0).finish(),
1026 Self::Subquery(arg0) => f.debug_tuple("Subquery").field(arg0).finish(),
1027 Self::CorrelatedInputRef(arg0) => {
1028 f.debug_tuple("CorrelatedInputRef").field(arg0).finish()
1029 }
1030 Self::TableFunction(arg0) => f.debug_tuple("TableFunction").field(arg0).finish(),
1031 Self::WindowFunction(arg0) => f.debug_tuple("WindowFunction").field(arg0).finish(),
1032 Self::UserDefinedFunction(arg0) => {
1033 f.debug_tuple("UserDefinedFunction").field(arg0).finish()
1034 }
1035 Self::Parameter(arg0) => f.debug_tuple("Parameter").field(arg0).finish(),
1036 Self::Now(_) => f.debug_tuple("Now").finish(),
1037 };
1038 }
1039 match self {
1040 Self::InputRef(x) => write!(f, "{:?}", x),
1041 Self::Literal(x) => write!(f, "{:?}", x),
1042 Self::FunctionCall(x) => write!(f, "{:?}", x),
1043 Self::FunctionCallWithLambda(x) => write!(f, "{:?}", x),
1044 Self::AggCall(x) => write!(f, "{:?}", x),
1045 Self::Subquery(x) => write!(f, "{:?}", x),
1046 Self::CorrelatedInputRef(x) => write!(f, "{:?}", x),
1047 Self::TableFunction(x) => write!(f, "{:?}", x),
1048 Self::WindowFunction(x) => write!(f, "{:?}", x),
1049 Self::UserDefinedFunction(x) => write!(f, "{:?}", x),
1050 Self::Parameter(x) => write!(f, "{:?}", x),
1051 Self::Now(x) => write!(f, "{:?}", x),
1052 }
1053 }
1054}
1055
1056pub struct ExprDisplay<'a> {
1057 pub expr: &'a ExprImpl,
1058 pub input_schema: &'a Schema,
1059}
1060
1061impl std::fmt::Debug for ExprDisplay<'_> {
1062 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1063 let that = self.expr;
1064 match that {
1065 ExprImpl::InputRef(x) => write!(
1066 f,
1067 "{:?}",
1068 InputRefDisplay {
1069 input_ref: x,
1070 input_schema: self.input_schema
1071 }
1072 ),
1073 ExprImpl::Literal(x) => write!(f, "{:?}", x),
1074 ExprImpl::FunctionCall(x) => write!(
1075 f,
1076 "{:?}",
1077 FunctionCallDisplay {
1078 function_call: x,
1079 input_schema: self.input_schema
1080 }
1081 ),
1082 ExprImpl::FunctionCallWithLambda(x) => write!(
1083 f,
1084 "{:?}",
1085 FunctionCallDisplay {
1086 function_call: &x.to_full_function_call(),
1087 input_schema: self.input_schema
1088 }
1089 ),
1090 ExprImpl::AggCall(x) => write!(f, "{:?}", x),
1091 ExprImpl::Subquery(x) => write!(f, "{:?}", x),
1092 ExprImpl::CorrelatedInputRef(x) => write!(f, "{:?}", x),
1093 ExprImpl::TableFunction(x) => {
1094 write!(f, "{:?}", x)
1096 }
1097 ExprImpl::WindowFunction(x) => {
1098 write!(f, "{:?}", x)
1100 }
1101 ExprImpl::UserDefinedFunction(x) => {
1102 write!(
1103 f,
1104 "{:?}",
1105 UserDefinedFunctionDisplay {
1106 func_call: x,
1107 input_schema: self.input_schema
1108 }
1109 )
1110 }
1111 ExprImpl::Parameter(x) => write!(f, "{:?}", x),
1112 ExprImpl::Now(x) => write!(f, "{:?}", x),
1113 }
1114 }
1115}
1116
1117impl std::fmt::Display for ExprDisplay<'_> {
1118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1119 (self as &dyn std::fmt::Debug).fmt(f)
1120 }
1121}
1122
1123#[cfg(test)]
1124macro_rules! assert_eq_input_ref {
1126 ($e:expr, $index:expr) => {
1127 match $e {
1128 ExprImpl::InputRef(i) => assert_eq!(i.index(), $index),
1129 _ => assert!(false, "Expected input ref, found {:?}", $e),
1130 }
1131 };
1132}
1133
1134#[cfg(test)]
1135pub(crate) use assert_eq_input_ref;
1136use risingwave_common::bail;
1137use risingwave_common::catalog::Schema;
1138use risingwave_common::row::OwnedRow;
1139
1140use crate::utils::Condition;
1141
1142#[cfg(test)]
1143mod tests {
1144 use super::*;
1145
1146 #[test]
1147 fn test_expr_debug_alternate() {
1148 let mut e = InputRef::new(1, DataType::Boolean).into();
1149 e = FunctionCall::new(ExprType::Not, vec![e]).unwrap().into();
1150 let s = format!("{:#?}", e);
1151 assert!(s.contains("return_type: Boolean"))
1152 }
1153}