risingwave_stream/executor/project/project_scalar.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use multimap::MultiMap;
16use risingwave_common::row::RowExt;
17use risingwave_common::types::ToOwnedDatum;
18use risingwave_common::util::iter_util::ZipEqFast;
19use risingwave_expr::expr::NonStrictExpression;
20
21use crate::executor::prelude::*;
22
23/// `ProjectExecutor` project data with the `expr`. The `expr` takes a chunk of data,
24/// and returns a new data chunk. And then, `ProjectExecutor` will insert, delete
25/// or update element into next operator according to the result of the expression.
26pub struct ProjectExecutor {
27    input: Executor,
28    inner: Inner,
29}
30
31struct Inner {
32    _ctx: ActorContextRef,
33
34    /// Expressions of the current projection.
35    exprs: Vec<NonStrictExpression>,
36    /// All the watermark derivations, (`input_column_index`, `output_column_index`). And the
37    /// derivation expression is the project's expression itself.
38    watermark_derivations: MultiMap<usize, usize>,
39    /// Indices of nondecreasing expressions in the expression list.
40    nondecreasing_expr_indices: Vec<usize>,
41    /// Last seen values of nondecreasing expressions, buffered to periodically produce watermarks.
42    last_nondec_expr_values: Vec<Option<ScalarImpl>>,
43    /// Whether the stream is paused.
44    is_paused: bool,
45
46    /// Whether there are likely no-op updates in the output chunks, so that eliminating them with
47    /// `StreamChunk::eliminate_adjacent_noop_update` could be beneficial.
48    eliminate_noop_updates: bool,
49}
50
51impl ProjectExecutor {
52    pub fn new(
53        ctx: ActorContextRef,
54        input: Executor,
55        exprs: Vec<NonStrictExpression>,
56        watermark_derivations: MultiMap<usize, usize>,
57        nondecreasing_expr_indices: Vec<usize>,
58        noop_update_hint: bool,
59    ) -> Self {
60        let n_nondecreasing_exprs = nondecreasing_expr_indices.len();
61        let eliminate_noop_updates = noop_update_hint
62            || ctx
63                .streaming_config
64                .developer
65                .aggressive_noop_update_elimination;
66        Self {
67            input,
68            inner: Inner {
69                _ctx: ctx,
70                exprs,
71                watermark_derivations,
72                nondecreasing_expr_indices,
73                last_nondec_expr_values: vec![None; n_nondecreasing_exprs],
74                is_paused: false,
75                eliminate_noop_updates,
76            },
77        }
78    }
79}
80
81impl Debug for ProjectExecutor {
82    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
83        f.debug_struct("ProjectExecutor")
84            .field("exprs", &self.inner.exprs)
85            .finish()
86    }
87}
88
89impl Execute for ProjectExecutor {
90    fn execute(self: Box<Self>) -> BoxedMessageStream {
91        self.inner.execute(self.input).boxed()
92    }
93}
94
95pub async fn apply_project_exprs(
96    exprs: &[NonStrictExpression],
97    chunk: StreamChunk,
98) -> StreamExecutorResult<StreamChunk> {
99    let (data_chunk, ops) = chunk.into_parts();
100    let mut projected_columns = Vec::new();
101
102    for expr in exprs {
103        let evaluated_expr = expr.eval_infallible(&data_chunk).await;
104        projected_columns.push(evaluated_expr);
105    }
106    let (_, vis) = data_chunk.into_parts();
107
108    let new_chunk = StreamChunk::with_visibility(ops, projected_columns, vis);
109
110    Ok(new_chunk)
111}
112
113impl Inner {
114    async fn map_filter_chunk(
115        &self,
116        chunk: StreamChunk,
117    ) -> StreamExecutorResult<Option<StreamChunk>> {
118        let mut new_chunk = apply_project_exprs(&self.exprs, chunk).await?;
119        if self.eliminate_noop_updates {
120            new_chunk = new_chunk.eliminate_adjacent_noop_update();
121        }
122        Ok(Some(new_chunk))
123    }
124
125    async fn handle_watermark(&self, watermark: Watermark) -> StreamExecutorResult<Vec<Watermark>> {
126        let out_col_indices = match self.watermark_derivations.get_vec(&watermark.col_idx) {
127            Some(v) => v,
128            None => return Ok(vec![]),
129        };
130        let mut ret = vec![];
131        for out_col_idx in out_col_indices {
132            let out_col_idx = *out_col_idx;
133            let derived_watermark = watermark
134                .clone()
135                .transform_with_expr(&self.exprs[out_col_idx], out_col_idx)
136                .await;
137            if let Some(derived_watermark) = derived_watermark {
138                ret.push(derived_watermark);
139            } else {
140                warn!(
141                    "a NULL watermark is derived with the expression {}!",
142                    out_col_idx
143                );
144            }
145        }
146        Ok(ret)
147    }
148
149    #[try_stream(ok = Message, error = StreamExecutorError)]
150    async fn execute(mut self, input: Executor) {
151        let mut input = input.execute();
152        let first_barrier = expect_first_barrier(&mut input).await?;
153        self.is_paused = first_barrier.is_pause_on_startup();
154        yield Message::Barrier(first_barrier);
155
156        #[for_await]
157        for msg in input {
158            let msg = msg?;
159            match msg {
160                Message::Watermark(w) => {
161                    let watermarks = self.handle_watermark(w).await?;
162                    for watermark in watermarks {
163                        yield Message::Watermark(watermark)
164                    }
165                }
166                Message::Chunk(chunk) => match self.map_filter_chunk(chunk).await? {
167                    Some(new_chunk) => {
168                        if !self.nondecreasing_expr_indices.is_empty()
169                            && let Some((_, first_visible_row)) = new_chunk.rows().next()
170                        {
171                            // it's ok to use the first row here, just one chunk delay
172                            first_visible_row
173                                .project(&self.nondecreasing_expr_indices)
174                                .iter()
175                                .enumerate()
176                                .for_each(|(idx, value)| {
177                                    self.last_nondec_expr_values[idx] =
178                                        Some(value.to_owned_datum().expect(
179                                            "non-decreasing expression should never be NULL",
180                                        ));
181                                });
182                        }
183                        yield Message::Chunk(new_chunk)
184                    }
185                    None => continue,
186                },
187                Message::Barrier(barrier) => {
188                    if !self.is_paused {
189                        for (&expr_idx, value) in self
190                            .nondecreasing_expr_indices
191                            .iter()
192                            .zip_eq_fast(&mut self.last_nondec_expr_values)
193                        {
194                            if let Some(value) = std::mem::take(value) {
195                                yield Message::Watermark(Watermark::new(
196                                    expr_idx,
197                                    self.exprs[expr_idx].return_type(),
198                                    value,
199                                ))
200                            }
201                        }
202                    }
203
204                    if let Some(mutation) = barrier.mutation.as_deref() {
205                        match mutation {
206                            Mutation::Pause => {
207                                self.is_paused = true;
208                            }
209                            Mutation::Resume => {
210                                self.is_paused = false;
211                            }
212                            _ => (),
213                        }
214                    }
215
216                    yield Message::Barrier(barrier);
217                }
218            }
219        }
220    }
221}
222
#[cfg(test)]
mod tests {
    use std::sync::atomic::{self, AtomicI64};

    use risingwave_common::array::DataChunk;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::DefaultOrd;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_expr::expr::{self, Expression, ValueImpl};

    use super::*;
    use crate::executor::test_utils::expr::build_from_pretty;
    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};

    /// Projects `$0 + $1` and checks that the ops are kept and values are computed per row.
    #[tokio::test]
    async fn test_projection() {
        let chunk1 = StreamChunk::from_pretty(
            " I I
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " I I
            + 7 8
            - 3 6",
        );
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema, pk_indices);

        let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)");

        let proj = ProjectExecutor::new(
            ActorContext::for_test(123),
            source,
            vec![test_expr],
            MultiMap::new(),
            vec![],
            false,
        );
        let mut proj = proj.boxed().execute();

        tx.push_barrier(test_epoch(1), false);
        let barrier = proj.next().await.unwrap().unwrap();
        barrier.as_barrier().unwrap();

        tx.push_chunk(chunk1);
        tx.push_chunk(chunk2);

        let msg = proj.next().await.unwrap().unwrap();
        assert_eq!(
            *msg.as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 5
                + 7
                + 9"
            )
        );

        let msg = proj.next().await.unwrap().unwrap();
        assert_eq!(
            *msg.as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I
                + 15
                -  9"
            )
        );

        tx.push_barrier(test_epoch(2), true);
        assert!(proj.next().await.unwrap().unwrap().is_stop());
    }

    /// Global counter driving `DummyNondecreasingExpr`; every evaluation returns the next value.
    static DUMMY_COUNTER: AtomicI64 = AtomicI64::new(0);

    /// Test expression whose successive evaluations return increasing `Int64` values.
    #[derive(Debug)]
    struct DummyNondecreasingExpr;

    #[async_trait::async_trait]
    impl Expression for DummyNondecreasingExpr {
        fn return_type(&self) -> DataType {
            DataType::Int64
        }

        async fn eval_v2(&self, input: &DataChunk) -> expr::Result<ValueImpl> {
            let value = DUMMY_COUNTER.fetch_add(1, atomic::Ordering::SeqCst);
            Ok(ValueImpl::Scalar {
                value: Some(value.into()),
                capacity: input.capacity(),
            })
        }

        async fn eval_row(&self, _input: &OwnedRow) -> expr::Result<Datum> {
            let value = DUMMY_COUNTER.fetch_add(1, atomic::Ordering::SeqCst);
            Ok(Some(value.into()))
        }
    }

    /// Checks watermark derivation through projection expressions. Also checks the
    /// barrier-time watermarks produced for a nondecreasing expression.
    #[tokio::test]
    async fn test_watermark_projection() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema, PkIndices::new());

        let a_expr = build_from_pretty("(add:int8 $0:int8 1:int8)");
        let b_expr = build_from_pretty("(subtract:int8 $0:int8 1:int8)");
        let c_expr = NonStrictExpression::for_test(DummyNondecreasingExpr);

        // Output column 2 (`c_expr`) is the nondecreasing expression.
        let proj = ProjectExecutor::new(
            ActorContext::for_test(123),
            source,
            vec![a_expr, b_expr, c_expr],
            MultiMap::from_iter([(0, 0), (0, 1)]),
            vec![2],
            false,
        );
        let mut proj = proj.boxed().execute();

        tx.push_barrier(test_epoch(1), false);
        tx.push_int64_watermark(0, 100);

        proj.expect_barrier().await;
        // A watermark on input column 0 is derived into output columns 0 and 1; the order of
        // the two is not fixed, so sort them by column.
        let w1 = proj.expect_watermark().await;
        let w2 = proj.expect_watermark().await;
        let (w1, w2) = if w1.col_idx < w2.col_idx {
            (w1, w2)
        } else {
            (w2, w1)
        };

        assert_eq!(
            w1,
            Watermark {
                col_idx: 0,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(101)
            }
        );
        assert_eq!(
            w2,
            Watermark {
                col_idx: 1,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(99)
            }
        );

        // just push some random chunks
        tx.push_chunk(StreamChunk::from_pretty(
            "   I I
            + 120 4
            + 146 5
            + 133 6",
        ));
        proj.expect_chunk().await;
        tx.push_chunk(StreamChunk::from_pretty(
            "   I I
            + 213 8
            - 133 6",
        ));
        proj.expect_chunk().await;

        // The barrier flushes the buffered nondecreasing value as a watermark before itself.
        tx.push_barrier(test_epoch(2), false);
        let w3 = proj.expect_watermark().await;
        proj.expect_barrier().await;

        tx.push_chunk(StreamChunk::from_pretty(
            "   I I
            + 100 3
            + 104 5
            + 187 3",
        ));
        proj.expect_chunk().await;

        tx.push_barrier(test_epoch(3), false);
        let w4 = proj.expect_watermark().await;
        proj.expect_barrier().await;

        // The barrier-time watermarks must come from the nondecreasing expression's column and
        // must not go backwards.
        assert_eq!(w3.col_idx, 2);
        assert_eq!(w3.data_type, DataType::Int64);
        assert_eq!(w3.col_idx, w4.col_idx);
        assert!(w3.val.default_cmp(&w4.val).is_le());

        // Input column 1 has no derivations, so this watermark produces no output.
        tx.push_int64_watermark(1, 100);
        tx.push_barrier(test_epoch(4), true);

        assert!(proj.next().await.unwrap().unwrap().is_stop());
    }
}
423}