risingwave_stream/executor/project/
project_scalar.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use multimap::MultiMap;
16use risingwave_common::row::RowExt;
17use risingwave_common::types::ToOwnedDatum;
18use risingwave_common::util::iter_util::ZipEqFast;
19use risingwave_expr::expr::NonStrictExpression;
20
21use crate::executor::prelude::*;
22
/// `ProjectExecutor` projects its input stream through a list of expressions.
///
/// Every input chunk is evaluated against the expressions and a new chunk made of the
/// resulting columns is emitted downstream, keeping the ops of the input chunk. Upstream
/// watermarks are re-derived through the configured expressions, and expressions declared
/// non-decreasing produce watermarks of their own on barriers.
pub struct ProjectExecutor {
    /// The upstream executor whose messages are projected.
    input: Executor,
    /// Projection state and logic; consumed when the executor starts running.
    inner: Inner,
}
30
/// State owned by the running projection stream.
struct Inner {
    /// Actor context. Not read anywhere in this file, hence the leading underscore.
    _ctx: ActorContextRef,

    /// Expressions of the current projection. Output column `i` is produced by `exprs[i]`.
    exprs: Vec<NonStrictExpression>,
    /// All the watermark derivations, keyed as (`input_column_index`, `output_column_index`).
    /// A watermark arriving on the input column is transformed with the projection expression
    /// at each mapped output index.
    watermark_derivations: MultiMap<usize, usize>,
    /// Indices of nondecreasing expressions in the expression list.
    nondecreasing_expr_indices: Vec<usize>,
    /// Last seen values of nondecreasing expressions, one slot per entry of
    /// `nondecreasing_expr_indices`. Slots are filled from the first visible row of each output
    /// chunk and drained into watermarks when a barrier arrives while the stream is not paused.
    last_nondec_expr_values: Vec<Option<ScalarImpl>>,
    /// Whether the stream is paused. Initialized from the first barrier and toggled by
    /// `Pause`/`Resume` mutations; while set, barriers do not emit the buffered watermarks.
    is_paused: bool,

    /// Whether there are likely no-op updates in the output chunks, so that eliminating them with
    /// `StreamChunk::eliminate_adjacent_noop_update` could be beneficial.
    noop_update_hint: bool,
}
50
51impl ProjectExecutor {
52    pub fn new(
53        ctx: ActorContextRef,
54        input: Executor,
55        exprs: Vec<NonStrictExpression>,
56        watermark_derivations: MultiMap<usize, usize>,
57        nondecreasing_expr_indices: Vec<usize>,
58        noop_update_hint: bool,
59    ) -> Self {
60        let n_nondecreasing_exprs = nondecreasing_expr_indices.len();
61        Self {
62            input,
63            inner: Inner {
64                _ctx: ctx,
65                exprs,
66                watermark_derivations,
67                nondecreasing_expr_indices,
68                last_nondec_expr_values: vec![None; n_nondecreasing_exprs],
69                is_paused: false,
70                noop_update_hint,
71            },
72        }
73    }
74}
75
76impl Debug for ProjectExecutor {
77    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
78        f.debug_struct("ProjectExecutor")
79            .field("exprs", &self.inner.exprs)
80            .finish()
81    }
82}
83
84impl Execute for ProjectExecutor {
85    fn execute(self: Box<Self>) -> BoxedMessageStream {
86        self.inner.execute(self.input).boxed()
87    }
88}
89
90pub async fn apply_project_exprs(
91    exprs: &[NonStrictExpression],
92    chunk: StreamChunk,
93) -> StreamExecutorResult<StreamChunk> {
94    let (data_chunk, ops) = chunk.into_parts();
95    let mut projected_columns = Vec::new();
96
97    for expr in exprs {
98        let evaluated_expr = expr.eval_infallible(&data_chunk).await;
99        projected_columns.push(evaluated_expr);
100    }
101    let (_, vis) = data_chunk.into_parts();
102
103    let new_chunk = StreamChunk::with_visibility(ops, projected_columns, vis);
104
105    Ok(new_chunk)
106}
107
108impl Inner {
109    async fn map_filter_chunk(
110        &self,
111        chunk: StreamChunk,
112    ) -> StreamExecutorResult<Option<StreamChunk>> {
113        let mut new_chunk = apply_project_exprs(&self.exprs, chunk).await?;
114        if self.noop_update_hint {
115            new_chunk = new_chunk.eliminate_adjacent_noop_update();
116        }
117        Ok(Some(new_chunk))
118    }
119
120    async fn handle_watermark(&self, watermark: Watermark) -> StreamExecutorResult<Vec<Watermark>> {
121        let out_col_indices = match self.watermark_derivations.get_vec(&watermark.col_idx) {
122            Some(v) => v,
123            None => return Ok(vec![]),
124        };
125        let mut ret = vec![];
126        for out_col_idx in out_col_indices {
127            let out_col_idx = *out_col_idx;
128            let derived_watermark = watermark
129                .clone()
130                .transform_with_expr(&self.exprs[out_col_idx], out_col_idx)
131                .await;
132            if let Some(derived_watermark) = derived_watermark {
133                ret.push(derived_watermark);
134            } else {
135                warn!(
136                    "a NULL watermark is derived with the expression {}!",
137                    out_col_idx
138                );
139            }
140        }
141        Ok(ret)
142    }
143
144    #[try_stream(ok = Message, error = StreamExecutorError)]
145    async fn execute(mut self, input: Executor) {
146        let mut input = input.execute();
147        let first_barrier = expect_first_barrier(&mut input).await?;
148        self.is_paused = first_barrier.is_pause_on_startup();
149        yield Message::Barrier(first_barrier);
150
151        #[for_await]
152        for msg in input {
153            let msg = msg?;
154            match msg {
155                Message::Watermark(w) => {
156                    let watermarks = self.handle_watermark(w).await?;
157                    for watermark in watermarks {
158                        yield Message::Watermark(watermark)
159                    }
160                }
161                Message::Chunk(chunk) => match self.map_filter_chunk(chunk).await? {
162                    Some(new_chunk) => {
163                        if !self.nondecreasing_expr_indices.is_empty()
164                            && let Some((_, first_visible_row)) = new_chunk.rows().next()
165                        {
166                            // it's ok to use the first row here, just one chunk delay
167                            first_visible_row
168                                .project(&self.nondecreasing_expr_indices)
169                                .iter()
170                                .enumerate()
171                                .for_each(|(idx, value)| {
172                                    self.last_nondec_expr_values[idx] =
173                                        Some(value.to_owned_datum().expect(
174                                            "non-decreasing expression should never be NULL",
175                                        ));
176                                });
177                        }
178                        yield Message::Chunk(new_chunk)
179                    }
180                    None => continue,
181                },
182                Message::Barrier(barrier) => {
183                    if !self.is_paused {
184                        for (&expr_idx, value) in self
185                            .nondecreasing_expr_indices
186                            .iter()
187                            .zip_eq_fast(&mut self.last_nondec_expr_values)
188                        {
189                            if let Some(value) = std::mem::take(value) {
190                                yield Message::Watermark(Watermark::new(
191                                    expr_idx,
192                                    self.exprs[expr_idx].return_type(),
193                                    value,
194                                ))
195                            }
196                        }
197                    }
198
199                    if let Some(mutation) = barrier.mutation.as_deref() {
200                        match mutation {
201                            Mutation::Pause => {
202                                self.is_paused = true;
203                            }
204                            Mutation::Resume => {
205                                self.is_paused = false;
206                            }
207                            _ => (),
208                        }
209                    }
210
211                    yield Message::Barrier(barrier);
212                }
213            }
214        }
215    }
216}
217
/// Unit tests driving `ProjectExecutor` through a `MockSource` channel.
#[cfg(test)]
mod tests {
    use std::sync::atomic::{self, AtomicI64};

    use risingwave_common::array::DataChunk;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::DefaultOrd;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_expr::expr::{self, Expression, ValueImpl};

    use super::*;
    use crate::executor::test_utils::expr::build_from_pretty;
    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};

    /// Projects two `int8` columns through a single `$0 + $1` expression and checks that the
    /// output chunks carry the sums with the ops of the input rows.
    #[tokio::test]
    async fn test_projection() {
        let chunk1 = StreamChunk::from_pretty(
            " I I
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " I I
            + 7 8
            - 3 6",
        );
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema, pk_indices);

        let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)");

        // No watermark derivations, no non-decreasing expressions, no no-op update elimination.
        let proj = ProjectExecutor::new(
            ActorContext::for_test(123),
            source,
            vec![test_expr],
            MultiMap::new(),
            vec![],
            false,
        );
        let mut proj = proj.boxed().execute();

        // The executor forwards the first barrier before anything else.
        tx.push_barrier(test_epoch(1), false);
        let barrier = proj.next().await.unwrap().unwrap();
        barrier.as_barrier().unwrap();

        tx.push_chunk(chunk1);
        tx.push_chunk(chunk2);

        let msg = proj.next().await.unwrap().unwrap();
        assert_eq!(
            *msg.as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 5
                + 7
                + 9"
            )
        );

        let msg = proj.next().await.unwrap().unwrap();
        assert_eq!(
            *msg.as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I
                + 15
                -  9"
            )
        );

        // The final barrier is expected to come out as a stop barrier.
        tx.push_barrier(test_epoch(2), true);
        assert!(proj.next().await.unwrap().unwrap().is_stop());
    }

    /// Global counter backing `DummyNondecreasingExpr`; it is incremented on every evaluation.
    static DUMMY_COUNTER: AtomicI64 = AtomicI64::new(0);

    /// Test expression that ignores its input and returns the current value of
    /// `DUMMY_COUNTER`, so successive evaluations never return a smaller value.
    #[derive(Debug)]
    struct DummyNondecreasingExpr;

    #[async_trait::async_trait]
    impl Expression for DummyNondecreasingExpr {
        fn return_type(&self) -> DataType {
            DataType::Int64
        }

        // Returns one scalar for the whole chunk, taken from the counter before incrementing it.
        async fn eval_v2(&self, input: &DataChunk) -> expr::Result<ValueImpl> {
            let value = DUMMY_COUNTER.fetch_add(1, atomic::Ordering::SeqCst);
            Ok(ValueImpl::Scalar {
                value: Some(value.into()),
                capacity: input.capacity(),
            })
        }

        async fn eval_row(&self, _input: &OwnedRow) -> expr::Result<Datum> {
            let value = DUMMY_COUNTER.fetch_add(1, atomic::Ordering::SeqCst);
            Ok(Some(value.into()))
        }
    }

    /// Checks both watermark paths: watermarks derived from an upstream watermark through the
    /// projection expressions, and watermarks generated on barriers from a non-decreasing
    /// expression.
    #[tokio::test]
    async fn test_watermark_projection() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema, PkIndices::new());

        let a_expr = build_from_pretty("(add:int8 $0:int8 1:int8)");
        let b_expr = build_from_pretty("(subtract:int8 $0:int8 1:int8)");
        let c_expr = NonStrictExpression::for_test(DummyNondecreasingExpr);

        // Input column 0 derives watermarks for output columns 0 and 1; output column 2 is the
        // non-decreasing expression.
        let proj = ProjectExecutor::new(
            ActorContext::for_test(123),
            source,
            vec![a_expr, b_expr, c_expr],
            MultiMap::from_iter(vec![(0, 0), (0, 1)].into_iter()),
            vec![2],
            false,
        );
        let mut proj = proj.boxed().execute();

        tx.push_barrier(test_epoch(1), false);
        tx.push_int64_watermark(0, 100);

        proj.expect_barrier().await;
        let w1 = proj.expect_watermark().await;
        let w2 = proj.expect_watermark().await;
        // Order the two derived watermarks by output column before comparing them.
        let (w1, w2) = if w1.col_idx < w2.col_idx {
            (w1, w2)
        } else {
            (w2, w1)
        };

        // 100 + 1 on output column 0.
        assert_eq!(
            w1,
            Watermark {
                col_idx: 0,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(101)
            }
        );
        // 100 - 1 on output column 1.
        assert_eq!(
            w2,
            Watermark {
                col_idx: 1,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(99)
            }
        );

        // just push some random chunks
        tx.push_chunk(StreamChunk::from_pretty(
            "   I I
            + 120 4
            + 146 5
            + 133 6",
        ));
        proj.expect_chunk().await;
        tx.push_chunk(StreamChunk::from_pretty(
            "   I I
            + 213 8
            - 133 6",
        ));
        proj.expect_chunk().await;

        // The value buffered from the chunks above is emitted as a watermark ahead of the barrier.
        tx.push_barrier(test_epoch(2), false);
        let w3 = proj.expect_watermark().await;
        proj.expect_barrier().await;

        tx.push_chunk(StreamChunk::from_pretty(
            "   I I
            + 100 3
            + 104 5
            + 187 3",
        ));
        proj.expect_chunk().await;

        tx.push_barrier(test_epoch(3), false);
        let w4 = proj.expect_watermark().await;
        proj.expect_barrier().await;

        // Both generated watermarks are on the same column and do not go backwards.
        assert_eq!(w3.col_idx, w4.col_idx);
        assert!(w3.val.default_cmp(&w4.val).is_le());

        // Input column 1 has no derivation and no chunk arrived since the last barrier, so the
        // next message is the stop barrier itself.
        tx.push_int64_watermark(1, 100);
        tx.push_barrier(test_epoch(4), true);

        assert!(proj.next().await.unwrap().unwrap().is_stop());
    }
}