// Source path: risingwave_stream/executor/project/project_scalar.rs

use multimap::MultiMap;
use risingwave_common::row::RowExt;
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::expr::NonStrictExpression;

use crate::executor::prelude::*;

23pub struct ProjectExecutor {
27 input: Executor,
28 inner: Inner,
29}
30
31struct Inner {
32 _ctx: ActorContextRef,
33
34 exprs: Vec<NonStrictExpression>,
36 watermark_derivations: MultiMap<usize, usize>,
39 nondecreasing_expr_indices: Vec<usize>,
41 last_nondec_expr_values: Vec<Option<ScalarImpl>>,
43 is_paused: bool,
45
46 noop_update_hint: bool,
49}
50
51impl ProjectExecutor {
52 pub fn new(
53 ctx: ActorContextRef,
54 input: Executor,
55 exprs: Vec<NonStrictExpression>,
56 watermark_derivations: MultiMap<usize, usize>,
57 nondecreasing_expr_indices: Vec<usize>,
58 noop_update_hint: bool,
59 ) -> Self {
60 let n_nondecreasing_exprs = nondecreasing_expr_indices.len();
61 Self {
62 input,
63 inner: Inner {
64 _ctx: ctx,
65 exprs,
66 watermark_derivations,
67 nondecreasing_expr_indices,
68 last_nondec_expr_values: vec![None; n_nondecreasing_exprs],
69 is_paused: false,
70 noop_update_hint,
71 },
72 }
73 }
74}
75
76impl Debug for ProjectExecutor {
77 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct("ProjectExecutor")
79 .field("exprs", &self.inner.exprs)
80 .finish()
81 }
82}
83
84impl Execute for ProjectExecutor {
85 fn execute(self: Box<Self>) -> BoxedMessageStream {
86 self.inner.execute(self.input).boxed()
87 }
88}
89
90impl Inner {
91 async fn map_filter_chunk(
92 &self,
93 chunk: StreamChunk,
94 ) -> StreamExecutorResult<Option<StreamChunk>> {
95 let (data_chunk, ops) = chunk.into_parts();
96 let mut projected_columns = Vec::new();
97
98 for expr in &self.exprs {
99 let evaluated_expr = expr.eval_infallible(&data_chunk).await;
100 projected_columns.push(evaluated_expr);
101 }
102 let (_, vis) = data_chunk.into_parts();
103
104 let mut new_chunk = StreamChunk::with_visibility(ops, projected_columns, vis);
105 if self.noop_update_hint {
106 new_chunk = new_chunk.eliminate_adjacent_noop_update();
107 }
108 Ok(Some(new_chunk))
109 }
110
111 async fn handle_watermark(&self, watermark: Watermark) -> StreamExecutorResult<Vec<Watermark>> {
112 let out_col_indices = match self.watermark_derivations.get_vec(&watermark.col_idx) {
113 Some(v) => v,
114 None => return Ok(vec![]),
115 };
116 let mut ret = vec![];
117 for out_col_idx in out_col_indices {
118 let out_col_idx = *out_col_idx;
119 let derived_watermark = watermark
120 .clone()
121 .transform_with_expr(&self.exprs[out_col_idx], out_col_idx)
122 .await;
123 if let Some(derived_watermark) = derived_watermark {
124 ret.push(derived_watermark);
125 } else {
126 warn!(
127 "a NULL watermark is derived with the expression {}!",
128 out_col_idx
129 );
130 }
131 }
132 Ok(ret)
133 }
134
135 #[try_stream(ok = Message, error = StreamExecutorError)]
136 async fn execute(mut self, input: Executor) {
137 let mut input = input.execute();
138 let first_barrier = expect_first_barrier(&mut input).await?;
139 self.is_paused = first_barrier.is_pause_on_startup();
140 yield Message::Barrier(first_barrier);
141
142 #[for_await]
143 for msg in input {
144 let msg = msg?;
145 match msg {
146 Message::Watermark(w) => {
147 let watermarks = self.handle_watermark(w).await?;
148 for watermark in watermarks {
149 yield Message::Watermark(watermark)
150 }
151 }
152 Message::Chunk(chunk) => match self.map_filter_chunk(chunk).await? {
153 Some(new_chunk) => {
154 if !self.nondecreasing_expr_indices.is_empty() {
155 if let Some((_, first_visible_row)) = new_chunk.rows().next() {
156 first_visible_row
158 .project(&self.nondecreasing_expr_indices)
159 .iter()
160 .enumerate()
161 .for_each(|(idx, value)| {
162 self.last_nondec_expr_values[idx] =
163 Some(value.to_owned_datum().expect(
164 "non-decreasing expression should never be NULL",
165 ));
166 });
167 }
168 }
169 yield Message::Chunk(new_chunk)
170 }
171 None => continue,
172 },
173 Message::Barrier(barrier) => {
174 if !self.is_paused {
175 for (&expr_idx, value) in self
176 .nondecreasing_expr_indices
177 .iter()
178 .zip_eq_fast(&mut self.last_nondec_expr_values)
179 {
180 if let Some(value) = std::mem::take(value) {
181 yield Message::Watermark(Watermark::new(
182 expr_idx,
183 self.exprs[expr_idx].return_type(),
184 value,
185 ))
186 }
187 }
188 }
189
190 if let Some(mutation) = barrier.mutation.as_deref() {
191 match mutation {
192 Mutation::Pause => {
193 self.is_paused = true;
194 }
195 Mutation::Resume => {
196 self.is_paused = false;
197 }
198 _ => (),
199 }
200 }
201
202 yield Message::Barrier(barrier);
203 }
204 }
205 }
206 }
207}
208
#[cfg(test)]
mod tests {
    use std::sync::atomic::{self, AtomicI64};

    use risingwave_common::array::DataChunk;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::DefaultOrd;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_expr::expr::{self, Expression, ValueImpl};

    use super::*;
    use crate::executor::test_utils::expr::build_from_pretty;
    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};

    /// Projects `$0 + $1` over two chunks and checks that the ops are kept and
    /// the single output column holds the sums.
    #[tokio::test]
    async fn test_projection() {
        let chunk1 = StreamChunk::from_pretty(
            " I I
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " I I
            + 7 8
            - 3 6",
        );
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema, pk_indices);

        let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)");

        let proj = ProjectExecutor::new(
            ActorContext::for_test(123),
            source,
            vec![test_expr],
            MultiMap::new(),
            vec![],
            false,
        );
        let mut proj = proj.boxed().execute();

        // The executor requires a barrier as its first message.
        tx.push_barrier(test_epoch(1), false);
        let barrier = proj.next().await.unwrap().unwrap();
        barrier.as_barrier().unwrap();

        tx.push_chunk(chunk1);
        tx.push_chunk(chunk2);

        let msg = proj.next().await.unwrap().unwrap();
        assert_eq!(
            *msg.as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 5
                + 7
                + 9"
            )
        );

        let msg = proj.next().await.unwrap().unwrap();
        assert_eq!(
            *msg.as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 15
                - 9"
            )
        );

        tx.push_barrier(test_epoch(2), true);
        assert!(proj.next().await.unwrap().unwrap().is_stop());
    }

    /// Counter shared by all evaluations of [`DummyNondecreasingExpr`].
    static DUMMY_COUNTER: AtomicI64 = AtomicI64::new(0);

    /// Test expression that ignores its input: every evaluation returns the
    /// current value of [`DUMMY_COUNTER`] and then increments the counter.
    #[derive(Debug)]
    struct DummyNondecreasingExpr;

    #[async_trait::async_trait]
    impl Expression for DummyNondecreasingExpr {
        fn return_type(&self) -> DataType {
            DataType::Int64
        }

        /// Returns one scalar value for the whole chunk.
        async fn eval_v2(&self, input: &DataChunk) -> expr::Result<ValueImpl> {
            let value = DUMMY_COUNTER.fetch_add(1, atomic::Ordering::SeqCst);
            Ok(ValueImpl::Scalar {
                value: Some(value.into()),
                capacity: input.capacity(),
            })
        }

        async fn eval_row(&self, _input: &OwnedRow) -> expr::Result<Datum> {
            let value = DUMMY_COUNTER.fetch_add(1, atomic::Ordering::SeqCst);
            Ok(Some(value.into()))
        }
    }

    /// Checks both watermark paths: watermarks derived from an input watermark
    /// and watermarks emitted at barriers for a non-decreasing expression.
    #[tokio::test]
    async fn test_watermark_projection() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema, PkIndices::new());

        let a_expr = build_from_pretty("(add:int8 $0:int8 1:int8)");
        let b_expr = build_from_pretty("(subtract:int8 $0:int8 1:int8)");
        let c_expr = NonStrictExpression::for_test(DummyNondecreasingExpr);

        let proj = ProjectExecutor::new(
            ActorContext::for_test(123),
            source,
            vec![a_expr, b_expr, c_expr],
            // Input column 0 drives the watermarks of output columns 0 and 1.
            MultiMap::from_iter([(0, 0), (0, 1)]),
            // Output column 2 is the non-decreasing expression.
            vec![2],
            false,
        );
        let mut proj = proj.boxed().execute();

        tx.push_barrier(test_epoch(1), false);
        tx.push_int64_watermark(0, 100);

        proj.expect_barrier().await;
        let w1 = proj.expect_watermark().await;
        let w2 = proj.expect_watermark().await;
        // The two derived watermarks may arrive in either order; sort them by
        // output column before comparing.
        let (w1, w2) = if w1.col_idx < w2.col_idx {
            (w1, w2)
        } else {
            (w2, w1)
        };

        assert_eq!(
            w1,
            Watermark {
                col_idx: 0,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(101)
            }
        );
        assert_eq!(
            w2,
            Watermark {
                col_idx: 1,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(99)
            }
        );

        // The chunk contents are irrelevant here; the chunks only trigger
        // evaluation of the non-decreasing expression.
        tx.push_chunk(StreamChunk::from_pretty(
            "   I I
            + 120 4
            + 146 5
            + 133 6",
        ));
        proj.expect_chunk().await;
        tx.push_chunk(StreamChunk::from_pretty(
            "   I I
            + 213 8
            - 133 6",
        ));
        proj.expect_chunk().await;

        // The watermark of the non-decreasing expression precedes the barrier.
        tx.push_barrier(test_epoch(2), false);
        let w3 = proj.expect_watermark().await;
        proj.expect_barrier().await;

        tx.push_chunk(StreamChunk::from_pretty(
            "   I I
            + 100 3
            + 104 5
            + 187 3",
        ));
        proj.expect_chunk().await;

        tx.push_barrier(test_epoch(3), false);
        let w4 = proj.expect_watermark().await;
        proj.expect_barrier().await;

        assert_eq!(w3.col_idx, w4.col_idx);
        assert!(w3.val.default_cmp(&w4.val).is_le());

        // Input column 1 has no derivation and no chunk arrived since the last
        // barrier, so the stop barrier is the very next output message.
        tx.push_int64_watermark(1, 100);
        tx.push_barrier(test_epoch(4), true);

        assert!(proj.next().await.unwrap().unwrap().is_stop());
    }
}