risingwave_stream/executor/project/
project_scalar.rs1use multimap::MultiMap;
16use risingwave_common::row::RowExt;
17use risingwave_common::types::ToOwnedDatum;
18use risingwave_common::util::iter_util::ZipEqFast;
19use risingwave_expr::expr::NonStrictExpression;
20
21use crate::executor::prelude::*;
22
23pub struct ProjectExecutor {
27 input: Executor,
28 inner: Inner,
29}
30
31struct Inner {
32 _ctx: ActorContextRef,
33
34 exprs: Vec<NonStrictExpression>,
36 watermark_derivations: MultiMap<usize, usize>,
39 nondecreasing_expr_indices: Vec<usize>,
41 last_nondec_expr_values: Vec<Option<ScalarImpl>>,
43 is_paused: bool,
45
46 eliminate_noop_updates: bool,
49}
50
51impl ProjectExecutor {
52 pub fn new(
53 ctx: ActorContextRef,
54 input: Executor,
55 exprs: Vec<NonStrictExpression>,
56 watermark_derivations: MultiMap<usize, usize>,
57 nondecreasing_expr_indices: Vec<usize>,
58 noop_update_hint: bool,
59 ) -> Self {
60 let n_nondecreasing_exprs = nondecreasing_expr_indices.len();
61 let eliminate_noop_updates = noop_update_hint
62 || ctx
63 .streaming_config
64 .developer
65 .aggressive_noop_update_elimination;
66 Self {
67 input,
68 inner: Inner {
69 _ctx: ctx,
70 exprs,
71 watermark_derivations,
72 nondecreasing_expr_indices,
73 last_nondec_expr_values: vec![None; n_nondecreasing_exprs],
74 is_paused: false,
75 eliminate_noop_updates,
76 },
77 }
78 }
79}
80
81impl Debug for ProjectExecutor {
82 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
83 f.debug_struct("ProjectExecutor")
84 .field("exprs", &self.inner.exprs)
85 .finish()
86 }
87}
88
89impl Execute for ProjectExecutor {
90 fn execute(self: Box<Self>) -> BoxedMessageStream {
91 self.inner.execute(self.input).boxed()
92 }
93}
94
95pub async fn apply_project_exprs(
96 exprs: &[NonStrictExpression],
97 chunk: StreamChunk,
98) -> StreamExecutorResult<StreamChunk> {
99 let (data_chunk, ops) = chunk.into_parts();
100 let mut projected_columns = Vec::new();
101
102 for expr in exprs {
103 let evaluated_expr = expr.eval_infallible(&data_chunk).await;
104 projected_columns.push(evaluated_expr);
105 }
106 let (_, vis) = data_chunk.into_parts();
107
108 let new_chunk = StreamChunk::with_visibility(ops, projected_columns, vis);
109
110 Ok(new_chunk)
111}
112
113impl Inner {
114 async fn map_filter_chunk(
115 &self,
116 chunk: StreamChunk,
117 ) -> StreamExecutorResult<Option<StreamChunk>> {
118 let mut new_chunk = apply_project_exprs(&self.exprs, chunk).await?;
119 if self.eliminate_noop_updates {
120 new_chunk = new_chunk.eliminate_adjacent_noop_update();
121 }
122 Ok(Some(new_chunk))
123 }
124
125 async fn handle_watermark(&self, watermark: Watermark) -> StreamExecutorResult<Vec<Watermark>> {
126 let out_col_indices = match self.watermark_derivations.get_vec(&watermark.col_idx) {
127 Some(v) => v,
128 None => return Ok(vec![]),
129 };
130 let mut ret = vec![];
131 for out_col_idx in out_col_indices {
132 let out_col_idx = *out_col_idx;
133 let derived_watermark = watermark
134 .clone()
135 .transform_with_expr(&self.exprs[out_col_idx], out_col_idx)
136 .await;
137 if let Some(derived_watermark) = derived_watermark {
138 ret.push(derived_watermark);
139 } else {
140 warn!(
141 "a NULL watermark is derived with the expression {}!",
142 out_col_idx
143 );
144 }
145 }
146 Ok(ret)
147 }
148
149 #[try_stream(ok = Message, error = StreamExecutorError)]
150 async fn execute(mut self, input: Executor) {
151 let mut input = input.execute();
152 let first_barrier = expect_first_barrier(&mut input).await?;
153 self.is_paused = first_barrier.is_pause_on_startup();
154 yield Message::Barrier(first_barrier);
155
156 #[for_await]
157 for msg in input {
158 let msg = msg?;
159 match msg {
160 Message::Watermark(w) => {
161 let watermarks = self.handle_watermark(w).await?;
162 for watermark in watermarks {
163 yield Message::Watermark(watermark)
164 }
165 }
166 Message::Chunk(chunk) => match self.map_filter_chunk(chunk).await? {
167 Some(new_chunk) => {
168 if !self.nondecreasing_expr_indices.is_empty()
169 && let Some((_, first_visible_row)) = new_chunk.rows().next()
170 {
171 first_visible_row
173 .project(&self.nondecreasing_expr_indices)
174 .iter()
175 .enumerate()
176 .for_each(|(idx, value)| {
177 self.last_nondec_expr_values[idx] =
178 Some(value.to_owned_datum().expect(
179 "non-decreasing expression should never be NULL",
180 ));
181 });
182 }
183 yield Message::Chunk(new_chunk)
184 }
185 None => continue,
186 },
187 Message::Barrier(barrier) => {
188 if !self.is_paused {
189 for (&expr_idx, value) in self
190 .nondecreasing_expr_indices
191 .iter()
192 .zip_eq_fast(&mut self.last_nondec_expr_values)
193 {
194 if let Some(value) = std::mem::take(value) {
195 yield Message::Watermark(Watermark::new(
196 expr_idx,
197 self.exprs[expr_idx].return_type(),
198 value,
199 ))
200 }
201 }
202 }
203
204 if let Some(mutation) = barrier.mutation.as_deref() {
205 match mutation {
206 Mutation::Pause => {
207 self.is_paused = true;
208 }
209 Mutation::Resume => {
210 self.is_paused = false;
211 }
212 _ => (),
213 }
214 }
215
216 yield Message::Barrier(barrier);
217 }
218 }
219 }
220 }
221}
222
#[cfg(test)]
mod tests {
    use std::sync::atomic::{self, AtomicI64};

    use risingwave_common::array::DataChunk;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::DefaultOrd;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_expr::expr::{self, Expression, ValueImpl};

    use super::*;
    use crate::executor::test_utils::expr::build_from_pretty;
    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};

    /// Input schema shared by the tests: two unnamed `Int64` columns.
    fn two_int64_columns() -> Schema {
        Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        }
    }

    /// Projects `$0 + $1` over two chunks and checks the projected output chunk by chunk.
    #[tokio::test]
    async fn test_projection() {
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(two_int64_columns(), vec![0]);

        let sum_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)");
        let executor = ProjectExecutor::new(
            ActorContext::for_test(123),
            source,
            vec![sum_expr],
            MultiMap::new(),
            vec![],
            false,
        );
        let mut stream = executor.boxed().execute();

        tx.push_barrier(test_epoch(1), false);
        let first_msg = stream.next().await.unwrap().unwrap();
        first_msg.as_barrier().unwrap();

        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 1 4
            + 2 5
            + 3 6",
        ));
        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 7 8
            - 3 6",
        ));

        let expected_outputs = [
            StreamChunk::from_pretty(
                " I
                + 5
                + 7
                + 9",
            ),
            StreamChunk::from_pretty(
                " I
                + 15
                - 9",
            ),
        ];
        for expected in expected_outputs {
            let msg = stream.next().await.unwrap().unwrap();
            assert_eq!(*msg.as_chunk().unwrap(), expected);
        }

        tx.push_barrier(test_epoch(2), true);
        assert!(stream.next().await.unwrap().unwrap().is_stop());
    }

    /// Global counter backing `DummyNondecreasingExpr`.
    static DUMMY_COUNTER: AtomicI64 = AtomicI64::new(0);

    /// Returns the current counter value and increments the counter.
    fn next_dummy_value() -> i64 {
        DUMMY_COUNTER.fetch_add(1, atomic::Ordering::SeqCst)
    }

    /// Expression whose result is a counter that grows by one on every evaluation.
    #[derive(Debug)]
    struct DummyNondecreasingExpr;

    #[async_trait::async_trait]
    impl Expression for DummyNondecreasingExpr {
        fn return_type(&self) -> DataType {
            DataType::Int64
        }

        async fn eval_v2(&self, input: &DataChunk) -> expr::Result<ValueImpl> {
            Ok(ValueImpl::Scalar {
                value: Some(next_dummy_value().into()),
                capacity: input.capacity(),
            })
        }

        async fn eval_row(&self, _input: &OwnedRow) -> expr::Result<Datum> {
            Ok(Some(next_dummy_value().into()))
        }
    }

    /// Checks both watermark paths: watermarks derived from an upstream watermark, and
    /// watermarks generated at barriers from a non-decreasing expression.
    #[tokio::test]
    async fn test_watermark_projection() {
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(two_int64_columns(), PkIndices::new());

        let plus_one = build_from_pretty("(add:int8 $0:int8 1:int8)");
        let minus_one = build_from_pretty("(subtract:int8 $0:int8 1:int8)");
        let counter = NonStrictExpression::for_test(DummyNondecreasingExpr);

        let executor = ProjectExecutor::new(
            ActorContext::for_test(123),
            source,
            vec![plus_one, minus_one, counter],
            // A watermark on input column 0 is derived for output columns 0 and 1.
            MultiMap::from_iter([(0, 0), (0, 1)]),
            vec![2],
            false,
        );
        let mut stream = executor.boxed().execute();

        tx.push_barrier(test_epoch(1), false);
        tx.push_int64_watermark(0, 100);

        stream.expect_barrier().await;
        let first = stream.expect_watermark().await;
        let second = stream.expect_watermark().await;
        // The two derived watermarks may arrive in either order; sort them by column.
        let (on_col0, on_col1) = if first.col_idx < second.col_idx {
            (first, second)
        } else {
            (second, first)
        };

        let int64_watermark = |col_idx: usize, val: i64| Watermark {
            col_idx,
            data_type: DataType::Int64,
            val: ScalarImpl::Int64(val),
        };
        assert_eq!(on_col0, int64_watermark(0, 101));
        assert_eq!(on_col1, int64_watermark(1, 99));

        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 120 4
            + 146 5
            + 133 6",
        ));
        stream.expect_chunk().await;
        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 213 8
            - 133 6",
        ));
        stream.expect_chunk().await;

        // The buffered value of the non-decreasing expression is emitted before the barrier.
        tx.push_barrier(test_epoch(2), false);
        let w3 = stream.expect_watermark().await;
        stream.expect_barrier().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 100 3
            + 104 5
            + 187 3",
        ));
        stream.expect_chunk().await;

        tx.push_barrier(test_epoch(3), false);
        let w4 = stream.expect_watermark().await;
        stream.expect_barrier().await;

        assert_eq!(w3.col_idx, w4.col_idx);
        assert!(w3.val.default_cmp(&w4.val).is_le());

        // No derivation is registered for input column 1 and no chunk arrived since the
        // last barrier, so the stop barrier is the next message.
        tx.push_int64_watermark(1, 100);
        tx.push_barrier(test_epoch(4), true);

        assert!(stream.next().await.unwrap().unwrap().is_stop());
    }
}