risingwave_stream/executor/project/
materialized_exprs.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::{Op, RowRef};
16use risingwave_common::bitmap::BitmapBuilder;
17use risingwave_common::row::RowExt;
18use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
19use risingwave_expr::expr::NonStrictExpression;
20
21use crate::cache::ManagedLruCache;
22use crate::common::metrics::MetricsInfo;
23use crate::consistency::consistency_panic;
24use crate::executor::prelude::*;
25
/// An executor that materializes the result of a set of expressions.
/// The expressions are evaluated on `Insert`/`UpdateInsert` rows and the results are stored in a state table.
/// When a `Delete`/`UpdateDelete` row is received, the corresponding result row is popped from the state table
/// without the need to re-evaluate the expressions.
///
/// - Executor output: `input | expression results`, PK is inherited from the input.
/// - State table: `input | expression results`.
/// - State table PK: `state clean column | rest of input pk`.
pub struct MaterializedExprsExecutor<S: StateStore> {
    /// Upstream executor whose message stream is consumed.
    input: Executor,
    /// Expressions, state table and settings; owns the actual message-processing loop.
    inner: Inner<S>,
}
38
/// Arguments for constructing a [`MaterializedExprsExecutor`].
pub struct MaterializedExprsArgs<S: StateStore> {
    /// Context of the owning actor; the executor reads its `id` and `streaming_metrics`.
    pub actor_ctx: ActorContextRef,
    /// Upstream executor.
    pub input: Executor,
    /// Expressions to evaluate on inserted rows.
    pub exprs: Vec<NonStrictExpression>,
    /// State table storing the `input | expression results` rows.
    pub state_table: StateTable<S>,
    /// Index of the column used for state cleaning. A watermark on this column is passed on to
    /// the state table; with `None`, no watermark is passed to the state table.
    pub state_clean_col_idx: Option<usize>,
    /// Epoch handle handed to the executor's LRU row cache.
    pub watermark_epoch: AtomicU64Ref,
}
47
48impl<S: StateStore> MaterializedExprsExecutor<S> {
49    pub fn new(args: MaterializedExprsArgs<S>) -> Self {
50        let state_table_pk_indices = args.state_table.pk_indices().to_vec();
51        Self {
52            input: args.input,
53            inner: Inner {
54                actor_ctx: args.actor_ctx.clone(),
55                exprs: args.exprs,
56                state_table: StateTableWrapper::new(
57                    args.state_table,
58                    args.actor_ctx.clone(),
59                    args.watermark_epoch,
60                ),
61                state_table_pk_indices,
62                state_clean_col_idx: args.state_clean_col_idx,
63            },
64        }
65    }
66}
67
68impl<S: StateStore> Execute for MaterializedExprsExecutor<S> {
69    fn execute(self: Box<Self>) -> BoxedMessageStream {
70        self.inner.execute(self.input).boxed()
71    }
72}
73
/// A [`StateTable`] paired with an LRU cache of its rows, keyed by state-table primary key.
struct StateTableWrapper<S: StateStore> {
    /// The underlying state table.
    inner: StateTable<S>,
    /// Cache from state-table PK to row. `Some(row)` is recorded when a row is inserted;
    /// `None` is recorded after a cached row has been removed.
    cache: ManagedLruCache<OwnedRow, Option<OwnedRow>>,
}
78
79impl<S: StateStore> StateTableWrapper<S> {
80    fn new(
81        table: StateTable<S>,
82        actor_ctx: ActorContextRef,
83        watermark_epoch: AtomicU64Ref,
84    ) -> Self {
85        let metrics_info = MetricsInfo::new(
86            actor_ctx.streaming_metrics.clone(),
87            table.table_id(),
88            actor_ctx.id,
89            "MaterializedExprs",
90        );
91
92        Self {
93            inner: table,
94            cache: ManagedLruCache::unbounded(watermark_epoch, metrics_info),
95        }
96    }
97
98    fn insert(&mut self, row: impl Row) {
99        let owned_row = row.into_owned_row();
100        let pk = (&owned_row)
101            .project(self.inner.pk_indices())
102            .into_owned_row();
103
104        // Store the record and update the cache
105        self.inner.insert(&owned_row);
106        self.cache.put(pk, Some(owned_row));
107    }
108
109    async fn remove_by_pk(&mut self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
110        // Try to get from cache first
111        let pk_owned = pk.into_owned_row();
112        if let Some(row) = self.cache.get(&pk_owned) {
113            // Cache hit, delete from cache and state table
114            if let Some(row) = row {
115                let cloned_row = row.clone();
116                self.inner.delete(row);
117                // Mark as deleted in cache
118                self.cache.put(pk_owned, None);
119                Ok(Some(cloned_row))
120            } else {
121                Ok(None)
122            }
123        } else {
124            // Cache miss, get from state table
125            let row = self.inner.get_row(pk_owned).await?;
126            if let Some(ref row) = row {
127                self.inner.delete(row);
128            }
129            Ok(row)
130        }
131    }
132}
133
/// State and configuration behind [`MaterializedExprsExecutor`]; its `execute` method runs the
/// message loop.
struct Inner<S: StateStore> {
    /// Actor context; its `id` is used when checking a barrier for a vnode bitmap update.
    actor_ctx: ActorContextRef,
    /// Expressions to evaluate.
    exprs: Vec<NonStrictExpression>,
    /// State table to store the results.
    state_table: StateTableWrapper<S>,
    /// State table PK indices.
    state_table_pk_indices: Vec<usize>,
    /// Index of the column used for state cleaning.
    state_clean_col_idx: Option<usize>,
}
145
impl<S: StateStore> Inner<S> {
    /// Runs the executor loop over `input` and yields the resulting message stream.
    ///
    /// - The first barrier is forwarded, then the state table's epoch is initialized from it.
    /// - Chunks: the expressions are evaluated with delete rows hidden. For inserted rows the
    ///   fresh results are emitted and `input row | results` is written to the state table.
    ///   For deleted rows the stored row is removed from the state table and its trailing
    ///   result columns are emitted instead. The output chunk is the input columns followed
    ///   by the result columns, with the input's ops and visibility.
    /// - Barriers: the state table is committed, the barrier is forwarded, the cache is
    ///   evicted, and the cache is cleared if the post-commit step reports it may be stale.
    /// - Watermarks: always forwarded; a watermark on the state-clean column is additionally
    ///   passed to the state table.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute(mut self, input: Executor) {
        let mut input = input.execute();
        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier);
        self.state_table.inner.init_epoch(first_epoch).await?;

        #[for_await]
        for msg in input {
            let msg = msg?;
            match msg {
                Message::Chunk(input_chunk) => {
                    // Start from the input visibility and additionally mask out deletions.
                    let mut eval_visibility = BitmapBuilder::from(input_chunk.visibility().clone());
                    for (i, op) in input_chunk.ops().iter().enumerate() {
                        // hide deletions from expression evaluation
                        match op {
                            Op::Delete | Op::UpdateDelete => eval_visibility.set(i, false),
                            _ => {}
                        }
                    }
                    let eval_chunk = input_chunk
                        .data_chunk()
                        .with_visibility(eval_visibility.finish());

                    // One result array per expression, in the order of `self.exprs`.
                    let mut eval_result_arrs = Vec::with_capacity(self.exprs.len());
                    for expr in &self.exprs {
                        // for deletions, the evaluation result is NULL
                        eval_result_arrs.push(expr.eval_infallible(&eval_chunk).await);
                    }

                    // The output columns are rebuilt row by row so that deletions can carry the
                    // previously stored results.
                    let mut eval_result_builders = eval_result_arrs
                        .iter()
                        .map(|arr| arr.create_builder(input_chunk.capacity()))
                        .collect::<Vec<_>>();
                    // now we need to replace the NULLs with the old evaluation results
                    for (row_idx, row_op) in input_chunk.rows_with_holes().enumerate() {
                        let Some((op, row)) = row_op else {
                            // it's invisible in the input
                            for builder in &mut eval_result_builders {
                                builder.append_null();
                            }
                            continue;
                        };

                        match op {
                            Op::Insert | Op::UpdateInsert => {
                                // for insertions, append the evaluation results
                                for (arr, builder) in eval_result_arrs
                                    .iter()
                                    .zip_eq_fast(&mut eval_result_builders)
                                {
                                    // SAFETY (assumption, TODO confirm): `row_idx` enumerates
                                    // `input_chunk.rows_with_holes()`, which presumably yields one
                                    // item per physical row, and each result array is expected to
                                    // have the same length as the chunk it was evaluated on, so
                                    // `row_idx` should be in bounds.
                                    let datum_ref = unsafe { arr.value_at_unchecked(row_idx) };
                                    builder.append(datum_ref);
                                }

                                // Persist `input row | results` so that a later deletion can
                                // reuse the results.
                                self.state_table.insert(
                                    row.chain(RowRef::with_columns(&eval_result_arrs, row_idx)),
                                );
                            }
                            Op::Delete | Op::UpdateDelete => {
                                // for deletions, append the old evaluation results
                                let pk = row.project(&self.state_table_pk_indices);
                                let old_row = self.state_table.remove_by_pk(pk).await?;
                                if let Some(old_row) = old_row {
                                    // The stored row is `input | results`; skip the input columns.
                                    for (datum_ref, builder) in old_row
                                        .iter()
                                        .skip(row.len())
                                        .zip_eq_debug(&mut eval_result_builders)
                                    {
                                        builder.append(datum_ref);
                                    }
                                } else {
                                    consistency_panic!("delete non-existing row");
                                    // If the macro above does not abort, emit NULLs so that every
                                    // result column still gets one value for this row.
                                    for builder in &mut eval_result_builders {
                                        builder.append_null();
                                    }
                                }
                            }
                        }
                    }

                    // Output = input columns followed by the result columns; ops and visibility
                    // are taken unchanged from the input chunk.
                    let (ops, mut columns, vis) = input_chunk.into_inner();
                    columns.extend(
                        eval_result_builders
                            .into_iter()
                            .map(|builder| builder.finish().into()),
                    );
                    yield Message::Chunk(StreamChunk::with_visibility(ops, columns, vis));
                }
                Message::Barrier(barrier) => {
                    let post_commit = self.state_table.inner.commit(barrier.epoch).await?;
                    // Computed before the barrier is moved into the yielded message.
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
                    yield Message::Barrier(barrier);

                    // evict LRU cache
                    self.state_table.cache.evict();

                    if let Some((_, cache_may_stale)) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        // Drop all cached rows when the post-commit step reports that the cache
                        // may no longer match the state table.
                        if cache_may_stale {
                            self.state_table.cache.clear();
                        }
                    }
                }
                Message::Watermark(watermark) => {
                    // Only a watermark on the state-clean column is passed to the state table.
                    if let Some(state_clean_col_idx) = self.state_clean_col_idx
                        && state_clean_col_idx == watermark.col_idx
                    {
                        self.state_table
                            .inner
                            .update_watermark(watermark.val.clone());
                    }
                    yield Message::Watermark(watermark);
                }
            }
        }
    }
}