1use enum_as_inner::EnumAsInner;
16use fixedbitset::FixedBitSet;
17use futures::FutureExt;
18use paste::paste;
19use risingwave_common::array::ListValue;
20use risingwave_common::types::{DataType, Datum, JsonbVal, MapType, Scalar, ScalarImpl};
21use risingwave_expr::aggregate::PbAggKind;
22use risingwave_expr::expr::build_from_prost;
23use risingwave_pb::expr::expr_node::RexNode;
24use risingwave_pb::expr::{ExprNode, ProjectSetSelectItem};
25use user_defined_function::UserDefinedFunctionDisplay;
26
27use crate::error::{ErrorCode, Result as RwResult};
28use crate::session::current;
29
30mod agg_call;
31mod correlated_input_ref;
32mod function_call;
33mod function_call_with_lambda;
34mod input_ref;
35mod literal;
36mod now;
37mod parameter;
38mod pure;
39mod subquery;
40mod table_function;
41mod user_defined_function;
42mod window_function;
43
44mod order_by_expr;
45pub use order_by_expr::{OrderBy, OrderByExpr};
46
47mod expr_mutator;
48mod expr_rewriter;
49mod expr_visitor;
50pub mod function_impl;
51mod secret_ref;
52mod session_timezone;
53mod type_inference;
54mod utils;
55
56pub use agg_call::AggCall;
57pub use correlated_input_ref::{CorrelatedId, CorrelatedInputRef, Depth, InputRefDepthRewriter};
58pub use expr_mutator::ExprMutator;
59pub use expr_rewriter::{ExprRewriter, default_rewrite_expr};
60pub use expr_visitor::{ExprVisitor, default_visit_expr};
61pub use function_call::{FunctionCall, FunctionCallDisplay, is_row_function};
62pub use function_call_with_lambda::FunctionCallWithLambda;
63pub use input_ref::{InputRef, InputRefDisplay, input_ref_to_column_indices};
64pub use literal::Literal;
65pub use now::{InlineNowProcTime, Now, NowProcTimeFinder};
66pub use parameter::Parameter;
67pub use pure::*;
68pub use risingwave_pb::expr::expr_node::Type as ExprType;
69pub use secret_ref::SecretRef;
70pub use session_timezone::{SessionTimezone, TimestamptzExprFinder};
71pub use subquery::{Subquery, SubqueryKind};
72pub use table_function::{TableFunction, TableFunctionType};
73pub use type_inference::*;
74pub use user_defined_function::UserDefinedFunction;
75pub use utils::*;
76pub use window_function::WindowFunction;
77
78pub(crate) const EXPR_DEPTH_THRESHOLD: usize = 30;
79pub(crate) const EXPR_TOO_DEEP_NOTICE: &str = "Some expression is too complicated. \
80Consider simplifying or splitting the query if you encounter any issues.";
81
82pub(crate) fn reject_impure(expr: impl Into<ExprImpl>, context: &str) -> RwResult<()> {
83 if let Some(impure_expr_desc) = impure_expr_desc(&expr.into()) {
84 let msg = format!(
85 "using an impure expression ({impure_expr_desc}) in {context} \
86 on a retract stream may lead to inconsistent results"
87 );
88 if current::config()
89 .is_some_and(|c| c.read().streaming_unsafe_allow_unmaterialized_impure_expr())
90 {
91 current::notice_to_user(msg);
92 } else {
93 return Err(ErrorCode::NotSupported(
94 msg,
95 "rewrite the query to extract the impure expression into the select list, \
96 or set `streaming_unsafe_allow_unmaterialized_impure_expr` to allow \
97 the behavior at the risk of inconsistent results or panics during execution"
98 .into(),
99 )
100 .into());
101 }
102 }
103 Ok(())
104}
105
106pub trait Expr: Into<ExprImpl> {
108 fn return_type(&self) -> DataType;
110
111 fn try_to_expr_proto(&self) -> Result<ExprNode, String>;
113
114 fn to_expr_proto(&self) -> ExprNode {
116 self.try_to_expr_proto()
117 .expect("failed to serialize expression to protobuf")
118 }
119
120 fn to_expr_proto_checked_pure(
123 &self,
124 retract: bool,
125 context: &str,
126 ) -> crate::error::Result<ExprNode>
127 where
128 Self: Clone,
129 {
130 if retract {
131 reject_impure(self.clone(), context)?;
132 }
133 self.try_to_expr_proto()
134 .map_err(|e| ErrorCode::InternalError(e).into())
135 }
136}
137
138macro_rules! impl_expr_impl {
139 ($($t:ident,)*) => {
140 #[derive(Clone, Eq, PartialEq, Hash, EnumAsInner)]
141 pub enum ExprImpl {
142 $($t(Box<$t>),)*
143 }
144
145 impl ExprImpl {
146 pub fn variant_name(&self) -> &'static str {
147 match self {
148 $(ExprImpl::$t(_) => stringify!($t),)*
149 }
150 }
151 }
152
153 $(
154 impl From<$t> for ExprImpl {
155 fn from(o: $t) -> ExprImpl {
156 ExprImpl::$t(Box::new(o))
157 }
158 })*
159
160 impl Expr for ExprImpl {
161 fn return_type(&self) -> DataType {
162 match self {
163 $(ExprImpl::$t(expr) => expr.return_type(),)*
164 }
165 }
166
167 fn try_to_expr_proto(&self) -> Result<ExprNode, String> {
168 match self {
169 $(ExprImpl::$t(expr) => expr.try_to_expr_proto(),)*
170 }
171 }
172 }
173 };
174}
175
176impl_expr_impl!(
177 CorrelatedInputRef,
179 InputRef,
180 Literal,
181 FunctionCall,
182 FunctionCallWithLambda,
183 AggCall,
184 Subquery,
185 TableFunction,
186 WindowFunction,
187 UserDefinedFunction,
188 Parameter,
189 Now,
190 SecretRef,
191);
192
193impl ExprImpl {
194 #[inline(always)]
196 pub fn literal_int(v: i32) -> Self {
197 Literal::new(Some(v.to_scalar_value()), DataType::Int32).into()
198 }
199
200 #[inline(always)]
202 pub fn literal_bigint(v: i64) -> Self {
203 Literal::new(Some(v.to_scalar_value()), DataType::Int64).into()
204 }
205
206 #[inline(always)]
208 pub fn literal_f64(v: f64) -> Self {
209 Literal::new(Some(v.into()), DataType::Float64).into()
210 }
211
212 #[inline(always)]
214 pub fn literal_bool(v: bool) -> Self {
215 Literal::new(Some(v.to_scalar_value()), DataType::Boolean).into()
216 }
217
218 #[inline(always)]
220 pub fn literal_varchar(v: String) -> Self {
221 Literal::new(Some(v.into()), DataType::Varchar).into()
222 }
223
224 #[inline(always)]
226 pub fn literal_null(element_type: DataType) -> Self {
227 Literal::new(None, element_type).into()
228 }
229
230 #[inline(always)]
232 pub fn literal_jsonb(v: JsonbVal) -> Self {
233 Literal::new(Some(v.into()), DataType::Jsonb).into()
234 }
235
236 #[inline(always)]
238 pub fn literal_list(v: ListValue, element_type: DataType) -> Self {
239 Literal::new(Some(v.to_scalar_value()), DataType::list(element_type)).into()
240 }
241
242 pub fn take(&mut self) -> Self {
244 std::mem::replace(self, Self::literal_null(self.return_type()))
245 }
246
247 #[inline(always)]
249 pub fn count_star() -> Self {
250 AggCall::new(
251 PbAggKind::Count.into(),
252 vec![],
253 false,
254 OrderBy::any(),
255 Condition::true_cond(),
256 vec![],
257 )
258 .unwrap()
259 .into()
260 }
261
262 pub fn and(exprs: impl IntoIterator<Item = ExprImpl>) -> Self {
266 merge_expr_by_logical(exprs, ExprType::And, ExprImpl::literal_bool(true))
267 }
268
269 pub fn or(exprs: impl IntoIterator<Item = ExprImpl>) -> Self {
273 merge_expr_by_logical(exprs, ExprType::Or, ExprImpl::literal_bool(false))
274 }
275
276 pub fn collect_input_refs(&self, input_col_num: usize) -> FixedBitSet {
281 collect_input_refs(input_col_num, [self])
282 }
283
284 pub fn is_pure(&self) -> bool {
286 is_pure(self)
287 }
288
289 pub fn is_impure(&self) -> bool {
290 is_impure(self)
291 }
292
293 pub fn count_nows(&self) -> usize {
295 let mut visitor = CountNow::default();
296 visitor.visit_expr(self);
297 visitor.count()
298 }
299
300 pub fn is_null(&self) -> bool {
302 matches!(self, ExprImpl::Literal(literal) if literal.get_data().is_none())
303 }
304
305 pub fn is_untyped(&self) -> bool {
307 matches!(self, ExprImpl::Literal(literal) if literal.is_untyped())
308 || matches!(self, ExprImpl::Parameter(parameter) if !parameter.has_infer())
309 }
310
311 pub fn cast_implicit(mut self, target: &DataType) -> Result<ExprImpl, CastError> {
313 FunctionCall::cast_mut(&mut self, target, CastContext::Implicit)?;
314 Ok(self)
315 }
316
317 pub fn cast_assign(mut self, target: &DataType) -> Result<ExprImpl, CastError> {
319 FunctionCall::cast_mut(&mut self, target, CastContext::Assign)?;
320 Ok(self)
321 }
322
323 pub fn cast_explicit(mut self, target: &DataType) -> Result<ExprImpl, CastError> {
325 FunctionCall::cast_mut(&mut self, target, CastContext::Explicit)?;
326 Ok(self)
327 }
328
329 pub fn cast_implicit_mut(&mut self, target: &DataType) -> Result<(), CastError> {
331 FunctionCall::cast_mut(self, target, CastContext::Implicit)
332 }
333
334 pub fn cast_explicit_mut(&mut self, target: &DataType) -> Result<(), CastError> {
336 FunctionCall::cast_mut(self, target, CastContext::Explicit)
337 }
338
339 pub fn cast_to_regclass(self) -> Result<ExprImpl, CastError> {
342 match self.return_type() {
343 DataType::Varchar => Ok(ExprImpl::FunctionCall(Box::new(
344 FunctionCall::new_unchecked(ExprType::CastRegclass, vec![self], DataType::Int32),
345 ))),
346 DataType::Int32 => Ok(self),
347 dt if dt.is_int() => Ok(self.cast_explicit(&DataType::Int32)?),
348 _ => bail_cast_error!("unsupported input type"),
349 }
350 }
351
352 pub fn cast_to_regclass_mut(&mut self) -> Result<(), CastError> {
354 let owned = std::mem::replace(self, ExprImpl::literal_bool(false));
355 *self = owned.cast_to_regclass()?;
356 Ok(())
357 }
358
359 pub fn ensure_array_type(&self) -> Result<(), ErrorCode> {
361 if self.is_untyped() {
362 return Err(ErrorCode::BindError(
363 "could not determine polymorphic type because input has type unknown".into(),
364 ));
365 }
366 match self.return_type() {
367 DataType::List(_) => Ok(()),
368 t => Err(ErrorCode::BindError(format!("expects array but got {t}"))),
369 }
370 }
371
372 pub fn try_into_map_type(&self) -> Result<MapType, ErrorCode> {
374 if self.is_untyped() {
375 return Err(ErrorCode::BindError(
376 "could not determine polymorphic type because input has type unknown".into(),
377 ));
378 }
379 match self.return_type() {
380 DataType::Map(m) => Ok(m),
381 t => Err(ErrorCode::BindError(format!("expects map but got {t}"))),
382 }
383 }
384
385 pub fn enforce_bool_clause(self, clause: &str) -> RwResult<ExprImpl> {
387 if self.is_untyped() {
388 let inner = self.cast_implicit(&DataType::Boolean)?;
389 return Ok(inner);
390 }
391 let return_type = self.return_type();
392 if return_type != DataType::Boolean {
393 bail!(
394 "argument of {} must be boolean, not type {:?}",
395 clause,
396 return_type
397 )
398 }
399 Ok(self)
400 }
401
402 pub fn cast_output(self) -> RwResult<ExprImpl> {
413 if self.return_type() == DataType::Boolean {
414 return Ok(FunctionCall::new(ExprType::BoolOut, vec![self])?.into());
415 }
416 self.cast_assign(&DataType::Varchar)
419 .map_err(|err| err.into())
420 }
421
422 pub async fn eval_row(&self, input: &OwnedRow) -> RwResult<Datum> {
427 let backend_expr = build_from_prost(&self.to_expr_proto())?;
428 Ok(backend_expr.eval_row(input).await?)
429 }
430
431 pub fn try_fold_const(&self) -> Option<RwResult<Datum>> {
438 if self.is_const() {
439 self.eval_row(&OwnedRow::empty())
440 .now_or_never()
441 .expect("constant expression should not be async")
442 .into()
443 } else {
444 None
445 }
446 }
447
448 pub fn fold_const(&self) -> RwResult<Datum> {
450 self.try_fold_const().expect("expression is not constant")
451 }
452}
453
454macro_rules! impl_has_variant {
459 ( $($variant:ty),* ) => {
460 paste! {
461 impl ExprImpl {
462 $(
463 pub fn [<has_ $variant:snake>](&self) -> bool {
464 struct Has { has: bool }
465
466 impl ExprVisitor for Has {
467 fn [<visit_ $variant:snake>](&mut self, _: &$variant) {
468 self.has = true;
469 }
470 }
471
472 let mut visitor = Has { has: false };
473 visitor.visit_expr(self);
474 visitor.has
475 }
476 )*
477 }
478 }
479 };
480}
481
482impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction, UserDefinedFunction, Now}
483
484#[derive(Debug, Clone, PartialEq, Eq, Hash)]
487pub struct InequalityInputPair {
488 pub left_idx: usize,
490 pub right_idx: usize,
492 pub op: ExprType,
494}
495
496impl InequalityInputPair {
497 pub fn new(left_idx: usize, right_idx: usize, op: ExprType) -> Self {
498 debug_assert!(matches!(
499 op,
500 ExprType::LessThan
501 | ExprType::LessThanOrEqual
502 | ExprType::GreaterThan
503 | ExprType::GreaterThanOrEqual
504 ));
505 Self {
506 left_idx,
507 right_idx,
508 op,
509 }
510 }
511
512 pub fn left_side_is_larger(&self) -> bool {
516 matches!(
517 self.op,
518 ExprType::GreaterThan | ExprType::GreaterThanOrEqual
519 )
520 }
521}
522
523impl ExprImpl {
524 pub fn has_correlated_input_ref(&self, _: std::convert::Infallible) -> bool {
535 unreachable!()
536 }
537
538 pub fn has_correlated_input_ref_by_depth(&self, depth: Depth) -> bool {
544 struct Has {
545 depth: usize,
546 has: bool,
547 }
548
549 impl ExprVisitor for Has {
550 fn visit_correlated_input_ref(&mut self, correlated_input_ref: &CorrelatedInputRef) {
551 if correlated_input_ref.depth() == self.depth {
552 self.has = true;
553 }
554 }
555
556 fn visit_subquery(&mut self, subquery: &Subquery) {
557 self.has |= subquery.is_correlated_by_depth(self.depth);
558 }
559 }
560
561 let mut visitor = Has { depth, has: false };
562 visitor.visit_expr(self);
563 visitor.has
564 }
565
566 pub fn has_correlated_input_ref_by_correlated_id(&self, correlated_id: CorrelatedId) -> bool {
567 struct Has {
568 correlated_id: CorrelatedId,
569 has: bool,
570 }
571
572 impl ExprVisitor for Has {
573 fn visit_correlated_input_ref(&mut self, correlated_input_ref: &CorrelatedInputRef) {
574 if correlated_input_ref.correlated_id() == self.correlated_id {
575 self.has = true;
576 }
577 }
578
579 fn visit_subquery(&mut self, subquery: &Subquery) {
580 self.has |= subquery.is_correlated_by_correlated_id(self.correlated_id);
581 }
582 }
583
584 let mut visitor = Has {
585 correlated_id,
586 has: false,
587 };
588 visitor.visit_expr(self);
589 visitor.has
590 }
591
592 pub fn collect_correlated_indices_by_depth_and_assign_id(
595 &mut self,
596 depth: Depth,
597 correlated_id: CorrelatedId,
598 ) -> Vec<usize> {
599 struct Collector {
600 depth: Depth,
601 correlated_indices: Vec<usize>,
602 correlated_id: CorrelatedId,
603 }
604
605 impl ExprMutator for Collector {
606 fn visit_correlated_input_ref(
607 &mut self,
608 correlated_input_ref: &mut CorrelatedInputRef,
609 ) {
610 if correlated_input_ref.depth() == self.depth {
611 self.correlated_indices.push(correlated_input_ref.index());
612 correlated_input_ref.set_correlated_id(self.correlated_id);
613 }
614 }
615
616 fn visit_subquery(&mut self, subquery: &mut Subquery) {
617 self.correlated_indices.extend(
618 subquery.collect_correlated_indices_by_depth_and_assign_id(
619 self.depth,
620 self.correlated_id,
621 ),
622 );
623 }
624 }
625
626 let mut collector = Collector {
627 depth,
628 correlated_indices: vec![],
629 correlated_id,
630 };
631 collector.visit_expr(self);
632 collector.correlated_indices.sort();
633 collector.correlated_indices.dedup();
634 collector.correlated_indices
635 }
636
637 pub fn only_literal_and_func(&self) -> bool {
638 {
639 struct HasOthers {
640 has_others: bool,
641 }
642
643 impl ExprVisitor for HasOthers {
644 fn visit_expr(&mut self, expr: &ExprImpl) {
645 match expr {
646 ExprImpl::CorrelatedInputRef(_)
647 | ExprImpl::InputRef(_)
648 | ExprImpl::AggCall(_)
649 | ExprImpl::Subquery(_)
650 | ExprImpl::TableFunction(_)
651 | ExprImpl::WindowFunction(_)
652 | ExprImpl::UserDefinedFunction(_)
653 | ExprImpl::Parameter(_)
654 | ExprImpl::Now(_)
655 | ExprImpl::SecretRef(_) => self.has_others = true,
656 ExprImpl::Literal(_inner) => {}
657 ExprImpl::FunctionCall(inner) => {
658 if !self.is_short_circuit(inner) {
659 self.visit_function_call(inner)
663 }
664 }
665 ExprImpl::FunctionCallWithLambda(inner) => {
666 self.visit_function_call_with_lambda(inner)
667 }
668 }
669 }
670 }
671
672 impl HasOthers {
673 fn is_short_circuit(&self, func_call: &FunctionCall) -> bool {
674 fn eval_first(e: &ExprImpl, expect: bool) -> bool {
676 if let ExprImpl::Literal(l) = e {
677 *l.get_data() == Some(ScalarImpl::Bool(expect))
678 } else {
679 false
680 }
681 }
682
683 match func_call.func_type {
684 ExprType::Or => eval_first(&func_call.inputs()[0], true),
685 ExprType::And => eval_first(&func_call.inputs()[0], false),
686 _ => false,
687 }
688 }
689 }
690
691 let mut visitor = HasOthers { has_others: false };
692 visitor.visit_expr(self);
693 !visitor.has_others
694 }
695 }
696
697 pub fn is_const(&self) -> bool {
701 self.only_literal_and_func() && self.is_pure()
702 }
703
704 pub fn as_eq_cond(&self) -> Option<(InputRef, InputRef)> {
707 if let ExprImpl::FunctionCall(function_call) = self
708 && function_call.func_type() == ExprType::Equal
709 && let (_, ExprImpl::InputRef(x), ExprImpl::InputRef(y)) =
710 function_call.clone().decompose_as_binary()
711 {
712 if x.index() < y.index() {
713 Some((*x, *y))
714 } else {
715 Some((*y, *x))
716 }
717 } else {
718 None
719 }
720 }
721
722 pub fn as_is_not_distinct_from_cond(&self) -> Option<(InputRef, InputRef)> {
723 if let ExprImpl::FunctionCall(function_call) = self
724 && function_call.func_type() == ExprType::IsNotDistinctFrom
725 && let (_, ExprImpl::InputRef(x), ExprImpl::InputRef(y)) =
726 function_call.clone().decompose_as_binary()
727 {
728 if x.index() < y.index() {
729 Some((*x, *y))
730 } else {
731 Some((*y, *x))
732 }
733 } else {
734 None
735 }
736 }
737
738 pub fn reverse_comparison(comparison: ExprType) -> ExprType {
739 match comparison {
740 ExprType::LessThan => ExprType::GreaterThan,
741 ExprType::LessThanOrEqual => ExprType::GreaterThanOrEqual,
742 ExprType::GreaterThan => ExprType::LessThan,
743 ExprType::GreaterThanOrEqual => ExprType::LessThanOrEqual,
744 ExprType::Equal | ExprType::IsNotDistinctFrom => comparison,
745 _ => unreachable!(),
746 }
747 }
748
749 pub fn as_comparison_cond(&self) -> Option<(InputRef, ExprType, InputRef)> {
750 if let ExprImpl::FunctionCall(function_call) = self {
751 match function_call.func_type() {
752 ty @ (ExprType::LessThan
753 | ExprType::LessThanOrEqual
754 | ExprType::GreaterThan
755 | ExprType::GreaterThanOrEqual) => {
756 let (_, op1, op2) = function_call.clone().decompose_as_binary();
757 if let (ExprImpl::InputRef(x), ExprImpl::InputRef(y)) = (op1, op2) {
758 if x.index < y.index {
759 Some((*x, ty, *y))
760 } else {
761 Some((*y, Self::reverse_comparison(ty), *x))
762 }
763 } else {
764 None
765 }
766 }
767 _ => None,
768 }
769 } else {
770 None
771 }
772 }
773
774 pub fn as_now_comparison_cond(&self) -> Option<(ExprImpl, ExprType, ExprImpl)> {
780 if let ExprImpl::FunctionCall(function_call) = self {
781 match function_call.func_type() {
782 ty @ (ExprType::Equal
783 | ExprType::LessThan
784 | ExprType::LessThanOrEqual
785 | ExprType::GreaterThan
786 | ExprType::GreaterThanOrEqual) => {
787 let (_, op1, op2) = function_call.clone().decompose_as_binary();
788 if !op1.has_now()
789 && op1.has_input_ref()
790 && op2.has_now()
791 && !op2.has_input_ref()
792 {
793 Some((op1, ty, op2))
794 } else if op1.has_now()
795 && !op1.has_input_ref()
796 && !op2.has_now()
797 && op2.has_input_ref()
798 {
799 Some((op2, Self::reverse_comparison(ty), op1))
800 } else {
801 None
802 }
803 }
804 _ => None,
805 }
806 } else {
807 None
808 }
809 }
810
811 #[allow(dead_code)]
816 fn as_input_offset(&self) -> Option<(usize, Option<(ExprType, ExprImpl)>)> {
817 match self {
818 ExprImpl::InputRef(input_ref) => Some((input_ref.index(), None)),
819 ExprImpl::FunctionCall(function_call) => {
820 let expr_type = function_call.func_type();
821 match expr_type {
822 ExprType::Add | ExprType::Subtract => {
823 let (_, lhs, rhs) = function_call.clone().decompose_as_binary();
824 if let ExprImpl::InputRef(input_ref) = &lhs
825 && rhs.is_const()
826 {
827 if rhs.return_type() == DataType::Interval
830 && rhs.as_literal().is_none_or(|literal| {
831 literal.get_data().as_ref().is_some_and(|scalar| {
832 let interval = scalar.as_interval();
833 interval.months() != 0 || interval.days() != 0
834 })
835 })
836 {
837 None
838 } else {
839 Some((input_ref.index(), Some((expr_type, rhs))))
840 }
841 } else {
842 None
843 }
844 }
845 _ => None,
846 }
847 }
848 _ => None,
849 }
850 }
851
852 pub fn as_eq_const(&self) -> Option<(InputRef, ExprImpl)> {
853 if let ExprImpl::FunctionCall(function_call) = self
854 && function_call.func_type() == ExprType::Equal
855 {
856 match function_call.clone().decompose_as_binary() {
857 (_, ExprImpl::InputRef(x), y) if y.is_const() => Some((*x, y)),
858 (_, x, ExprImpl::InputRef(y)) if x.is_const() => Some((*y, x)),
859 _ => None,
860 }
861 } else {
862 None
863 }
864 }
865
866 pub fn as_eq_correlated_input_ref(&self) -> Option<(InputRef, CorrelatedInputRef)> {
867 if let ExprImpl::FunctionCall(function_call) = self
868 && function_call.func_type() == ExprType::Equal
869 {
870 match function_call.clone().decompose_as_binary() {
871 (_, ExprImpl::InputRef(x), ExprImpl::CorrelatedInputRef(y)) => Some((*x, *y)),
872 (_, ExprImpl::CorrelatedInputRef(x), ExprImpl::InputRef(y)) => Some((*y, *x)),
873 _ => None,
874 }
875 } else {
876 None
877 }
878 }
879
880 pub fn as_is_null(&self) -> Option<InputRef> {
881 if let ExprImpl::FunctionCall(function_call) = self
882 && function_call.func_type() == ExprType::IsNull
883 {
884 match function_call.clone().decompose_as_unary() {
885 (_, ExprImpl::InputRef(x)) => Some(*x),
886 _ => None,
887 }
888 } else {
889 None
890 }
891 }
892
893 pub fn as_comparison_const(&self) -> Option<(InputRef, ExprType, ExprImpl)> {
894 fn reverse_comparison(comparison: ExprType) -> ExprType {
895 match comparison {
896 ExprType::LessThan => ExprType::GreaterThan,
897 ExprType::LessThanOrEqual => ExprType::GreaterThanOrEqual,
898 ExprType::GreaterThan => ExprType::LessThan,
899 ExprType::GreaterThanOrEqual => ExprType::LessThanOrEqual,
900 _ => unreachable!(),
901 }
902 }
903
904 if let ExprImpl::FunctionCall(function_call) = self {
905 match function_call.func_type() {
906 ty @ (ExprType::LessThan
907 | ExprType::LessThanOrEqual
908 | ExprType::GreaterThan
909 | ExprType::GreaterThanOrEqual) => {
910 let (_, op1, op2) = function_call.clone().decompose_as_binary();
911 match (op1, op2) {
912 (ExprImpl::InputRef(x), y) if y.is_const() => Some((*x, ty, y)),
913 (x, ExprImpl::InputRef(y)) if x.is_const() => {
914 Some((*y, reverse_comparison(ty), x))
915 }
916 _ => None,
917 }
918 }
919 _ => None,
920 }
921 } else {
922 None
923 }
924 }
925
926 pub fn as_in_const_list(&self) -> Option<(InputRef, Vec<ExprImpl>)> {
927 if let ExprImpl::FunctionCall(function_call) = self
928 && function_call.func_type() == ExprType::In
929 {
930 let mut inputs = function_call.inputs().iter().cloned();
931 let input_ref = match inputs.next().unwrap() {
932 ExprImpl::InputRef(i) => *i,
933 _ => return None,
934 };
935 let list: Vec<_> = inputs
936 .inspect(|expr| {
937 assert!(expr.is_const());
939 })
940 .collect();
941
942 Some((input_ref, list))
943 } else {
944 None
945 }
946 }
947
948 pub fn as_or_disjunctions(&self) -> Option<Vec<ExprImpl>> {
949 if let ExprImpl::FunctionCall(function_call) = self
950 && function_call.func_type() == ExprType::Or
951 {
952 Some(to_disjunctions(self.clone()))
953 } else {
954 None
955 }
956 }
957
958 pub fn to_project_set_select_item_proto(&self) -> ProjectSetSelectItem {
959 use risingwave_pb::expr::project_set_select_item::SelectItem::*;
960
961 ProjectSetSelectItem {
962 select_item: Some(match self {
963 ExprImpl::TableFunction(tf) => TableFunction(tf.to_protobuf()),
964 expr => Expr(expr.to_expr_proto()),
965 }),
966 }
967 }
968
969 pub fn to_project_set_select_item_proto_checked_pure(
972 &self,
973 retract: bool,
974 ) -> crate::error::Result<ProjectSetSelectItem> {
975 use risingwave_pb::expr::project_set_select_item::SelectItem::*;
976
977 Ok(ProjectSetSelectItem {
978 select_item: Some(match self {
979 ExprImpl::TableFunction(tf) => TableFunction(tf.to_protobuf_checked_pure(retract)?),
980 expr => Expr(expr.to_expr_proto_checked_pure(retract, "SELECT list")?),
981 }),
982 })
983 }
984
985 pub fn from_expr_proto(proto: &ExprNode) -> RwResult<Self> {
986 let rex_node = proto.get_rex_node()?;
987 let ret_type = proto.get_return_type()?.into();
988
989 Ok(match rex_node {
990 RexNode::InputRef(column_index) => Self::InputRef(Box::new(InputRef::from_expr_proto(
991 *column_index as _,
992 ret_type,
993 )?)),
994 RexNode::Constant(_) => Self::Literal(Box::new(Literal::from_expr_proto(proto)?)),
995 RexNode::Udf(udf) => Self::UserDefinedFunction(Box::new(
996 UserDefinedFunction::from_expr_proto(udf, ret_type)?,
997 )),
998 RexNode::FuncCall(function_call) => {
999 Self::FunctionCall(Box::new(FunctionCall::from_expr_proto(
1000 function_call,
1001 proto.get_function_type()?, ret_type,
1003 )?))
1004 }
1005 RexNode::Now(_) => Self::Now(Box::new(Now {})),
1006 RexNode::SecretRef(sr) => Self::SecretRef(Box::new(SecretRef {
1007 secret_id: sr.secret_id.into(),
1008 ref_as: risingwave_pb::secret::secret_ref::RefAsType::try_from(sr.ref_as)
1009 .unwrap_or(risingwave_pb::secret::secret_ref::RefAsType::Text),
1010 secret_name: format!("<secret:{}>", sr.secret_id),
1011 })),
1012 })
1013 }
1014}
1015
1016impl From<Condition> for ExprImpl {
1017 fn from(c: Condition) -> Self {
1018 ExprImpl::and(c.conjunctions)
1019 }
1020}
1021
1022impl std::fmt::Debug for ExprImpl {
1026 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1027 if f.alternate() {
1028 return match self {
1029 Self::InputRef(arg0) => f.debug_tuple("InputRef").field(arg0).finish(),
1030 Self::Literal(arg0) => f.debug_tuple("Literal").field(arg0).finish(),
1031 Self::FunctionCall(arg0) => f.debug_tuple("FunctionCall").field(arg0).finish(),
1032 Self::FunctionCallWithLambda(arg0) => {
1033 f.debug_tuple("FunctionCallWithLambda").field(arg0).finish()
1034 }
1035 Self::AggCall(arg0) => f.debug_tuple("AggCall").field(arg0).finish(),
1036 Self::Subquery(arg0) => f.debug_tuple("Subquery").field(arg0).finish(),
1037 Self::CorrelatedInputRef(arg0) => {
1038 f.debug_tuple("CorrelatedInputRef").field(arg0).finish()
1039 }
1040 Self::TableFunction(arg0) => f.debug_tuple("TableFunction").field(arg0).finish(),
1041 Self::WindowFunction(arg0) => f.debug_tuple("WindowFunction").field(arg0).finish(),
1042 Self::UserDefinedFunction(arg0) => {
1043 f.debug_tuple("UserDefinedFunction").field(arg0).finish()
1044 }
1045 Self::Parameter(arg0) => f.debug_tuple("Parameter").field(arg0).finish(),
1046 Self::Now(_) => f.debug_tuple("Now").finish(),
1047 Self::SecretRef(arg0) => f.debug_tuple("SecretRef").field(arg0).finish(),
1048 };
1049 }
1050 match self {
1051 Self::InputRef(x) => write!(f, "{:?}", x),
1052 Self::Literal(x) => write!(f, "{:?}", x),
1053 Self::FunctionCall(x) => write!(f, "{:?}", x),
1054 Self::FunctionCallWithLambda(x) => write!(f, "{:?}", x),
1055 Self::AggCall(x) => write!(f, "{:?}", x),
1056 Self::Subquery(x) => write!(f, "{:?}", x),
1057 Self::CorrelatedInputRef(x) => write!(f, "{:?}", x),
1058 Self::TableFunction(x) => write!(f, "{:?}", x),
1059 Self::WindowFunction(x) => write!(f, "{:?}", x),
1060 Self::UserDefinedFunction(x) => write!(f, "{:?}", x),
1061 Self::Parameter(x) => write!(f, "{:?}", x),
1062 Self::Now(x) => write!(f, "{:?}", x),
1063 Self::SecretRef(x) => write!(f, "Secret({})", x.secret_name),
1064 }
1065 }
1066}
1067
1068pub struct ExprDisplay<'a> {
1069 pub expr: &'a ExprImpl,
1070 pub input_schema: &'a Schema,
1071}
1072
1073impl std::fmt::Debug for ExprDisplay<'_> {
1074 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1075 let that = self.expr;
1076 match that {
1077 ExprImpl::InputRef(x) => write!(
1078 f,
1079 "{:?}",
1080 InputRefDisplay {
1081 input_ref: x,
1082 input_schema: self.input_schema
1083 }
1084 ),
1085 ExprImpl::Literal(x) => write!(f, "{:?}", x),
1086 ExprImpl::FunctionCall(x) => write!(
1087 f,
1088 "{:?}",
1089 FunctionCallDisplay {
1090 function_call: x,
1091 input_schema: self.input_schema
1092 }
1093 ),
1094 ExprImpl::FunctionCallWithLambda(x) => write!(
1095 f,
1096 "{:?}",
1097 FunctionCallDisplay {
1098 function_call: &x.to_full_function_call(),
1099 input_schema: self.input_schema
1100 }
1101 ),
1102 ExprImpl::AggCall(x) => write!(f, "{:?}", x),
1103 ExprImpl::Subquery(x) => write!(f, "{:?}", x),
1104 ExprImpl::CorrelatedInputRef(x) => write!(f, "{:?}", x),
1105 ExprImpl::TableFunction(x) => {
1106 write!(f, "{:?}", x)
1108 }
1109 ExprImpl::WindowFunction(x) => {
1110 write!(f, "{:?}", x)
1112 }
1113 ExprImpl::UserDefinedFunction(x) => {
1114 write!(
1115 f,
1116 "{:?}",
1117 UserDefinedFunctionDisplay {
1118 func_call: x,
1119 input_schema: self.input_schema
1120 }
1121 )
1122 }
1123 ExprImpl::Parameter(x) => write!(f, "{:?}", x),
1124 ExprImpl::Now(x) => write!(f, "{:?}", x),
1125 ExprImpl::SecretRef(x) => write!(f, "Secret({})", x.secret_name),
1126 }
1127 }
1128}
1129
1130impl std::fmt::Display for ExprDisplay<'_> {
1131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1132 (self as &dyn std::fmt::Debug).fmt(f)
1133 }
1134}
1135
1136#[cfg(test)]
1137macro_rules! assert_eq_input_ref {
1139 ($e:expr, $index:expr) => {
1140 match $e {
1141 ExprImpl::InputRef(i) => assert_eq!(i.index(), $index),
1142 _ => assert!(false, "Expected input ref, found {:?}", $e),
1143 }
1144 };
1145}
1146
1147#[cfg(test)]
1148pub(crate) use assert_eq_input_ref;
1149use risingwave_common::bail;
1150use risingwave_common::catalog::Schema;
1151use risingwave_common::row::OwnedRow;
1152
1153use crate::utils::Condition;
1154
1155#[cfg(test)]
1156mod tests {
1157 use risingwave_pb::secret::secret_ref::RefAsType;
1158
1159 use super::*;
1160
1161 #[test]
1162 fn test_expr_debug_alternate() {
1163 let mut e = InputRef::new(1, DataType::Boolean).into();
1164 e = FunctionCall::new(ExprType::Not, vec![e]).unwrap().into();
1165 let s = format!("{:#?}", e);
1166 assert!(s.contains("return_type: Boolean"))
1167 }
1168
1169 #[test]
1170 fn test_secret_ref_display_contains_name() {
1171 let expr = ExprImpl::SecretRef(Box::new(SecretRef {
1172 secret_id: 42.into(),
1173 ref_as: RefAsType::Text,
1174 secret_name: "test_secret".to_owned(),
1175 }));
1176
1177 assert_eq!(format!("{expr:?}"), "Secret(test_secret)");
1178 assert_eq!(
1179 format!(
1180 "{:?}",
1181 ExprDisplay {
1182 expr: &expr,
1183 input_schema: Schema::empty(),
1184 }
1185 ),
1186 "Secret(test_secret)"
1187 );
1188 assert!(format!("{expr:#?}").contains("test_secret"));
1189 }
1190}