risingwave_expr/expr/wrapper/
checked.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use risingwave_common::array::{ArrayRef, DataChunk};
17use risingwave_common::row::OwnedRow;
18use risingwave_common::types::{DataType, Datum};
19
20use crate::error::Result;
21use crate::expr::{Expression, ValueImpl};
22
/// A wrapper of [`Expression`] that does extra checks after evaluation.
///
/// The wrapped expression is stored in the single public tuple field. In the
/// [`Expression`] implementation for this type, the chunk-level methods
/// (`eval` and `eval_v2`) assert that the length of the result equals
/// `input.capacity()`; every other method is forwarded to the wrapped
/// expression unchanged.
#[derive(Debug)]
pub(crate) struct Checked<E>(pub E);
26
27// TODO: avoid the overhead of extra boxing.
28#[async_trait]
29impl<E: Expression> Expression for Checked<E> {
30    fn return_type(&self) -> DataType {
31        self.0.return_type()
32    }
33
34    async fn eval(&self, input: &DataChunk) -> Result<ArrayRef> {
35        let res = self.0.eval(input).await?;
36        assert_eq!(res.len(), input.capacity());
37        Ok(res)
38    }
39
40    async fn eval_v2(&self, input: &DataChunk) -> Result<ValueImpl> {
41        let res = self.0.eval_v2(input).await?;
42        assert_eq!(res.len(), input.capacity());
43        Ok(res)
44    }
45
46    async fn eval_row(&self, input: &OwnedRow) -> Result<Datum> {
47        self.0.eval_row(input).await
48    }
49
50    fn eval_const(&self) -> Result<Datum> {
51        self.0.eval_const()
52    }
53
54    fn input_ref_index(&self) -> Option<usize> {
55        self.0.input_ref_index()
56    }
57}