// risingwave_expr/expr/wrapper/non_strict.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::LazyLock;
16
17use async_trait::async_trait;
18use auto_impl::auto_impl;
19use risingwave_common::array::{ArrayRef, DataChunk};
20use risingwave_common::log::LogSuppresser;
21use risingwave_common::row::OwnedRow;
22use risingwave_common::types::{DataType, Datum};
23use thiserror_ext::AsReport;
24
25use crate::ExprError;
26use crate::error::Result;
27use crate::expr::{Expression, ValueImpl};
28
29/// Report an error during evaluation.
30#[auto_impl(&, Arc)]
31pub trait EvalErrorReport: Clone + Send + Sync {
32    /// Perform the error reporting.
33    ///
34    /// Called when an error occurs during row-level evaluation of a non-strict expression,
35    /// that is, wrapped by [`NonStrict`].
36    fn report(&self, error: ExprError);
37}
38
39/// A dummy implementation that panics when called.
40///
41/// Used as the type parameter for the expression builder when non-strict evaluation is not
42/// required.
43impl EvalErrorReport for ! {
44    fn report(&self, _error: ExprError) {
45        unreachable!()
46    }
47}
48
49/// Log the error to report an error during evaluation.
50#[derive(Clone)]
51pub struct LogReport;
52
53impl EvalErrorReport for LogReport {
54    fn report(&self, error: ExprError) {
55        static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
56        if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
57            tracing::error!(error=%error.as_report(), suppressed_count, "failed to evaluate expression");
58        }
59    }
60}
61
/// Wraps an [`Expression`] so that it is evaluated non-strictly:
/// - when chunk-level evaluation fails, the failed rows come back as NULL instead of an error;
/// - every error raised during evaluation is passed to the [`EvalErrorReport`].
pub(crate) struct NonStrict<E, R> {
    /// The expression being wrapped.
    inner: E,
    /// Destination for evaluation errors.
    report: R,
}

impl<E: std::fmt::Debug, R> std::fmt::Debug for NonStrict<E, R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `R` is not required to implement `Debug`, so only its type name is printed.
        let report_type = std::any::type_name::<R>();
        f.debug_struct("NonStrict")
            .field("inner", &self.inner)
            .field("report", &report_type)
            .finish()
    }
}
81
82impl<E, R> NonStrict<E, R>
83where
84    E: Expression,
85    R: EvalErrorReport,
86{
87    pub fn new(inner: E, report: R) -> Self {
88        Self { inner, report }
89    }
90}
91
92// TODO: avoid the overhead of extra boxing.
93#[async_trait]
94impl<E, R> Expression for NonStrict<E, R>
95where
96    E: Expression,
97    R: EvalErrorReport,
98{
99    fn return_type(&self) -> DataType {
100        self.inner.return_type()
101    }
102
103    async fn eval(&self, input: &DataChunk) -> Result<ArrayRef> {
104        Ok(match self.inner.eval(input).await {
105            Ok(array) => array,
106            Err(ExprError::Multiple(array, errors)) => {
107                for error in errors {
108                    self.report.report(error);
109                }
110                array
111            }
112            Err(e) => {
113                self.report.report(e);
114                let mut builder = self.return_type().create_array_builder(input.capacity());
115                builder.append_n_null(input.capacity());
116                builder.finish().into()
117            }
118        })
119    }
120
121    async fn eval_v2(&self, input: &DataChunk) -> Result<ValueImpl> {
122        Ok(match self.inner.eval_v2(input).await {
123            Ok(array) => array,
124            Err(ExprError::Multiple(array, errors)) => {
125                for error in errors {
126                    self.report.report(error);
127                }
128                array.into()
129            }
130            Err(e) => {
131                self.report.report(e);
132                ValueImpl::Scalar {
133                    value: None,
134                    capacity: input.capacity(),
135                }
136            }
137        })
138    }
139
140    /// Evaluate expression on a single row, report error and return NULL if failed.
141    async fn eval_row(&self, input: &OwnedRow) -> Result<Datum> {
142        Ok(match self.inner.eval_row(input).await {
143            Ok(datum) => datum,
144            Err(error) => {
145                self.report.report(error);
146                None // NULL
147            }
148        })
149    }
150
151    fn eval_const(&self) -> Result<Datum> {
152        self.inner.eval_const() // do not handle error
153    }
154
155    fn input_ref_index(&self) -> Option<usize> {
156        self.inner.input_ref_index()
157    }
158}