risingwave_stream/executor/project/
materialized_exprs.rs1use risingwave_common::array::{Op, RowRef};
16use risingwave_common::bitmap::BitmapBuilder;
17use risingwave_common::row::RowExt;
18use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
19use risingwave_expr::expr::NonStrictExpression;
20
21use crate::cache::ManagedLruCache;
22use crate::common::metrics::MetricsInfo;
23use crate::consistency::consistency_panic;
24use crate::executor::prelude::*;
25
26pub struct MaterializedExprsExecutor<S: StateStore> {
35 input: Executor,
36 inner: Inner<S>,
37}
38
39pub struct MaterializedExprsArgs<S: StateStore> {
40 pub actor_ctx: ActorContextRef,
41 pub input: Executor,
42 pub exprs: Vec<NonStrictExpression>,
43 pub state_table: StateTable<S>,
44 pub state_clean_col_idx: Option<usize>,
45 pub watermark_epoch: AtomicU64Ref,
46}
47
48impl<S: StateStore> MaterializedExprsExecutor<S> {
49 pub fn new(args: MaterializedExprsArgs<S>) -> Self {
50 let state_table_pk_indices = args.state_table.pk_indices().to_vec();
51 Self {
52 input: args.input,
53 inner: Inner {
54 actor_ctx: args.actor_ctx.clone(),
55 exprs: args.exprs,
56 state_table: StateTableWrapper::new(
57 args.state_table,
58 args.actor_ctx.clone(),
59 args.watermark_epoch,
60 ),
61 state_table_pk_indices,
62 state_clean_col_idx: args.state_clean_col_idx,
63 },
64 }
65 }
66}
67
68impl<S: StateStore> Execute for MaterializedExprsExecutor<S> {
69 fn execute(self: Box<Self>) -> BoxedMessageStream {
70 self.inner.execute(self.input).boxed()
71 }
72}
73
74struct StateTableWrapper<S: StateStore> {
75 inner: StateTable<S>,
76 cache: ManagedLruCache<OwnedRow, Option<OwnedRow>>,
77}
78
79impl<S: StateStore> StateTableWrapper<S> {
80 fn new(
81 table: StateTable<S>,
82 actor_ctx: ActorContextRef,
83 watermark_epoch: AtomicU64Ref,
84 ) -> Self {
85 let metrics_info = MetricsInfo::new(
86 actor_ctx.streaming_metrics.clone(),
87 table.table_id(),
88 actor_ctx.id,
89 "MaterializedExprs",
90 );
91
92 Self {
93 inner: table,
94 cache: ManagedLruCache::unbounded(watermark_epoch, metrics_info),
95 }
96 }
97
98 fn insert(&mut self, row: impl Row) {
99 let owned_row = row.into_owned_row();
100 let pk = (&owned_row)
101 .project(self.inner.pk_indices())
102 .into_owned_row();
103
104 self.inner.insert(&owned_row);
106 self.cache.put(pk, Some(owned_row));
107 }
108
109 async fn remove_by_pk(&mut self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
110 let pk_owned = pk.into_owned_row();
112 if let Some(row) = self.cache.get(&pk_owned) {
113 if let Some(row) = row {
115 let cloned_row = row.clone();
116 self.inner.delete(row);
117 self.cache.put(pk_owned, None);
119 Ok(Some(cloned_row))
120 } else {
121 Ok(None)
122 }
123 } else {
124 let row = self.inner.get_row(pk_owned).await?;
126 if let Some(ref row) = row {
127 self.inner.delete(row);
128 }
129 Ok(row)
130 }
131 }
132}
133
134struct Inner<S: StateStore> {
135 actor_ctx: ActorContextRef,
136 exprs: Vec<NonStrictExpression>,
138 state_table: StateTableWrapper<S>,
140 state_table_pk_indices: Vec<usize>,
142 state_clean_col_idx: Option<usize>,
144}
145
146impl<S: StateStore> Inner<S> {
147 #[try_stream(ok = Message, error = StreamExecutorError)]
148 async fn execute(mut self, input: Executor) {
149 let mut input = input.execute();
150 let first_barrier = expect_first_barrier(&mut input).await?;
151 let first_epoch = first_barrier.epoch;
152 yield Message::Barrier(first_barrier);
153 self.state_table.inner.init_epoch(first_epoch).await?;
154
155 #[for_await]
156 for msg in input {
157 let msg = msg?;
158 match msg {
159 Message::Chunk(input_chunk) => {
160 let mut eval_visibility = BitmapBuilder::from(input_chunk.visibility().clone());
161 for (i, op) in input_chunk.ops().iter().enumerate() {
162 match op {
164 Op::Delete | Op::UpdateDelete => eval_visibility.set(i, false),
165 _ => {}
166 }
167 }
168 let eval_chunk = input_chunk
169 .data_chunk()
170 .with_visibility(eval_visibility.finish());
171
172 let mut eval_result_arrs = Vec::with_capacity(self.exprs.len());
173 for expr in &self.exprs {
174 eval_result_arrs.push(expr.eval_infallible(&eval_chunk).await);
176 }
177
178 let mut eval_result_builders = eval_result_arrs
179 .iter()
180 .map(|arr| arr.create_builder(input_chunk.capacity()))
181 .collect::<Vec<_>>();
182 for (row_idx, row_op) in input_chunk.rows_with_holes().enumerate() {
184 let Some((op, row)) = row_op else {
185 for builder in &mut eval_result_builders {
187 builder.append_null();
188 }
189 continue;
190 };
191
192 match op {
193 Op::Insert | Op::UpdateInsert => {
194 for (arr, builder) in eval_result_arrs
196 .iter()
197 .zip_eq_fast(&mut eval_result_builders)
198 {
199 let datum_ref = unsafe { arr.value_at_unchecked(row_idx) };
200 builder.append(datum_ref);
201 }
202
203 self.state_table.insert(
204 row.chain(RowRef::with_columns(&eval_result_arrs, row_idx)),
205 );
206 }
207 Op::Delete | Op::UpdateDelete => {
208 let pk = row.project(&self.state_table_pk_indices);
210 let old_row = self.state_table.remove_by_pk(pk).await?;
211 if let Some(old_row) = old_row {
212 for (datum_ref, builder) in old_row
213 .iter()
214 .skip(row.len())
215 .zip_eq_debug(&mut eval_result_builders)
216 {
217 builder.append(datum_ref);
218 }
219 } else {
220 consistency_panic!("delete non-existing row");
221 for builder in &mut eval_result_builders {
222 builder.append_null();
223 }
224 }
225 }
226 }
227 }
228
229 let (ops, mut columns, vis) = input_chunk.into_inner();
230 columns.extend(
231 eval_result_builders
232 .into_iter()
233 .map(|builder| builder.finish().into()),
234 );
235 yield Message::Chunk(StreamChunk::with_visibility(ops, columns, vis));
236 }
237 Message::Barrier(barrier) => {
238 let post_commit = self.state_table.inner.commit(barrier.epoch).await?;
239 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
240 yield Message::Barrier(barrier);
241
242 self.state_table.cache.evict();
244
245 if let Some((_, cache_may_stale)) =
246 post_commit.post_yield_barrier(update_vnode_bitmap).await?
247 {
248 if cache_may_stale {
249 self.state_table.cache.clear();
250 }
251 }
252 }
253 Message::Watermark(watermark) => {
254 if let Some(state_clean_col_idx) = self.state_clean_col_idx
255 && state_clean_col_idx == watermark.col_idx
256 {
257 self.state_table
258 .inner
259 .update_watermark(watermark.val.clone());
260 }
261 yield Message::Watermark(watermark);
262 }
263 }
264 }
265 }
266}