risingwave_stream/executor/project/
materialized_exprs.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::{Op, RowRef};
16use risingwave_common::bitmap::BitmapBuilder;
17use risingwave_common::row::RowExt;
18use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
19use risingwave_expr::expr::NonStrictExpression;
20
21use crate::consistency::consistency_panic;
22use crate::executor::prelude::*;
23
24/// An executor that materializes the result of a set of expressions.
25/// The expressions are evaluated on `Insert`/`UpdateInsert` rows and the results are stored in a state table.
26/// When a `Delete`/`UpdateDelete` row is received, the corresponding result row is popped from the state table
27/// without the need to re-evaluate the expressions.
28///
29/// - Executor output: `input | expression results`, PK is inherited from the input.
30/// - State table: `input | expression results`.
31/// - State table PK: `state clean column | rest of input pk`.
32pub struct MaterializedExprsExecutor<S: StateStore> {
33    input: Executor,
34    inner: Inner<S>,
35}
36
37pub struct MaterializedExprsArgs<S: StateStore> {
38    pub actor_ctx: ActorContextRef,
39    pub input: Executor,
40    pub exprs: Vec<NonStrictExpression>,
41    pub state_table: StateTable<S>,
42    pub state_clean_col_idx: Option<usize>,
43}
44
45impl<S: StateStore> MaterializedExprsExecutor<S> {
46    pub fn new(args: MaterializedExprsArgs<S>) -> Self {
47        let state_table_pk_indices = args.state_table.pk_indices().to_vec();
48        Self {
49            input: args.input,
50            inner: Inner {
51                actor_ctx: args.actor_ctx,
52                exprs: args.exprs,
53                state_table: StateTableWrapper::new(args.state_table),
54                state_table_pk_indices,
55                state_clean_col_idx: args.state_clean_col_idx,
56            },
57        }
58    }
59}
60
61impl<S: StateStore> Execute for MaterializedExprsExecutor<S> {
62    fn execute(self: Box<Self>) -> BoxedMessageStream {
63        self.inner.execute(self.input).boxed()
64    }
65}
66
67struct StateTableWrapper<S: StateStore> {
68    inner: StateTable<S>,
69    // TODO(rc): lru cache
70}
71
72impl<S: StateStore> StateTableWrapper<S> {
73    fn new(table: StateTable<S>) -> Self {
74        Self { inner: table }
75    }
76
77    fn insert(&mut self, row: impl Row) {
78        self.inner.insert(row);
79    }
80
81    async fn remove_by_pk(&mut self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
82        let row = self.inner.get_row(pk).await?;
83        if let Some(ref row) = row {
84            self.inner.delete(row);
85        }
86        Ok(row)
87    }
88}
89
90struct Inner<S: StateStore> {
91    actor_ctx: ActorContextRef,
92    /// Expressions to evaluate.
93    exprs: Vec<NonStrictExpression>,
94    /// State table to store the results.
95    state_table: StateTableWrapper<S>,
96    /// State table PK indices.
97    state_table_pk_indices: Vec<usize>,
98    /// Index of the column used for state cleaning.
99    state_clean_col_idx: Option<usize>,
100}
101
102impl<S: StateStore> Inner<S> {
103    #[try_stream(ok = Message, error = StreamExecutorError)]
104    async fn execute(mut self, input: Executor) {
105        let mut input = input.execute();
106        let first_barrier = expect_first_barrier(&mut input).await?;
107        let first_epoch = first_barrier.epoch;
108        yield Message::Barrier(first_barrier);
109        self.state_table.inner.init_epoch(first_epoch).await?;
110
111        #[for_await]
112        for msg in input {
113            let msg = msg?;
114            match msg {
115                Message::Chunk(input_chunk) => {
116                    let mut eval_visibility = BitmapBuilder::from(input_chunk.visibility().clone());
117                    for (i, op) in input_chunk.ops().iter().enumerate() {
118                        // hide deletions from expression evaluation
119                        match op {
120                            Op::Delete | Op::UpdateDelete => eval_visibility.set(i, false),
121                            _ => {}
122                        }
123                    }
124                    let eval_chunk = input_chunk
125                        .data_chunk()
126                        .with_visibility(eval_visibility.finish());
127
128                    let mut eval_result_arrs = Vec::with_capacity(self.exprs.len());
129                    for expr in &self.exprs {
130                        // for deletions, the evaluation result is NULL
131                        eval_result_arrs.push(expr.eval_infallible(&eval_chunk).await);
132                    }
133
134                    let mut eval_result_builders = eval_result_arrs
135                        .iter()
136                        .map(|arr| arr.create_builder(input_chunk.capacity()))
137                        .collect::<Vec<_>>();
138                    // now we need to replace the NULLs with the old evaluation results
139                    for (row_idx, row_op) in input_chunk.rows_with_holes().enumerate() {
140                        let Some((op, row)) = row_op else {
141                            // it's invisible in the input
142                            for builder in &mut eval_result_builders {
143                                builder.append_null();
144                            }
145                            continue;
146                        };
147
148                        match op {
149                            Op::Insert | Op::UpdateInsert => {
150                                // for insertions, append the evaluation results
151                                for (arr, builder) in eval_result_arrs
152                                    .iter()
153                                    .zip_eq_fast(&mut eval_result_builders)
154                                {
155                                    let datum_ref = unsafe { arr.value_at_unchecked(row_idx) };
156                                    builder.append(datum_ref);
157                                }
158
159                                self.state_table.insert(
160                                    row.chain(RowRef::with_columns(&eval_result_arrs, row_idx)),
161                                );
162                            }
163                            Op::Delete | Op::UpdateDelete => {
164                                // for deletions, append the old evaluation results
165                                let pk = row.project(&self.state_table_pk_indices);
166                                let old_row = self.state_table.remove_by_pk(pk).await?;
167                                if let Some(old_row) = old_row {
168                                    for (datum_ref, builder) in old_row
169                                        .iter()
170                                        .skip(row.len())
171                                        .zip_eq_debug(&mut eval_result_builders)
172                                    {
173                                        builder.append(datum_ref);
174                                    }
175                                } else {
176                                    consistency_panic!("delete non-existing row");
177                                    for builder in &mut eval_result_builders {
178                                        builder.append_null();
179                                    }
180                                }
181                            }
182                        }
183                    }
184
185                    let (ops, mut columns, vis) = input_chunk.into_inner();
186                    columns.extend(
187                        eval_result_builders
188                            .into_iter()
189                            .map(|builder| builder.finish().into()),
190                    );
191                    yield Message::Chunk(StreamChunk::with_visibility(ops, columns, vis));
192                }
193                Message::Barrier(barrier) => {
194                    let post_commit = self.state_table.inner.commit(barrier.epoch).await?;
195                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
196                    yield Message::Barrier(barrier);
197                    post_commit.post_yield_barrier(update_vnode_bitmap).await?;
198                }
199                Message::Watermark(watermark) => {
200                    if let Some(state_clean_col_idx) = self.state_clean_col_idx
201                        && state_clean_col_idx == watermark.col_idx
202                    {
203                        self.state_table
204                            .inner
205                            .update_watermark(watermark.val.clone());
206                    }
207                    yield Message::Watermark(watermark);
208                }
209            }
210        }
211    }
212}