1use std::future::Future;
16
17use futures::future::{Either, pending, select};
18use futures::pin_mut;
19use futures::stream::FuturesOrdered;
20use multimap::MultiMap;
21use risingwave_common::metrics::LabelGuardedIntGauge;
22use risingwave_common::row::RowExt;
23use risingwave_common::types::ToOwnedDatum;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_expr::expr::NonStrictExpression;
26use tokio::sync::Semaphore;
27
28use crate::executor::prelude::*;
29
/// Future that turns one [`ProjectMessageInput`] into a [`ProjectedMessage`].
///
/// This is an opaque (`impl Trait`) alias whose concrete type is defined by
/// `Inner::project_message` (note its `#[define_opaque]` attribute). Naming the type lets
/// these futures be stored in [`PendingProjectMessages`].
type ProjectMessageFuture = impl Future<Output = StreamExecutorResult<ProjectedMessage>> + Send;
/// Queue of in-flight projection futures. `FuturesOrdered` polls all of them concurrently
/// but yields their outputs in push order, so emitted messages keep the input order.
type PendingProjectMessages = FuturesOrdered<ProjectMessageFuture>;
32
33enum ProjectMessageInput {
34 Chunk {
35 chunk: StreamChunk,
36 inflight_request_semaphore: Option<Arc<Semaphore>>,
37 },
38 Watermark {
39 watermark: Watermark,
40 out_col_indices: Vec<usize>,
41 },
42 Barrier(Barrier),
43}
44
45enum ProjectedMessage {
46 Chunk(StreamChunk),
47 Watermark(Vec<Watermark>),
48 Barrier(Barrier),
49}
50
/// Stream executor that evaluates a list of expressions on every input chunk and emits the
/// resulting columns. It also derives output watermarks from input watermarks and emits
/// watermarks for non-decreasing expressions at barriers.
pub struct ProjectExecutor {
    /// Upstream executor.
    input: Executor,
    /// Projection configuration and state; consumed by `Inner::execute`.
    inner: Inner,
}
58
/// Everything [`ProjectExecutor`] needs at run time except its upstream executor.
struct Inner {
    /// Id of the owning actor, used to recognize stop barriers addressed to it.
    actor_id: ActorId,

    /// One expression per output column, shared via `Arc` with every in-flight
    /// projection future.
    exprs: Arc<Vec<NonStrictExpression>>,
    /// Input column index -> output column indices whose watermarks are derived from it.
    watermark_derivations: MultiMap<usize, usize>,
    /// Indices into `exprs` of the expressions treated as non-decreasing; a watermark
    /// for each of them is emitted at (non-paused) barriers.
    nondecreasing_expr_indices: Vec<usize>,
    /// Latest sampled value of each non-decreasing expression since it was last emitted,
    /// positionally aligned with `nondecreasing_expr_indices`.
    last_nondec_expr_values: Vec<Option<ScalarImpl>>,

    /// Whether to run `eliminate_adjacent_noop_update` on every projected chunk.
    eliminate_noop_updates: bool,

    /// Maximum number of chunks queued at once (being evaluated or awaiting emission);
    /// `usize::MAX` when unlimited.
    project_expr_concurrency: usize,

    /// Optional limit on how many chunks evaluate their expressions at the same time;
    /// the semaphore is shared by all chunk futures of this executor.
    project_expr_inflight_request_concurrency: Option<Arc<Semaphore>>,

    /// Gauge tracking the number of queued projection futures; only registered when
    /// `project_expr_concurrency` differs from its default.
    project_expr_inflight_window_size: Option<LabelGuardedIntGauge>,
}
86
87impl ProjectExecutor {
88 pub fn new(
89 ctx: ActorContextRef,
90 input: Executor,
91 exprs: Vec<NonStrictExpression>,
92 watermark_derivations: MultiMap<usize, usize>,
93 nondecreasing_expr_indices: Vec<usize>,
94 noop_update_hint: bool,
95 ) -> Self {
96 let n_nondecreasing_exprs = nondecreasing_expr_indices.len();
97 let eliminate_noop_updates =
98 noop_update_hint || ctx.config.developer.aggressive_noop_update_elimination;
99 let project_expr_concurrency = match ctx.config.developer.project_expr_concurrency {
100 0 => usize::MAX,
101 concurrency => concurrency,
102 };
103 let project_expr_inflight_request_concurrency = match ctx
104 .config
105 .developer
106 .project_expr_inflight_request_concurrency
107 {
108 0 => None,
109 concurrency => Some(Arc::new(Semaphore::new(concurrency))),
110 };
111 let project_expr_inflight_window_size = (ctx.config.developer.project_expr_concurrency
112 != risingwave_common::config::default::developer::stream_project_expr_concurrency())
113 .then(|| {
114 ctx.streaming_metrics
115 .project_expr_inflight_window_size
116 .with_guarded_label_values(&[&ctx.id.to_string(), &ctx.fragment_id.to_string()])
117 });
118 Self {
119 input,
120 inner: Inner {
121 actor_id: ctx.id,
122 exprs: Arc::new(exprs),
123 watermark_derivations,
124 nondecreasing_expr_indices,
125 last_nondec_expr_values: vec![None; n_nondecreasing_exprs],
126 eliminate_noop_updates,
127 project_expr_concurrency,
128 project_expr_inflight_request_concurrency,
129 project_expr_inflight_window_size,
130 },
131 }
132 }
133}
134
135impl Debug for ProjectExecutor {
136 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
137 f.debug_struct("ProjectExecutor")
138 .field("exprs", &self.inner.exprs)
139 .finish()
140 }
141}
142
143impl Execute for ProjectExecutor {
144 fn execute(self: Box<Self>) -> BoxedMessageStream {
145 self.inner.execute(self.input).boxed()
146 }
147}
148
149pub async fn apply_project_exprs(
150 exprs: &[NonStrictExpression],
151 chunk: StreamChunk,
152) -> StreamExecutorResult<StreamChunk> {
153 let (data_chunk, ops) = chunk.into_parts();
154 let mut projected_columns = Vec::new();
155
156 for expr in exprs {
157 let evaluated_expr = expr.eval_infallible(&data_chunk).await;
158 projected_columns.push(evaluated_expr);
159 }
160 let (_, vis) = data_chunk.into_parts();
161
162 let new_chunk = StreamChunk::with_visibility(ops, projected_columns, vis);
163
164 Ok(new_chunk)
165}
166
167impl Inner {
168 #[define_opaque(ProjectMessageFuture)]
169 fn project_message(
170 exprs: Arc<Vec<NonStrictExpression>>,
171 eliminate_noop_updates: bool,
172 input: ProjectMessageInput,
173 ) -> ProjectMessageFuture {
174 async move {
175 match input {
176 ProjectMessageInput::Chunk {
177 chunk,
178 inflight_request_semaphore,
179 } => {
180 let _permit = if let Some(semaphore) = inflight_request_semaphore {
181 Some(semaphore.acquire_owned().await.expect(
182 "project expression in-flight request semaphore should not be closed",
183 ))
184 } else {
185 None
186 };
187 let mut new_chunk = apply_project_exprs(&exprs, chunk)
188 .instrument_await("project_eval_chunk")
189 .await?;
190 if eliminate_noop_updates {
191 new_chunk = new_chunk.eliminate_adjacent_noop_update();
192 }
193 Ok(ProjectedMessage::Chunk(new_chunk))
194 }
195 ProjectMessageInput::Watermark {
196 watermark,
197 out_col_indices,
198 } => {
199 let mut ret = vec![];
200 for out_col_idx in out_col_indices {
201 let derived_watermark = watermark
202 .clone()
203 .transform_with_expr(&exprs[out_col_idx], out_col_idx)
204 .await;
205 if let Some(derived_watermark) = derived_watermark {
206 ret.push(derived_watermark);
207 } else {
208 warn!(
209 "a NULL watermark is derived with the expression {}!",
210 out_col_idx
211 );
212 }
213 }
214 Ok(ProjectedMessage::Watermark(ret))
215 }
216 ProjectMessageInput::Barrier(barrier) => Ok(ProjectedMessage::Barrier(barrier)),
217 }
218 }
219 }
220
221 fn update_last_nondec_expr_values(
222 nondecreasing_expr_indices: &[usize],
223 last_nondec_expr_values: &mut [Option<ScalarImpl>],
224 new_chunk: &StreamChunk,
225 ) {
226 {
227 {
228 {
229 {
230 if !nondecreasing_expr_indices.is_empty()
231 && let Some((_, first_visible_row)) = new_chunk.rows().next()
232 {
233 first_visible_row
235 .project(nondecreasing_expr_indices)
236 .iter()
237 .enumerate()
238 .for_each(|(idx, value)| {
239 last_nondec_expr_values[idx] =
240 Some(value.to_owned_datum().expect(
241 "non-decreasing expression should never be NULL",
242 ));
243 });
244 }
245 }
246 }
247 }
248 }
249 }
250
251 #[try_stream(ok = Message, error = StreamExecutorError)]
252 async fn execute(self, input: Executor) {
253 let Inner {
254 actor_id,
255 exprs,
256 watermark_derivations,
257 nondecreasing_expr_indices,
258 mut last_nondec_expr_values,
259 eliminate_noop_updates,
260 project_expr_concurrency,
261 project_expr_inflight_request_concurrency,
262 project_expr_inflight_window_size,
263 } = self;
264
265 let mut input = input.execute();
266 let first_barrier = expect_first_barrier(&mut input).await?;
267 let mut is_paused = first_barrier.is_pause_on_startup();
268 let mut received_stop_barrier = first_barrier.is_stop(actor_id);
269 yield Message::Barrier(first_barrier);
270 if received_stop_barrier {
271 return Ok(());
272 }
273
274 let mut pending_project_messages = PendingProjectMessages::new();
275 let mut pending_project_chunks = 0;
276
277 loop {
278 let has_pending_project_message = !pending_project_messages.is_empty();
279 let can_read_input =
280 !received_stop_barrier && pending_project_chunks < project_expr_concurrency;
281 let next_projected_message = async {
282 if has_pending_project_message {
283 pending_project_messages
284 .next()
285 .await
286 .expect("pending project messages should not be empty")
287 } else {
288 pending().await
289 }
290 };
291 let next_input_msg = async {
292 if can_read_input {
293 input.next().await.ok_or_else(|| {
294 StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
295 })?
296 } else {
297 pending().await
298 }
299 };
300
301 pin_mut!(next_projected_message);
302 pin_mut!(next_input_msg);
303
304 match select(next_projected_message, next_input_msg).await {
305 Either::Left((projected_message, _)) => match projected_message? {
306 ProjectedMessage::Chunk(new_chunk) => {
307 pending_project_chunks -= 1;
308 if let Some(metric) = &project_expr_inflight_window_size {
309 metric.set(pending_project_messages.len() as _);
310 }
311 Self::update_last_nondec_expr_values(
312 &nondecreasing_expr_indices,
313 &mut last_nondec_expr_values,
314 &new_chunk,
315 );
316 yield Message::Chunk(new_chunk);
317 }
318 ProjectedMessage::Watermark(watermarks) => {
319 if let Some(metric) = &project_expr_inflight_window_size {
320 metric.set(pending_project_messages.len() as _);
321 }
322 for watermark in watermarks {
323 yield Message::Watermark(watermark);
324 }
325 }
326 ProjectedMessage::Barrier(barrier) => {
327 if let Some(metric) = &project_expr_inflight_window_size {
328 metric.set(pending_project_messages.len() as _);
329 }
330 if !is_paused {
331 for (&expr_idx, value) in nondecreasing_expr_indices
332 .iter()
333 .zip_eq_fast(&mut last_nondec_expr_values)
334 {
335 if let Some(value) = std::mem::take(value) {
336 yield Message::Watermark(Watermark::new(
337 expr_idx,
338 exprs[expr_idx].return_type(),
339 value,
340 ))
341 }
342 }
343 }
344
345 if let Some(mutation) = barrier.mutation.as_deref() {
346 match mutation {
347 Mutation::Pause => {
348 is_paused = true;
349 }
350 Mutation::Resume => {
351 is_paused = false;
352 }
353 _ => (),
354 }
355 }
356
357 let should_stop = barrier.is_stop(actor_id);
358 yield Message::Barrier(barrier);
359 if should_stop {
360 break;
361 }
362 }
363 },
364 Either::Right((msg, _)) => match msg? {
365 Message::Watermark(w) => {
366 let out_col_indices = match watermark_derivations.get_vec(&w.col_idx) {
367 Some(v) => v,
368 None => continue,
369 };
370 pending_project_messages.push_back(Self::project_message(
371 exprs.clone(),
372 eliminate_noop_updates,
373 ProjectMessageInput::Watermark {
374 watermark: w,
375 out_col_indices: out_col_indices.clone(),
376 },
377 ));
378 if let Some(metric) = &project_expr_inflight_window_size {
379 metric.set(pending_project_messages.len() as _);
380 }
381 }
382 Message::Chunk(chunk) => {
383 pending_project_messages.push_back(Self::project_message(
384 exprs.clone(),
385 eliminate_noop_updates,
386 ProjectMessageInput::Chunk {
387 chunk,
388 inflight_request_semaphore:
389 project_expr_inflight_request_concurrency.clone(),
390 },
391 ));
392 pending_project_chunks += 1;
393 if let Some(metric) = &project_expr_inflight_window_size {
394 metric.set(pending_project_messages.len() as _);
395 }
396 }
397 Message::Barrier(barrier) => {
398 if barrier.is_stop(actor_id) {
399 received_stop_barrier = true;
400 }
401 pending_project_messages.push_back(Self::project_message(
402 exprs.clone(),
403 eliminate_noop_updates,
404 ProjectMessageInput::Barrier(barrier),
405 ));
406 if let Some(metric) = &project_expr_inflight_window_size {
407 metric.set(pending_project_messages.len() as _);
408 }
409 }
410 },
411 }
412 }
413 }
414}
415
#[cfg(test)]
mod tests {
    //! Tests for `ProjectExecutor`: plain projection, stop-barrier handling, concurrent
    //! chunk evaluation with ordered output, in-flight permit reuse, and watermark
    //! derivation (including watermarks for non-decreasing expressions).

    use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicUsize};
    use std::time::Duration;

    use risingwave_common::array::DataChunk;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::Field;
    use risingwave_common::config::StreamingConfig;
    use risingwave_common::types::DefaultOrd;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_expr::expr::{self, Expression, ValueImpl};
    use tokio::sync::Notify;
    use tokio::time::timeout;

    use super::*;
    use crate::executor::StopMutation;
    use crate::executor::test_utils::expr::build_from_pretty;
    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};

    /// Test actor context (actor id 123) with the given `project_expr_concurrency` and no
    /// in-flight request limit (`0`).
    fn actor_context_with_project_expr_concurrency(concurrency: usize) -> ActorContextRef {
        actor_context_with_project_expr_limits(concurrency, 0)
    }

    /// Test actor context (actor id 123) whose developer config sets both project
    /// concurrency knobs.
    fn actor_context_with_project_expr_limits(
        concurrency: usize,
        inflight_request_concurrency: usize,
    ) -> ActorContextRef {
        let mut config = StreamingConfig::default();
        config.developer.project_expr_concurrency = concurrency;
        config.developer.project_expr_inflight_request_concurrency = inflight_request_concurrency;
        let mut ctx = ActorContext::for_test(123);
        // Freshly created context, so the `Arc` is unique and can be mutated in place.
        Arc::get_mut(&mut ctx)
            .expect("test actor context should not be shared")
            .config = Arc::new(config);
        ctx
    }

    /// `$0 + $1` is projected per row, and ops are preserved.
    #[tokio::test]
    async fn test_projection() {
        let chunk1 = StreamChunk::from_pretty(
            " I I
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " I I
            + 7 8
            - 3 6",
        );
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema, stream_key);

        let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)");

        let proj = ProjectExecutor::new(
            ActorContext::for_test(123),
            source,
            vec![test_expr],
            MultiMap::new(),
            vec![],
            false,
        );
        let mut proj = proj.boxed().execute();

        tx.push_barrier(test_epoch(1), false);
        let barrier = proj.next().await.unwrap().unwrap();
        barrier.as_barrier().unwrap();

        tx.push_chunk(chunk1);
        tx.push_chunk(chunk2);

        let msg = proj.next().await.unwrap().unwrap();
        assert_eq!(
            *msg.as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 5
                + 7
                + 9"
            )
        );

        let msg = proj.next().await.unwrap().unwrap();
        assert_eq!(
            *msg.as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 15
                - 9"
            )
        );

        tx.push_barrier(test_epoch(2), true);
        assert!(proj.next().await.unwrap().unwrap().is_stop());
    }

    /// A chunk queued after a stop barrier for this actor must never be read: the stream
    /// yields the stop barrier and then ends.
    #[tokio::test]
    async fn test_projection_does_not_poll_after_stop_barrier() {
        let schema = Schema {
            fields: vec![Field::unnamed(DataType::Int64)],
        };
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema, StreamKey::new());

        let test_expr = build_from_pretty("(add:int8 $0:int8 1:int8)");

        let proj = ProjectExecutor::new(
            ActorContext::for_test(123),
            source,
            vec![test_expr],
            MultiMap::new(),
            vec![],
            false,
        );
        let mut proj = proj.boxed().execute();

        tx.push_barrier(test_epoch(1), false);
        proj.expect_barrier().await;

        // Stop mutation that drops actor 123, i.e. the executor under test.
        tx.send_barrier(
            Barrier::new_test_barrier(test_epoch(2)).with_mutation(Mutation::Stop(StopMutation {
                dropped_actors: std::iter::once(123.into()).collect(),
                ..Default::default()
            })),
        );
        tx.push_chunk(StreamChunk::from_pretty(
            " I
            + 1",
        ));

        assert!(proj.next().await.unwrap().unwrap().is_stop());
        assert!(proj.next().await.is_none());
    }

    /// Test expression whose first evaluation blocks until (a) the second evaluation has
    /// started and (b) the test sets `release_first`. Each call returns its call index as
    /// a constant column.
    #[derive(Debug)]
    struct BlockingProjectExpr {
        started_count: Arc<AtomicUsize>,
        second_started: Arc<AtomicBool>,
        second_started_notify: Arc<Notify>,
        release_first: Arc<AtomicBool>,
        release_first_notify: Arc<Notify>,
    }

    #[async_trait::async_trait]
    impl Expression for BlockingProjectExpr {
        fn return_type(&self) -> DataType {
            DataType::Int64
        }

        async fn eval_v2(&self, input: &DataChunk) -> expr::Result<ValueImpl> {
            let call_idx = self.started_count.fetch_add(1, atomic::Ordering::SeqCst);
            if call_idx == 0 {
                // The `Notified` future is created before the flag is checked so that a
                // concurrent `notify_waiters` between the check and the await is not lost.
                loop {
                    let notified = self.second_started_notify.notified();
                    if self.second_started.load(atomic::Ordering::SeqCst) {
                        break;
                    }
                    notified.await;
                }
                loop {
                    let notified = self.release_first_notify.notified();
                    if self.release_first.load(atomic::Ordering::SeqCst) {
                        break;
                    }
                    notified.await;
                }
            } else if call_idx == 1 {
                self.second_started.store(true, atomic::Ordering::SeqCst);
                self.second_started_notify.notify_waiters();
            }

            Ok(ValueImpl::Scalar {
                value: Some((call_idx as i64).into()),
                capacity: input.capacity(),
            })
        }

        async fn eval_row(&self, _input: &OwnedRow) -> expr::Result<Datum> {
            unimplemented!()
        }
    }

    /// Test expression whose first evaluation blocks until `started_count` reaches
    /// `unblock_first_at_started_count`. Every call announces itself on `started_notify`
    /// and returns its call index as a constant column; `eval_row` (used for watermark
    /// derivation) always returns 0.
    #[derive(Debug)]
    struct FirstProjectExprWaitsForStartedCount {
        started_count: Arc<AtomicUsize>,
        started_notify: Arc<Notify>,
        unblock_first_at_started_count: usize,
    }

    #[async_trait::async_trait]
    impl Expression for FirstProjectExprWaitsForStartedCount {
        fn return_type(&self) -> DataType {
            DataType::Int64
        }

        async fn eval_v2(&self, input: &DataChunk) -> expr::Result<ValueImpl> {
            let call_idx = self.started_count.fetch_add(1, atomic::Ordering::SeqCst);
            self.started_notify.notify_waiters();
            if call_idx == 0 {
                loop {
                    let notified = self.started_notify.notified();
                    if self.started_count.load(atomic::Ordering::SeqCst)
                        >= self.unblock_first_at_started_count
                    {
                        break;
                    }
                    notified.await;
                }
            }

            Ok(ValueImpl::Scalar {
                value: Some((call_idx as i64).into()),
                capacity: input.capacity(),
            })
        }

        async fn eval_row(&self, _input: &OwnedRow) -> expr::Result<Datum> {
            Ok(Some(0_i64.into()))
        }
    }

    /// With `project_expr_concurrency = 2`, the second chunk's expression must start while
    /// the first is still blocked (nothing may be emitted before that). Outputs still come
    /// out in input order.
    #[tokio::test]
    async fn test_projection_evaluates_chunks_concurrently_before_barrier() {
        let schema = Schema {
            fields: vec![Field::unnamed(DataType::Int64)],
        };
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema, StreamKey::new());

        let started_count = Arc::new(AtomicUsize::new(0));
        let second_started = Arc::new(AtomicBool::new(false));
        let second_started_notify = Arc::new(Notify::new());
        let release_first = Arc::new(AtomicBool::new(false));
        let release_first_notify = Arc::new(Notify::new());

        let test_expr = NonStrictExpression::for_test(BlockingProjectExpr {
            started_count,
            second_started: second_started.clone(),
            second_started_notify: second_started_notify.clone(),
            release_first: release_first.clone(),
            release_first_notify: release_first_notify.clone(),
        });

        let proj = ProjectExecutor::new(
            actor_context_with_project_expr_concurrency(2),
            source,
            vec![test_expr],
            MultiMap::new(),
            vec![],
            false,
        );
        let mut proj = proj.boxed().execute();

        tx.push_barrier(test_epoch(1), false);
        proj.expect_barrier().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " I
            + 1",
        ));
        tx.push_chunk(StreamChunk::from_pretty(
            " I
            + 2",
        ));
        tx.push_barrier(test_epoch(2), true);

        // Drive the executor until the second evaluation has started; emitting anything
        // before that point would mean the chunks were not evaluated concurrently.
        let next_msg = proj.next();
        pin_mut!(next_msg);
        timeout(Duration::from_secs(5), async {
            loop {
                let notified = second_started_notify.notified();
                if second_started.load(atomic::Ordering::SeqCst) {
                    break;
                }
                tokio::select! {
                    _ = notified => {}
                    msg = &mut next_msg => {
                        panic!("project executor emitted before the second chunk started: {msg:?}");
                    }
                }
            }
        })
        .await
        .expect("second chunk expression did not start");

        release_first.store(true, atomic::Ordering::SeqCst);
        release_first_notify.notify_waiters();

        let msg = next_msg.await.unwrap().unwrap();
        assert_eq!(
            *msg.as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 0"
            )
        );

        let msg = proj.next().await.unwrap().unwrap();
        assert_eq!(
            *msg.as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 1"
            )
        );

        assert!(proj.next().await.unwrap().unwrap().is_stop());
    }

    /// Chunk window 3, in-flight request limit 2. Chunk 1 blocks until three evaluations
    /// have started. Chunk 2 finishes and releases its permit even though its output waits
    /// behind chunk 1, so chunk 3 (queued after barrier 2) can take that permit. Outputs
    /// keep input order.
    #[tokio::test]
    async fn test_projection_reuses_finished_inflight_permit_across_barrier() {
        let schema = Schema {
            fields: vec![Field::unnamed(DataType::Int64)],
        };
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema, StreamKey::new());

        let started_count = Arc::new(AtomicUsize::new(0));
        let started_notify = Arc::new(Notify::new());
        let test_expr = NonStrictExpression::for_test(FirstProjectExprWaitsForStartedCount {
            started_count: started_count.clone(),
            started_notify: started_notify.clone(),
            unblock_first_at_started_count: 3,
        });

        let proj = ProjectExecutor::new(
            actor_context_with_project_expr_limits(3, 2),
            source,
            vec![test_expr],
            MultiMap::new(),
            vec![],
            false,
        );
        let mut proj = proj.boxed().execute();

        tx.push_barrier(test_epoch(1), false);
        proj.expect_barrier().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " I
            + 1",
        ));
        tx.push_chunk(StreamChunk::from_pretty(
            " I
            + 2",
        ));
        tx.push_barrier(test_epoch(2), false);
        tx.push_chunk(StreamChunk::from_pretty(
            " I
            + 3",
        ));
        tx.push_barrier(test_epoch(3), true);

        // Poll the executor until the third evaluation has started. A message arriving
        // earlier is a failure; one arriving afterwards is kept as the first output.
        let first_msg = timeout(Duration::from_secs(5), async {
            let next_msg = proj.next();
            pin_mut!(next_msg);
            let mut first_msg = None;
            loop {
                let notified = started_notify.notified();
                if started_count.load(atomic::Ordering::SeqCst) >= 3 {
                    break;
                }
                tokio::select! {
                    _ = notified => {}
                    msg = &mut next_msg => {
                        if started_count.load(atomic::Ordering::SeqCst) < 3 {
                            panic!("project executor emitted before the post-barrier chunk started: {msg:?}");
                        }
                        first_msg = Some(msg.unwrap().unwrap());
                        break;
                    }
                }
            }
            match first_msg {
                Some(msg) => msg,
                None => next_msg.await.unwrap().unwrap(),
            }
        })
        .await
        .expect("post-barrier chunk expression did not start");

        assert_eq!(
            *first_msg.as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 0"
            )
        );
        assert_eq!(
            *proj.next().await.unwrap().unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 1"
            )
        );
        proj.expect_barrier().await;
        assert_eq!(
            *proj.next().await.unwrap().unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 2"
            )
        );
        assert!(proj.next().await.unwrap().unwrap().is_stop());
    }

    /// Same scenario as the barrier variant, but a watermark separates chunk 2 from
    /// chunk 3. The derived watermark (via `eval_row`, hence value 0) is emitted in its
    /// input position.
    #[tokio::test]
    async fn test_projection_reuses_finished_inflight_permit_across_watermark() {
        let schema = Schema {
            fields: vec![Field::unnamed(DataType::Int64)],
        };
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema, StreamKey::new());

        let started_count = Arc::new(AtomicUsize::new(0));
        let started_notify = Arc::new(Notify::new());
        let test_expr = NonStrictExpression::for_test(FirstProjectExprWaitsForStartedCount {
            started_count: started_count.clone(),
            started_notify: started_notify.clone(),
            unblock_first_at_started_count: 3,
        });

        let proj = ProjectExecutor::new(
            actor_context_with_project_expr_limits(3, 2),
            source,
            vec![test_expr],
            MultiMap::from_iter(vec![(0, 0)].into_iter()),
            vec![],
            false,
        );
        let mut proj = proj.boxed().execute();

        tx.push_barrier(test_epoch(1), false);
        proj.expect_barrier().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " I
            + 1",
        ));
        tx.push_chunk(StreamChunk::from_pretty(
            " I
            + 2",
        ));
        tx.push_int64_watermark(0, 100);
        tx.push_chunk(StreamChunk::from_pretty(
            " I
            + 3",
        ));
        tx.push_barrier(test_epoch(2), true);

        // Poll the executor until the third evaluation has started. A message arriving
        // earlier is a failure; one arriving afterwards is kept as the first output.
        let first_msg = timeout(Duration::from_secs(5), async {
            let next_msg = proj.next();
            pin_mut!(next_msg);
            let mut first_msg = None;
            loop {
                let notified = started_notify.notified();
                if started_count.load(atomic::Ordering::SeqCst) >= 3 {
                    break;
                }
                tokio::select! {
                    _ = notified => {}
                    msg = &mut next_msg => {
                        if started_count.load(atomic::Ordering::SeqCst) < 3 {
                            panic!("project executor emitted before the post-watermark chunk started: {msg:?}");
                        }
                        first_msg = Some(msg.unwrap().unwrap());
                        break;
                    }
                }
            }
            match first_msg {
                Some(msg) => msg,
                None => next_msg.await.unwrap().unwrap(),
            }
        })
        .await
        .expect("post-watermark chunk expression did not start");

        assert_eq!(
            *first_msg.as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 0"
            )
        );
        assert_eq!(
            *proj.next().await.unwrap().unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 1"
            )
        );
        assert_eq!(
            proj.expect_watermark().await,
            Watermark {
                col_idx: 0,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(0)
            }
        );
        assert_eq!(
            *proj.next().await.unwrap().unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 2"
            )
        );
        assert!(proj.next().await.unwrap().unwrap().is_stop());
    }

    /// Global counter backing `DummyNondecreasingExpr`; it only ever increases.
    static DUMMY_COUNTER: AtomicI64 = AtomicI64::new(0);

    /// Non-decreasing test expression: each evaluation returns the next counter value.
    #[derive(Debug)]
    struct DummyNondecreasingExpr;

    #[async_trait::async_trait]
    impl Expression for DummyNondecreasingExpr {
        fn return_type(&self) -> DataType {
            DataType::Int64
        }

        async fn eval_v2(&self, input: &DataChunk) -> expr::Result<ValueImpl> {
            let value = DUMMY_COUNTER.fetch_add(1, atomic::Ordering::SeqCst);
            Ok(ValueImpl::Scalar {
                value: Some(value.into()),
                capacity: input.capacity(),
            })
        }

        async fn eval_row(&self, _input: &OwnedRow) -> expr::Result<Datum> {
            let value = DUMMY_COUNTER.fetch_add(1, atomic::Ordering::SeqCst);
            Ok(Some(value.into()))
        }
    }

    /// An input watermark on column 0 is derived through `$0 + 1` and `$0 - 1`. The
    /// non-decreasing expression (index 2) yields a watermark right before each barrier,
    /// and those watermarks do not go backwards.
    #[tokio::test]
    async fn test_watermark_projection() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema, StreamKey::new());

        let a_expr = build_from_pretty("(add:int8 $0:int8 1:int8)");
        let b_expr = build_from_pretty("(subtract:int8 $0:int8 1:int8)");
        let c_expr = NonStrictExpression::for_test(DummyNondecreasingExpr);

        let proj = ProjectExecutor::new(
            ActorContext::for_test(123),
            source,
            vec![a_expr, b_expr, c_expr],
            MultiMap::from_iter(vec![(0, 0), (0, 1)].into_iter()),
            vec![2],
            false,
        );
        let mut proj = proj.boxed().execute();

        tx.push_barrier(test_epoch(1), false);
        tx.push_int64_watermark(0, 100);

        proj.expect_barrier().await;
        // The relative order of the two derived watermarks is not asserted; sort by column.
        let w1 = proj.expect_watermark().await;
        let w2 = proj.expect_watermark().await;
        let (w1, w2) = if w1.col_idx < w2.col_idx {
            (w1, w2)
        } else {
            (w2, w1)
        };

        assert_eq!(
            w1,
            Watermark {
                col_idx: 0,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(101)
            }
        );
        assert_eq!(
            w2,
            Watermark {
                col_idx: 1,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(99)
            }
        );

        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 120 4
            + 146 5
            + 133 6",
        ));
        proj.expect_chunk().await;
        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 213 8
            - 133 6",
        ));
        proj.expect_chunk().await;

        // The non-decreasing expression's sample is emitted just before the barrier.
        tx.push_barrier(test_epoch(2), false);
        let w3 = proj.expect_watermark().await;
        proj.expect_barrier().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 100 3
            + 104 5
            + 187 3",
        ));
        proj.expect_chunk().await;

        tx.push_barrier(test_epoch(3), false);
        let w4 = proj.expect_watermark().await;
        proj.expect_barrier().await;

        assert_eq!(w3.col_idx, w4.col_idx);
        assert!(w3.val.default_cmp(&w4.val).is_le());

        // Column 1 has no watermark derivation, so this watermark is dropped.
        tx.push_int64_watermark(1, 100);
        tx.push_barrier(test_epoch(4), true);

        assert!(proj.next().await.unwrap().unwrap().is_stop());
    }
}