risingwave_stream/executor/
filter.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::{Array, ArrayImpl, Op};
16use risingwave_common::bitmap::BitmapBuilder;
17use risingwave_common::util::iter_util::ZipEqFast;
18use risingwave_expr::expr::NonStrictExpression;
19
20use crate::executor::prelude::*;
21
22/// `FilterExecutor` filters data with the `expr`. The `expr` takes a chunk of data,
23/// and returns a boolean array on whether each item should be retained. And then,
24/// `FilterExecutor` will insert, delete or update element into next executor according
25/// to the result of the expression.
26///
27/// # Upsert mode
28///
29/// When `UPSERT` is specified, the filter executor will...
30/// - always yield `Delete` records on `Delete` (or `UpdateDelete`) input, regardless of the filter result
31/// - always yield `Delete` records on `Insert` (or `UpdateInsert`) input, if the filter result is `false`
32///
33/// This is to guarantee that the filtered-out data is correctly cleaned up, even if we
34/// don't have information about the previous data under the same primary key. Since an
35/// upsert stream is expected as the output of the filter executor in this case, we can
36/// safely yield a `Delete` record no matter if it used to exist or not.
37pub struct FilterExecutorInner<const UPSERT: bool> {
38    _ctx: ActorContextRef,
39    input: Executor,
40
41    /// Expression of the current filter, note that the filter must always have the same output for
42    /// the same input.
43    expr: NonStrictExpression,
44}
45
46pub type FilterExecutor = FilterExecutorInner<false>;
47pub type UpsertFilterExecutor = FilterExecutorInner<true>;
48
49impl<const UPSERT: bool> FilterExecutorInner<UPSERT> {
50    pub fn new(ctx: ActorContextRef, input: Executor, expr: NonStrictExpression) -> Self {
51        Self {
52            _ctx: ctx,
53            input,
54            expr,
55        }
56    }
57
58    pub(super) fn filter(
59        chunk: StreamChunk,
60        filter: Arc<ArrayImpl>,
61    ) -> StreamExecutorResult<Option<StreamChunk>> {
62        let (data_chunk, ops) = chunk.into_parts();
63
64        let (columns, vis) = data_chunk.into_parts();
65
66        let n = ops.len();
67
68        // TODO: Can we update ops and visibility inplace?
69        let mut new_ops = Vec::with_capacity(n);
70        let mut new_visibility = BitmapBuilder::with_capacity(n);
71        let mut last_res = false;
72
73        assert_eq!(vis.len(), n);
74
75        let ArrayImpl::Bool(bool_array) = &*filter else {
76            panic!("unmatched type: filter expr returns a non-null array");
77        };
78        for (&op, res) in ops.iter().zip_eq_fast(bool_array.iter()) {
79            // SAFETY: ops.len() == pred_output.len() == visibility.len()
80            let res = res.unwrap_or(false);
81
82            if UPSERT {
83                match op {
84                    Op::Insert | Op::UpdateInsert => {
85                        if res {
86                            // Always emit an `Insert`.
87                            // - if the row didn't exist before, this is a new row
88                            // - if the row existed before, this is an update
89                            new_ops.push(Op::Insert);
90                            new_visibility.append(true);
91                        } else {
92                            // Always emit a `Delete`.
93                            // - if the row didn't exist before, this is a no-op
94                            // - if the row existed before, this is a deletion
95                            new_ops.push(Op::Delete);
96                            new_visibility.append(true);
97                        }
98                    }
99                    Op::Delete | Op::UpdateDelete => {
100                        // Always emit a `Delete` no matter what the filter result is.
101                        // - if the row didn't exist before, this is a no-op
102                        // - if the row existed before, this is a deletion
103                        new_ops.push(Op::Delete);
104                        new_visibility.append(true);
105                    }
106                }
107            } else {
108                match op {
109                    Op::Insert | Op::Delete => {
110                        new_ops.push(op);
111                        new_visibility.append(res);
112                    }
113                    Op::UpdateDelete => {
114                        last_res = res;
115                    }
116                    Op::UpdateInsert => match (last_res, res) {
117                        (true, false) => {
118                            new_ops.push(Op::Delete);
119                            new_ops.push(Op::UpdateInsert);
120                            new_visibility.append(true);
121                            new_visibility.append(false);
122                        }
123                        (false, true) => {
124                            new_ops.push(Op::UpdateDelete);
125                            new_ops.push(Op::Insert);
126                            new_visibility.append(false);
127                            new_visibility.append(true);
128                        }
129                        (true, true) => {
130                            new_ops.push(Op::UpdateDelete);
131                            new_ops.push(Op::UpdateInsert);
132                            new_visibility.append(true);
133                            new_visibility.append(true);
134                        }
135                        (false, false) => {
136                            new_ops.push(Op::UpdateDelete);
137                            new_ops.push(Op::UpdateInsert);
138                            new_visibility.append(false);
139                            new_visibility.append(false);
140                        }
141                    },
142                }
143            }
144        }
145
146        let new_visibility = new_visibility.finish();
147
148        Ok(if new_visibility.count_ones() > 0 {
149            let new_chunk = StreamChunk::with_visibility(new_ops, columns, new_visibility);
150            Some(new_chunk)
151        } else {
152            None
153        })
154    }
155}
156
157impl<const UPSERT: bool> Debug for FilterExecutorInner<UPSERT> {
158    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
159        f.debug_struct("FilterExecutor")
160            .field("expr", &self.expr)
161            .field("upsert", &UPSERT)
162            .finish()
163    }
164}
165
166impl<const UPSERT: bool> Execute for FilterExecutorInner<UPSERT> {
167    fn execute(self: Box<Self>) -> BoxedMessageStream {
168        self.execute_inner().boxed()
169    }
170}
171
172impl<const UPSERT: bool> FilterExecutorInner<UPSERT> {
173    #[try_stream(ok = Message, error = StreamExecutorError)]
174    async fn execute_inner(self) {
175        let input = self.input.execute();
176        #[for_await]
177        for msg in input {
178            let msg = msg?;
179            match msg {
180                Message::Watermark(w) => yield Message::Watermark(w),
181                Message::Chunk(chunk) => {
182                    let chunk = chunk.compact();
183
184                    let pred_output = self.expr.eval_infallible(chunk.data_chunk()).await;
185
186                    match Self::filter(chunk, pred_output)? {
187                        Some(new_chunk) => yield Message::Chunk(new_chunk),
188                        None => continue,
189                    }
190                }
191                m => yield m,
192            }
193        }
194    }
195}
196
#[cfg(test)]
mod tests {
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::Field;

    use super::super::test_utils::MockSource;
    use super::super::test_utils::expr::build_from_pretty;
    use super::super::*;
    use super::*;

    /// Schema with two unnamed `Int64` columns, as used by the chunks in the tests below.
    fn two_int64_columns() -> Schema {
        Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        }
    }

    /// Regular mode with predicate `$0 > $1`: plain inserts/deletes are hidden when the
    /// predicate fails, and update pairs are rewritten depending on which half passes.
    #[tokio::test]
    async fn test_filter() {
        let plain_rows = StreamChunk::from_pretty(
            " I I
            + 1 4
            + 5 2
            + 6 6
            - 7 5",
        );
        let update_pairs = StreamChunk::from_pretty(
            "  I I
            U- 5 3  // true -> true
            U+ 7 5  // expect UpdateDelete, UpdateInsert
            U- 5 3  // true -> false
            U+ 3 5  // expect Delete
            U- 3 5  // false -> true
            U+ 5 3  // expect Insert
            U- 3 5  // false -> false
            U+ 4 6  // expect nothing",
        );
        let source = MockSource::with_chunks(vec![plain_rows, update_pairs])
            .into_executor(two_int64_columns(), PkIndices::new());

        let predicate = build_from_pretty("(greater_than:boolean $0:int8 $1:int8)");

        let mut stream = FilterExecutor::new(ActorContext::for_test(123), source, predicate)
            .boxed()
            .execute();

        let first = stream.next().await.unwrap().unwrap().into_chunk().unwrap();
        assert_eq!(
            first,
            StreamChunk::from_pretty(
                " I I
                + 1 4 D
                + 5 2
                + 6 6 D
                - 7 5",
            )
        );

        let second = stream.next().await.unwrap().unwrap().into_chunk().unwrap();
        assert_eq!(
            second,
            StreamChunk::from_pretty(
                "  I I
                U- 5 3
                U+ 7 5
                -  5 3
                U+ 3 5 D
                U- 3 5 D
                +  5 3
                U- 3 5 D
                U+ 4 6 D",
            )
        );

        assert!(stream.next().await.unwrap().unwrap().is_stop());
    }

    /// Upsert mode with predicate `$1 > 10`: failing inserts turn into deletes, and deletes
    /// are always forwarded, even when the predicate evaluates to `NULL`.
    #[tokio::test]
    async fn test_upsert_filter() {
        let input_chunk = StreamChunk::from_pretty(
            " I  I
            + 10 14
            + 20 5
            + 10 7
            + 20 16
            + 20 18
            - 10 .
            - 20 .
            - 30 .
            ",
        );
        let source =
            MockSource::with_chunks(vec![input_chunk]).into_executor(two_int64_columns(), vec![0]);
        let predicate = build_from_pretty("(greater_than:boolean $1:int8 10:int8)");
        let mut stream = UpsertFilterExecutor::new(ActorContext::for_test(123), source, predicate)
            .boxed()
            .execute();
        let output_chunk = stream.next().await.unwrap().unwrap().into_chunk().unwrap();
        assert_eq!(
            output_chunk,
            StreamChunk::from_pretty(
                " I  I
                + 10 14
                - 20 5
                - 10 7
                + 20 16
                + 20 18
                - 10 .
                - 20 .
                - 30 .
                ",
            )
        );
    }
}