risingwave_stream/executor/project/
project_scalar.rs1use multimap::MultiMap;
16use risingwave_common::row::RowExt;
17use risingwave_common::types::ToOwnedDatum;
18use risingwave_common::util::iter_util::ZipEqFast;
19use risingwave_expr::expr::NonStrictExpression;
20
21use crate::executor::prelude::*;
22
23pub struct ProjectExecutor {
27 input: Executor,
28 inner: Inner,
29}
30
31struct Inner {
32 _ctx: ActorContextRef,
33
34 exprs: Vec<NonStrictExpression>,
36 watermark_derivations: MultiMap<usize, usize>,
39 nondecreasing_expr_indices: Vec<usize>,
41 last_nondec_expr_values: Vec<Option<ScalarImpl>>,
43 is_paused: bool,
45
46 noop_update_hint: bool,
49}
50
51impl ProjectExecutor {
52 pub fn new(
53 ctx: ActorContextRef,
54 input: Executor,
55 exprs: Vec<NonStrictExpression>,
56 watermark_derivations: MultiMap<usize, usize>,
57 nondecreasing_expr_indices: Vec<usize>,
58 noop_update_hint: bool,
59 ) -> Self {
60 let n_nondecreasing_exprs = nondecreasing_expr_indices.len();
61 Self {
62 input,
63 inner: Inner {
64 _ctx: ctx,
65 exprs,
66 watermark_derivations,
67 nondecreasing_expr_indices,
68 last_nondec_expr_values: vec![None; n_nondecreasing_exprs],
69 is_paused: false,
70 noop_update_hint,
71 },
72 }
73 }
74}
75
76impl Debug for ProjectExecutor {
77 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct("ProjectExecutor")
79 .field("exprs", &self.inner.exprs)
80 .finish()
81 }
82}
83
84impl Execute for ProjectExecutor {
85 fn execute(self: Box<Self>) -> BoxedMessageStream {
86 self.inner.execute(self.input).boxed()
87 }
88}
89
90pub async fn apply_project_exprs(
91 exprs: &[NonStrictExpression],
92 chunk: StreamChunk,
93) -> StreamExecutorResult<StreamChunk> {
94 let (data_chunk, ops) = chunk.into_parts();
95 let mut projected_columns = Vec::new();
96
97 for expr in exprs {
98 let evaluated_expr = expr.eval_infallible(&data_chunk).await;
99 projected_columns.push(evaluated_expr);
100 }
101 let (_, vis) = data_chunk.into_parts();
102
103 let new_chunk = StreamChunk::with_visibility(ops, projected_columns, vis);
104
105 Ok(new_chunk)
106}
107
108impl Inner {
109 async fn map_filter_chunk(
110 &self,
111 chunk: StreamChunk,
112 ) -> StreamExecutorResult<Option<StreamChunk>> {
113 let mut new_chunk = apply_project_exprs(&self.exprs, chunk).await?;
114 if self.noop_update_hint {
115 new_chunk = new_chunk.eliminate_adjacent_noop_update();
116 }
117 Ok(Some(new_chunk))
118 }
119
120 async fn handle_watermark(&self, watermark: Watermark) -> StreamExecutorResult<Vec<Watermark>> {
121 let out_col_indices = match self.watermark_derivations.get_vec(&watermark.col_idx) {
122 Some(v) => v,
123 None => return Ok(vec![]),
124 };
125 let mut ret = vec![];
126 for out_col_idx in out_col_indices {
127 let out_col_idx = *out_col_idx;
128 let derived_watermark = watermark
129 .clone()
130 .transform_with_expr(&self.exprs[out_col_idx], out_col_idx)
131 .await;
132 if let Some(derived_watermark) = derived_watermark {
133 ret.push(derived_watermark);
134 } else {
135 warn!(
136 "a NULL watermark is derived with the expression {}!",
137 out_col_idx
138 );
139 }
140 }
141 Ok(ret)
142 }
143
144 #[try_stream(ok = Message, error = StreamExecutorError)]
145 async fn execute(mut self, input: Executor) {
146 let mut input = input.execute();
147 let first_barrier = expect_first_barrier(&mut input).await?;
148 self.is_paused = first_barrier.is_pause_on_startup();
149 yield Message::Barrier(first_barrier);
150
151 #[for_await]
152 for msg in input {
153 let msg = msg?;
154 match msg {
155 Message::Watermark(w) => {
156 let watermarks = self.handle_watermark(w).await?;
157 for watermark in watermarks {
158 yield Message::Watermark(watermark)
159 }
160 }
161 Message::Chunk(chunk) => match self.map_filter_chunk(chunk).await? {
162 Some(new_chunk) => {
163 if !self.nondecreasing_expr_indices.is_empty()
164 && let Some((_, first_visible_row)) = new_chunk.rows().next()
165 {
166 first_visible_row
168 .project(&self.nondecreasing_expr_indices)
169 .iter()
170 .enumerate()
171 .for_each(|(idx, value)| {
172 self.last_nondec_expr_values[idx] =
173 Some(value.to_owned_datum().expect(
174 "non-decreasing expression should never be NULL",
175 ));
176 });
177 }
178 yield Message::Chunk(new_chunk)
179 }
180 None => continue,
181 },
182 Message::Barrier(barrier) => {
183 if !self.is_paused {
184 for (&expr_idx, value) in self
185 .nondecreasing_expr_indices
186 .iter()
187 .zip_eq_fast(&mut self.last_nondec_expr_values)
188 {
189 if let Some(value) = std::mem::take(value) {
190 yield Message::Watermark(Watermark::new(
191 expr_idx,
192 self.exprs[expr_idx].return_type(),
193 value,
194 ))
195 }
196 }
197 }
198
199 if let Some(mutation) = barrier.mutation.as_deref() {
200 match mutation {
201 Mutation::Pause => {
202 self.is_paused = true;
203 }
204 Mutation::Resume => {
205 self.is_paused = false;
206 }
207 _ => (),
208 }
209 }
210
211 yield Message::Barrier(barrier);
212 }
213 }
214 }
215 }
216}
217
#[cfg(test)]
mod tests {
    use std::sync::atomic::{self, AtomicI64};

    use risingwave_common::array::DataChunk;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::DefaultOrd;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_expr::expr::{self, Expression, ValueImpl};

    use super::*;
    use crate::executor::test_utils::expr::build_from_pretty;
    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};

    /// Projects `$0 + $1` and checks that values and ops (including deletes) come through.
    #[tokio::test]
    async fn test_projection() {
        let chunk1 = StreamChunk::from_pretty(
            " I I
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " I I
            + 7 8
            - 3 6",
        );
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema, pk_indices);

        let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)");

        let proj = ProjectExecutor::new(
            ActorContext::for_test(123),
            source,
            vec![test_expr],
            MultiMap::new(),
            vec![],
            false,
        );
        let mut proj = proj.boxed().execute();

        tx.push_barrier(test_epoch(1), false);
        let barrier = proj.next().await.unwrap().unwrap();
        barrier.as_barrier().unwrap();

        tx.push_chunk(chunk1);
        tx.push_chunk(chunk2);

        let msg = proj.next().await.unwrap().unwrap();
        assert_eq!(
            *msg.as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 5
                + 7
                + 9"
            )
        );

        let msg = proj.next().await.unwrap().unwrap();
        assert_eq!(
            *msg.as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 15
                - 9"
            )
        );

        tx.push_barrier(test_epoch(2), true);
        assert!(proj.next().await.unwrap().unwrap().is_stop());
    }

    /// Backing counter for `DummyNondecreasingExpr`: every evaluation returns the next value,
    /// so successive evaluations never decrease.
    static DUMMY_COUNTER: AtomicI64 = AtomicI64::new(0);

    /// Test-only `Int64` expression whose result increases on every evaluation.
    #[derive(Debug)]
    struct DummyNondecreasingExpr;

    #[async_trait::async_trait]
    impl Expression for DummyNondecreasingExpr {
        fn return_type(&self) -> DataType {
            DataType::Int64
        }

        async fn eval_v2(&self, input: &DataChunk) -> expr::Result<ValueImpl> {
            let value = DUMMY_COUNTER.fetch_add(1, atomic::Ordering::SeqCst);
            Ok(ValueImpl::Scalar {
                value: Some(value.into()),
                capacity: input.capacity(),
            })
        }

        async fn eval_row(&self, _input: &OwnedRow) -> expr::Result<Datum> {
            let value = DUMMY_COUNTER.fetch_add(1, atomic::Ordering::SeqCst);
            Ok(Some(value.into()))
        }
    }

    /// Checks both watermark sources: derivation of input watermarks through the projection
    /// expressions, and barrier-time watermarks produced from a non-decreasing expression.
    #[tokio::test]
    async fn test_watermark_projection() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema, PkIndices::new());

        let a_expr = build_from_pretty("(add:int8 $0:int8 1:int8)");
        let b_expr = build_from_pretty("(subtract:int8 $0:int8 1:int8)");
        let c_expr = NonStrictExpression::for_test(DummyNondecreasingExpr);

        let proj = ProjectExecutor::new(
            ActorContext::for_test(123),
            source,
            vec![a_expr, b_expr, c_expr],
            MultiMap::from_iter(vec![(0, 0), (0, 1)].into_iter()),
            vec![2],
            false,
        );
        let mut proj = proj.boxed().execute();

        tx.push_barrier(test_epoch(1), false);
        tx.push_int64_watermark(0, 100);

        proj.expect_barrier().await;
        let w1 = proj.expect_watermark().await;
        let w2 = proj.expect_watermark().await;
        let (w1, w2) = if w1.col_idx < w2.col_idx {
            (w1, w2)
        } else {
            (w2, w1)
        };

        assert_eq!(
            w1,
            Watermark {
                col_idx: 0,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(101)
            }
        );
        assert_eq!(
            w2,
            Watermark {
                col_idx: 1,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(99)
            }
        );

        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 120 4
            + 146 5
            + 133 6",
        ));
        proj.expect_chunk().await;
        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 213 8
            - 133 6",
        ));
        proj.expect_chunk().await;

        tx.push_barrier(test_epoch(2), false);
        let w3 = proj.expect_watermark().await;
        proj.expect_barrier().await;

        // The barrier-time watermark must target the non-decreasing expression's column.
        assert_eq!(w3.col_idx, 2);
        assert_eq!(w3.data_type, DataType::Int64);

        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 100 3
            + 104 5
            + 187 3",
        ));
        proj.expect_chunk().await;

        tx.push_barrier(test_epoch(3), false);
        let w4 = proj.expect_watermark().await;
        proj.expect_barrier().await;

        assert_eq!(w3.col_idx, w4.col_idx);
        assert!(w3.val.default_cmp(&w4.val).is_le());

        // Column 1 has no registered derivations, so this watermark produces no output.
        tx.push_int64_watermark(1, 100);
        tx.push_barrier(test_epoch(4), true);

        assert!(proj.next().await.unwrap().unwrap().is_stop());
    }
}