risingwave_expr/expr/wrapper/
non_strict.rs1use std::sync::LazyLock;
16
17use async_trait::async_trait;
18use auto_impl::auto_impl;
19use risingwave_common::array::{ArrayRef, DataChunk};
20use risingwave_common::log::LogSuppresser;
21use risingwave_common::row::OwnedRow;
22use risingwave_common::types::{DataType, Datum};
23use thiserror_ext::AsReport;
24
25use crate::ExprError;
26use crate::error::Result;
27use crate::expr::{Expression, ValueImpl};
28
29#[auto_impl(&, Arc)]
31pub trait EvalErrorReport: Clone + Send + Sync {
32 fn report(&self, error: ExprError);
37}
38
39impl EvalErrorReport for ! {
44 fn report(&self, _error: ExprError) {
45 unreachable!()
46 }
47}
48
49#[derive(Clone)]
51pub struct LogReport;
52
53impl EvalErrorReport for LogReport {
54 fn report(&self, error: ExprError) {
55 static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
56 if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
57 tracing::error!(error=%error.as_report(), suppressed_count, "failed to evaluate expression");
58 }
59 }
60}
61
62pub(crate) struct NonStrict<E, R> {
66 inner: E,
67 report: R,
68}
69
70impl<E, R> std::fmt::Debug for NonStrict<E, R>
71where
72 E: std::fmt::Debug,
73{
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("NonStrict")
76 .field("inner", &self.inner)
77 .field("report", &std::any::type_name::<R>())
78 .finish()
79 }
80}
81
82impl<E, R> NonStrict<E, R>
83where
84 E: Expression,
85 R: EvalErrorReport,
86{
87 pub fn new(inner: E, report: R) -> Self {
88 Self { inner, report }
89 }
90}
91
92#[async_trait]
94impl<E, R> Expression for NonStrict<E, R>
95where
96 E: Expression,
97 R: EvalErrorReport,
98{
99 fn return_type(&self) -> DataType {
100 self.inner.return_type()
101 }
102
103 async fn eval(&self, input: &DataChunk) -> Result<ArrayRef> {
104 Ok(match self.inner.eval(input).await {
105 Ok(array) => array,
106 Err(ExprError::Multiple(array, errors)) => {
107 for error in errors {
108 self.report.report(error);
109 }
110 array
111 }
112 Err(e) => {
113 self.report.report(e);
114 let mut builder = self.return_type().create_array_builder(input.capacity());
115 builder.append_n_null(input.capacity());
116 builder.finish().into()
117 }
118 })
119 }
120
121 async fn eval_v2(&self, input: &DataChunk) -> Result<ValueImpl> {
122 Ok(match self.inner.eval_v2(input).await {
123 Ok(array) => array,
124 Err(ExprError::Multiple(array, errors)) => {
125 for error in errors {
126 self.report.report(error);
127 }
128 array.into()
129 }
130 Err(e) => {
131 self.report.report(e);
132 ValueImpl::Scalar {
133 value: None,
134 capacity: input.capacity(),
135 }
136 }
137 })
138 }
139
140 async fn eval_row(&self, input: &OwnedRow) -> Result<Datum> {
142 Ok(match self.inner.eval_row(input).await {
143 Ok(datum) => datum,
144 Err(error) => {
145 self.report.report(error);
146 None }
148 })
149 }
150
151 fn eval_const(&self) -> Result<Datum> {
152 self.inner.eval_const() }
154
155 fn input_ref_index(&self) -> Option<usize> {
156 self.inner.input_ref_index()
157 }
158}