risingwave_stream/executor/project/
materialized_exprs.rs1use risingwave_common::array::{Op, RowRef};
16use risingwave_common::bitmap::BitmapBuilder;
17use risingwave_common::row::RowExt;
18use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
19use risingwave_expr::expr::NonStrictExpression;
20
21use crate::consistency::consistency_panic;
22use crate::executor::prelude::*;
23
24pub struct MaterializedExprsExecutor<S: StateStore> {
33 input: Executor,
34 inner: Inner<S>,
35}
36
37pub struct MaterializedExprsArgs<S: StateStore> {
38 pub actor_ctx: ActorContextRef,
39 pub input: Executor,
40 pub exprs: Vec<NonStrictExpression>,
41 pub state_table: StateTable<S>,
42 pub state_clean_col_idx: Option<usize>,
43}
44
45impl<S: StateStore> MaterializedExprsExecutor<S> {
46 pub fn new(args: MaterializedExprsArgs<S>) -> Self {
47 let state_table_pk_indices = args.state_table.pk_indices().to_vec();
48 Self {
49 input: args.input,
50 inner: Inner {
51 actor_ctx: args.actor_ctx,
52 exprs: args.exprs,
53 state_table: StateTableWrapper::new(args.state_table),
54 state_table_pk_indices,
55 state_clean_col_idx: args.state_clean_col_idx,
56 },
57 }
58 }
59}
60
61impl<S: StateStore> Execute for MaterializedExprsExecutor<S> {
62 fn execute(self: Box<Self>) -> BoxedMessageStream {
63 self.inner.execute(self.input).boxed()
64 }
65}
66
67struct StateTableWrapper<S: StateStore> {
68 inner: StateTable<S>,
69 }
71
72impl<S: StateStore> StateTableWrapper<S> {
73 fn new(table: StateTable<S>) -> Self {
74 Self { inner: table }
75 }
76
77 fn insert(&mut self, row: impl Row) {
78 self.inner.insert(row);
79 }
80
81 async fn remove_by_pk(&mut self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
82 let row = self.inner.get_row(pk).await?;
83 if let Some(ref row) = row {
84 self.inner.delete(row);
85 }
86 Ok(row)
87 }
88}
89
90struct Inner<S: StateStore> {
91 actor_ctx: ActorContextRef,
92 exprs: Vec<NonStrictExpression>,
94 state_table: StateTableWrapper<S>,
96 state_table_pk_indices: Vec<usize>,
98 state_clean_col_idx: Option<usize>,
100}
101
102impl<S: StateStore> Inner<S> {
103 #[try_stream(ok = Message, error = StreamExecutorError)]
104 async fn execute(mut self, input: Executor) {
105 let mut input = input.execute();
106 let first_barrier = expect_first_barrier(&mut input).await?;
107 let first_epoch = first_barrier.epoch;
108 yield Message::Barrier(first_barrier);
109 self.state_table.inner.init_epoch(first_epoch).await?;
110
111 #[for_await]
112 for msg in input {
113 let msg = msg?;
114 match msg {
115 Message::Chunk(input_chunk) => {
116 let mut eval_visibility = BitmapBuilder::from(input_chunk.visibility().clone());
117 for (i, op) in input_chunk.ops().iter().enumerate() {
118 match op {
120 Op::Delete | Op::UpdateDelete => eval_visibility.set(i, false),
121 _ => {}
122 }
123 }
124 let eval_chunk = input_chunk
125 .data_chunk()
126 .with_visibility(eval_visibility.finish());
127
128 let mut eval_result_arrs = Vec::with_capacity(self.exprs.len());
129 for expr in &self.exprs {
130 eval_result_arrs.push(expr.eval_infallible(&eval_chunk).await);
132 }
133
134 let mut eval_result_builders = eval_result_arrs
135 .iter()
136 .map(|arr| arr.create_builder(input_chunk.capacity()))
137 .collect::<Vec<_>>();
138 for (row_idx, row_op) in input_chunk.rows_with_holes().enumerate() {
140 let Some((op, row)) = row_op else {
141 for builder in &mut eval_result_builders {
143 builder.append_null();
144 }
145 continue;
146 };
147
148 match op {
149 Op::Insert | Op::UpdateInsert => {
150 for (arr, builder) in eval_result_arrs
152 .iter()
153 .zip_eq_fast(&mut eval_result_builders)
154 {
155 let datum_ref = unsafe { arr.value_at_unchecked(row_idx) };
156 builder.append(datum_ref);
157 }
158
159 self.state_table.insert(
160 row.chain(RowRef::with_columns(&eval_result_arrs, row_idx)),
161 );
162 }
163 Op::Delete | Op::UpdateDelete => {
164 let pk = row.project(&self.state_table_pk_indices);
166 let old_row = self.state_table.remove_by_pk(pk).await?;
167 if let Some(old_row) = old_row {
168 for (datum_ref, builder) in old_row
169 .iter()
170 .skip(row.len())
171 .zip_eq_debug(&mut eval_result_builders)
172 {
173 builder.append(datum_ref);
174 }
175 } else {
176 consistency_panic!("delete non-existing row");
177 for builder in &mut eval_result_builders {
178 builder.append_null();
179 }
180 }
181 }
182 }
183 }
184
185 let (ops, mut columns, vis) = input_chunk.into_inner();
186 columns.extend(
187 eval_result_builders
188 .into_iter()
189 .map(|builder| builder.finish().into()),
190 );
191 yield Message::Chunk(StreamChunk::with_visibility(ops, columns, vis));
192 }
193 Message::Barrier(barrier) => {
194 let post_commit = self.state_table.inner.commit(barrier.epoch).await?;
195 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
196 yield Message::Barrier(barrier);
197 post_commit.post_yield_barrier(update_vnode_bitmap).await?;
198 }
199 Message::Watermark(watermark) => {
200 if let Some(state_clean_col_idx) = self.state_clean_col_idx
201 && state_clean_col_idx == watermark.col_idx
202 {
203 self.state_table
204 .inner
205 .update_watermark(watermark.val.clone());
206 }
207 yield Message::Watermark(watermark);
208 }
209 }
210 }
211 }
212}