Skip to main content

risingwave_stream/executor/project/
project_scalar.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16
17use futures::future::{Either, pending, select};
18use futures::pin_mut;
19use futures::stream::FuturesOrdered;
20use multimap::MultiMap;
21use risingwave_common::metrics::LabelGuardedIntGauge;
22use risingwave_common::row::RowExt;
23use risingwave_common::types::ToOwnedDatum;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_expr::expr::NonStrictExpression;
26use tokio::sync::Semaphore;
27
28use crate::executor::prelude::*;
29
/// Future that projects one [`ProjectMessageInput`] into a [`ProjectedMessage`].
///
/// Declared as a named opaque type so that these futures can be stored in a concrete
/// collection ([`PendingProjectMessages`]).
type ProjectMessageFuture = impl Future<Output = StreamExecutorResult<ProjectedMessage>> + Send;
/// Ordered window of in-flight projection futures: `FuturesOrdered` yields their outputs in the
/// order the futures were pushed, regardless of completion order.
type PendingProjectMessages = FuturesOrdered<ProjectMessageFuture>;
32
/// An upstream message queued in the ordered projection window, together with what is needed to
/// project it.
enum ProjectMessageInput {
    /// A data chunk to evaluate the projection expressions over.
    Chunk {
        chunk: StreamChunk,
        /// When set, a permit from this semaphore is held while the chunk is being evaluated.
        inflight_request_semaphore: Option<Arc<Semaphore>>,
    },
    /// An upstream watermark and the output columns whose expressions derive new watermarks
    /// from it.
    Watermark {
        watermark: Watermark,
        out_col_indices: Vec<usize>,
    },
    /// A barrier. It is passed through unchanged but keeps its position in the ordered window.
    Barrier(Barrier),
}
44
/// The result of projecting one [`ProjectMessageInput`].
enum ProjectedMessage {
    /// The projected chunk.
    Chunk(StreamChunk),
    /// Watermarks derived for output columns. This may be empty when every derivation yields
    /// NULL.
    Watermark(Vec<Watermark>),
    /// The input barrier, unchanged.
    Barrier(Barrier),
}
50
/// `ProjectExecutor` evaluates a list of projection expressions over every input chunk.
///
/// Each expression produces one output column. The resulting chunk keeps the ops and the
/// visibility of the input chunk, and adjacent no-op updates may then be eliminated.
///
/// Watermarks on input columns are translated into watermarks on derived output columns.
/// Non-decreasing output expressions produce watermarks at barriers. All messages are emitted
/// downstream in input order.
pub struct ProjectExecutor {
    /// Upstream executor.
    input: Executor,
    /// Projection state and configuration, consumed when the executor starts running.
    inner: Inner,
}
58
/// Runtime state and configuration of a [`ProjectExecutor`], separated from its input so that
/// it can be moved into the output stream.
struct Inner {
    /// Id of the owning actor, used to recognize stop barriers addressed to this actor.
    actor_id: ActorId,

    /// Expressions of the current projection; output column `i` is produced by `exprs[i]`.
    exprs: Arc<Vec<NonStrictExpression>>,
    /// Watermark derivations keyed by input column index. Each value is an output column index
    /// whose own projection expression derives a watermark from a watermark on that input column.
    watermark_derivations: MultiMap<usize, usize>,
    /// Indices of nondecreasing expressions in the expression list.
    nondecreasing_expr_indices: Vec<usize>,
    /// Last seen values of nondecreasing expressions (parallel to `nondecreasing_expr_indices`),
    /// buffered so that they can be emitted as watermarks at the next non-paused barrier.
    last_nondec_expr_values: Vec<Option<ScalarImpl>>,

    /// Whether there are likely no-op updates in the output chunks, so that eliminating them with
    /// `StreamChunk::eliminate_adjacent_noop_update` could be beneficial.
    eliminate_noop_updates: bool,

    /// Maximum number of chunks with pending projection evaluation in the ordered window
    /// (`usize::MAX` when the limit is disabled).
    project_expr_concurrency: usize,

    /// Limits the number of chunks whose projection evaluation has started but not finished.
    /// `None` means unlimited in-flight requests.
    project_expr_inflight_request_concurrency: Option<Arc<Semaphore>>,

    /// Gauge for the number of messages waiting in the ordered projection window; `None` when the
    /// gauge is not registered.
    project_expr_inflight_window_size: Option<LabelGuardedIntGauge>,
}
86
87impl ProjectExecutor {
88    pub fn new(
89        ctx: ActorContextRef,
90        input: Executor,
91        exprs: Vec<NonStrictExpression>,
92        watermark_derivations: MultiMap<usize, usize>,
93        nondecreasing_expr_indices: Vec<usize>,
94        noop_update_hint: bool,
95    ) -> Self {
96        let n_nondecreasing_exprs = nondecreasing_expr_indices.len();
97        let eliminate_noop_updates =
98            noop_update_hint || ctx.config.developer.aggressive_noop_update_elimination;
99        let project_expr_concurrency = match ctx.config.developer.project_expr_concurrency {
100            0 => usize::MAX,
101            concurrency => concurrency,
102        };
103        let project_expr_inflight_request_concurrency = match ctx
104            .config
105            .developer
106            .project_expr_inflight_request_concurrency
107        {
108            0 => None,
109            concurrency => Some(Arc::new(Semaphore::new(concurrency))),
110        };
111        let project_expr_inflight_window_size = (ctx.config.developer.project_expr_concurrency
112            != risingwave_common::config::default::developer::stream_project_expr_concurrency())
113        .then(|| {
114            ctx.streaming_metrics
115                .project_expr_inflight_window_size
116                .with_guarded_label_values(&[&ctx.id.to_string(), &ctx.fragment_id.to_string()])
117        });
118        Self {
119            input,
120            inner: Inner {
121                actor_id: ctx.id,
122                exprs: Arc::new(exprs),
123                watermark_derivations,
124                nondecreasing_expr_indices,
125                last_nondec_expr_values: vec![None; n_nondecreasing_exprs],
126                eliminate_noop_updates,
127                project_expr_concurrency,
128                project_expr_inflight_request_concurrency,
129                project_expr_inflight_window_size,
130            },
131        }
132    }
133}
134
135impl Debug for ProjectExecutor {
136    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
137        f.debug_struct("ProjectExecutor")
138            .field("exprs", &self.inner.exprs)
139            .finish()
140    }
141}
142
143impl Execute for ProjectExecutor {
144    fn execute(self: Box<Self>) -> BoxedMessageStream {
145        self.inner.execute(self.input).boxed()
146    }
147}
148
149pub async fn apply_project_exprs(
150    exprs: &[NonStrictExpression],
151    chunk: StreamChunk,
152) -> StreamExecutorResult<StreamChunk> {
153    let (data_chunk, ops) = chunk.into_parts();
154    let mut projected_columns = Vec::new();
155
156    for expr in exprs {
157        let evaluated_expr = expr.eval_infallible(&data_chunk).await;
158        projected_columns.push(evaluated_expr);
159    }
160    let (_, vis) = data_chunk.into_parts();
161
162    let new_chunk = StreamChunk::with_visibility(ops, projected_columns, vis);
163
164    Ok(new_chunk)
165}
166
167impl Inner {
168    #[define_opaque(ProjectMessageFuture)]
169    fn project_message(
170        exprs: Arc<Vec<NonStrictExpression>>,
171        eliminate_noop_updates: bool,
172        input: ProjectMessageInput,
173    ) -> ProjectMessageFuture {
174        async move {
175            match input {
176                ProjectMessageInput::Chunk {
177                    chunk,
178                    inflight_request_semaphore,
179                } => {
180                    let _permit = if let Some(semaphore) = inflight_request_semaphore {
181                        Some(semaphore.acquire_owned().await.expect(
182                            "project expression in-flight request semaphore should not be closed",
183                        ))
184                    } else {
185                        None
186                    };
187                    let mut new_chunk = apply_project_exprs(&exprs, chunk)
188                        .instrument_await("project_eval_chunk")
189                        .await?;
190                    if eliminate_noop_updates {
191                        new_chunk = new_chunk.eliminate_adjacent_noop_update();
192                    }
193                    Ok(ProjectedMessage::Chunk(new_chunk))
194                }
195                ProjectMessageInput::Watermark {
196                    watermark,
197                    out_col_indices,
198                } => {
199                    let mut ret = vec![];
200                    for out_col_idx in out_col_indices {
201                        let derived_watermark = watermark
202                            .clone()
203                            .transform_with_expr(&exprs[out_col_idx], out_col_idx)
204                            .await;
205                        if let Some(derived_watermark) = derived_watermark {
206                            ret.push(derived_watermark);
207                        } else {
208                            warn!(
209                                "a NULL watermark is derived with the expression {}!",
210                                out_col_idx
211                            );
212                        }
213                    }
214                    Ok(ProjectedMessage::Watermark(ret))
215                }
216                ProjectMessageInput::Barrier(barrier) => Ok(ProjectedMessage::Barrier(barrier)),
217            }
218        }
219    }
220
221    fn update_last_nondec_expr_values(
222        nondecreasing_expr_indices: &[usize],
223        last_nondec_expr_values: &mut [Option<ScalarImpl>],
224        new_chunk: &StreamChunk,
225    ) {
226        {
227            {
228                {
229                    {
230                        if !nondecreasing_expr_indices.is_empty()
231                            && let Some((_, first_visible_row)) = new_chunk.rows().next()
232                        {
233                            // it's ok to use the first row here, just one chunk delay
234                            first_visible_row
235                                .project(nondecreasing_expr_indices)
236                                .iter()
237                                .enumerate()
238                                .for_each(|(idx, value)| {
239                                    last_nondec_expr_values[idx] =
240                                        Some(value.to_owned_datum().expect(
241                                            "non-decreasing expression should never be NULL",
242                                        ));
243                                });
244                        }
245                    }
246                }
247            }
248        }
249    }
250
251    #[try_stream(ok = Message, error = StreamExecutorError)]
252    async fn execute(self, input: Executor) {
253        let Inner {
254            actor_id,
255            exprs,
256            watermark_derivations,
257            nondecreasing_expr_indices,
258            mut last_nondec_expr_values,
259            eliminate_noop_updates,
260            project_expr_concurrency,
261            project_expr_inflight_request_concurrency,
262            project_expr_inflight_window_size,
263        } = self;
264
265        let mut input = input.execute();
266        let first_barrier = expect_first_barrier(&mut input).await?;
267        let mut is_paused = first_barrier.is_pause_on_startup();
268        let mut received_stop_barrier = first_barrier.is_stop(actor_id);
269        yield Message::Barrier(first_barrier);
270        if received_stop_barrier {
271            return Ok(());
272        }
273
274        let mut pending_project_messages = PendingProjectMessages::new();
275        let mut pending_project_chunks = 0;
276
277        loop {
278            let has_pending_project_message = !pending_project_messages.is_empty();
279            let can_read_input =
280                !received_stop_barrier && pending_project_chunks < project_expr_concurrency;
281            let next_projected_message = async {
282                if has_pending_project_message {
283                    pending_project_messages
284                        .next()
285                        .await
286                        .expect("pending project messages should not be empty")
287                } else {
288                    pending().await
289                }
290            };
291            let next_input_msg = async {
292                if can_read_input {
293                    input.next().await.ok_or_else(|| {
294                        StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
295                    })?
296                } else {
297                    pending().await
298                }
299            };
300
301            pin_mut!(next_projected_message);
302            pin_mut!(next_input_msg);
303
304            match select(next_projected_message, next_input_msg).await {
305                Either::Left((projected_message, _)) => match projected_message? {
306                    ProjectedMessage::Chunk(new_chunk) => {
307                        pending_project_chunks -= 1;
308                        if let Some(metric) = &project_expr_inflight_window_size {
309                            metric.set(pending_project_messages.len() as _);
310                        }
311                        Self::update_last_nondec_expr_values(
312                            &nondecreasing_expr_indices,
313                            &mut last_nondec_expr_values,
314                            &new_chunk,
315                        );
316                        yield Message::Chunk(new_chunk);
317                    }
318                    ProjectedMessage::Watermark(watermarks) => {
319                        if let Some(metric) = &project_expr_inflight_window_size {
320                            metric.set(pending_project_messages.len() as _);
321                        }
322                        for watermark in watermarks {
323                            yield Message::Watermark(watermark);
324                        }
325                    }
326                    ProjectedMessage::Barrier(barrier) => {
327                        if let Some(metric) = &project_expr_inflight_window_size {
328                            metric.set(pending_project_messages.len() as _);
329                        }
330                        if !is_paused {
331                            for (&expr_idx, value) in nondecreasing_expr_indices
332                                .iter()
333                                .zip_eq_fast(&mut last_nondec_expr_values)
334                            {
335                                if let Some(value) = std::mem::take(value) {
336                                    yield Message::Watermark(Watermark::new(
337                                        expr_idx,
338                                        exprs[expr_idx].return_type(),
339                                        value,
340                                    ))
341                                }
342                            }
343                        }
344
345                        if let Some(mutation) = barrier.mutation.as_deref() {
346                            match mutation {
347                                Mutation::Pause => {
348                                    is_paused = true;
349                                }
350                                Mutation::Resume => {
351                                    is_paused = false;
352                                }
353                                _ => (),
354                            }
355                        }
356
357                        let should_stop = barrier.is_stop(actor_id);
358                        yield Message::Barrier(barrier);
359                        if should_stop {
360                            break;
361                        }
362                    }
363                },
364                Either::Right((msg, _)) => match msg? {
365                    Message::Watermark(w) => {
366                        let out_col_indices = match watermark_derivations.get_vec(&w.col_idx) {
367                            Some(v) => v,
368                            None => continue,
369                        };
370                        pending_project_messages.push_back(Self::project_message(
371                            exprs.clone(),
372                            eliminate_noop_updates,
373                            ProjectMessageInput::Watermark {
374                                watermark: w,
375                                out_col_indices: out_col_indices.clone(),
376                            },
377                        ));
378                        if let Some(metric) = &project_expr_inflight_window_size {
379                            metric.set(pending_project_messages.len() as _);
380                        }
381                    }
382                    Message::Chunk(chunk) => {
383                        pending_project_messages.push_back(Self::project_message(
384                            exprs.clone(),
385                            eliminate_noop_updates,
386                            ProjectMessageInput::Chunk {
387                                chunk,
388                                inflight_request_semaphore:
389                                    project_expr_inflight_request_concurrency.clone(),
390                            },
391                        ));
392                        pending_project_chunks += 1;
393                        if let Some(metric) = &project_expr_inflight_window_size {
394                            metric.set(pending_project_messages.len() as _);
395                        }
396                    }
397                    Message::Barrier(barrier) => {
398                        if barrier.is_stop(actor_id) {
399                            received_stop_barrier = true;
400                        }
401                        pending_project_messages.push_back(Self::project_message(
402                            exprs.clone(),
403                            eliminate_noop_updates,
404                            ProjectMessageInput::Barrier(barrier),
405                        ));
406                        if let Some(metric) = &project_expr_inflight_window_size {
407                            metric.set(pending_project_messages.len() as _);
408                        }
409                    }
410                },
411            }
412        }
413    }
414}
415
416#[cfg(test)]
417mod tests {
418    use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicUsize};
419    use std::time::Duration;
420
421    use risingwave_common::array::DataChunk;
422    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
423    use risingwave_common::catalog::Field;
424    use risingwave_common::config::StreamingConfig;
425    use risingwave_common::types::DefaultOrd;
426    use risingwave_common::util::epoch::test_epoch;
427    use risingwave_expr::expr::{self, Expression, ValueImpl};
428    use tokio::sync::Notify;
429    use tokio::time::timeout;
430
431    use super::*;
432    use crate::executor::StopMutation;
433    use crate::executor::test_utils::expr::build_from_pretty;
434    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};
435
436    fn actor_context_with_project_expr_concurrency(concurrency: usize) -> ActorContextRef {
437        actor_context_with_project_expr_limits(concurrency, 0)
438    }
439
440    fn actor_context_with_project_expr_limits(
441        concurrency: usize,
442        inflight_request_concurrency: usize,
443    ) -> ActorContextRef {
444        let mut config = StreamingConfig::default();
445        config.developer.project_expr_concurrency = concurrency;
446        config.developer.project_expr_inflight_request_concurrency = inflight_request_concurrency;
447        let mut ctx = ActorContext::for_test(123);
448        Arc::get_mut(&mut ctx)
449            .expect("test actor context should not be shared")
450            .config = Arc::new(config);
451        ctx
452    }
453
454    #[tokio::test]
455    async fn test_projection() {
456        let chunk1 = StreamChunk::from_pretty(
457            " I I
458            + 1 4
459            + 2 5
460            + 3 6",
461        );
462        let chunk2 = StreamChunk::from_pretty(
463            " I I
464            + 7 8
465            - 3 6",
466        );
467        let schema = Schema {
468            fields: vec![
469                Field::unnamed(DataType::Int64),
470                Field::unnamed(DataType::Int64),
471            ],
472        };
473        let stream_key = vec![0];
474        let (mut tx, source) = MockSource::channel();
475        let source = source.into_executor(schema, stream_key);
476
477        let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)");
478
479        let proj = ProjectExecutor::new(
480            ActorContext::for_test(123),
481            source,
482            vec![test_expr],
483            MultiMap::new(),
484            vec![],
485            false,
486        );
487        let mut proj = proj.boxed().execute();
488
489        tx.push_barrier(test_epoch(1), false);
490        let barrier = proj.next().await.unwrap().unwrap();
491        barrier.as_barrier().unwrap();
492
493        tx.push_chunk(chunk1);
494        tx.push_chunk(chunk2);
495
496        let msg = proj.next().await.unwrap().unwrap();
497        assert_eq!(
498            *msg.as_chunk().unwrap(),
499            StreamChunk::from_pretty(
500                " I
501                + 5
502                + 7
503                + 9"
504            )
505        );
506
507        let msg = proj.next().await.unwrap().unwrap();
508        assert_eq!(
509            *msg.as_chunk().unwrap(),
510            StreamChunk::from_pretty(
511                "  I
512                + 15
513                -  9"
514            )
515        );
516
517        tx.push_barrier(test_epoch(2), true);
518        assert!(proj.next().await.unwrap().unwrap().is_stop());
519    }
520
521    #[tokio::test]
522    async fn test_projection_does_not_poll_after_stop_barrier() {
523        let schema = Schema {
524            fields: vec![Field::unnamed(DataType::Int64)],
525        };
526        let (mut tx, source) = MockSource::channel();
527        let source = source.into_executor(schema, StreamKey::new());
528
529        let test_expr = build_from_pretty("(add:int8 $0:int8 1:int8)");
530
531        let proj = ProjectExecutor::new(
532            ActorContext::for_test(123),
533            source,
534            vec![test_expr],
535            MultiMap::new(),
536            vec![],
537            false,
538        );
539        let mut proj = proj.boxed().execute();
540
541        tx.push_barrier(test_epoch(1), false);
542        proj.expect_barrier().await;
543
544        tx.send_barrier(
545            Barrier::new_test_barrier(test_epoch(2)).with_mutation(Mutation::Stop(StopMutation {
546                dropped_actors: std::iter::once(123.into()).collect(),
547                ..Default::default()
548            })),
549        );
550        tx.push_chunk(StreamChunk::from_pretty(
551            " I
552            + 1",
553        ));
554
555        assert!(proj.next().await.unwrap().unwrap().is_stop());
556        assert!(proj.next().await.is_none());
557    }
558
559    #[derive(Debug)]
560    struct BlockingProjectExpr {
561        started_count: Arc<AtomicUsize>,
562        second_started: Arc<AtomicBool>,
563        second_started_notify: Arc<Notify>,
564        release_first: Arc<AtomicBool>,
565        release_first_notify: Arc<Notify>,
566    }
567
568    #[async_trait::async_trait]
569    impl Expression for BlockingProjectExpr {
570        fn return_type(&self) -> DataType {
571            DataType::Int64
572        }
573
574        async fn eval_v2(&self, input: &DataChunk) -> expr::Result<ValueImpl> {
575            let call_idx = self.started_count.fetch_add(1, atomic::Ordering::SeqCst);
576            if call_idx == 0 {
577                loop {
578                    let notified = self.second_started_notify.notified();
579                    if self.second_started.load(atomic::Ordering::SeqCst) {
580                        break;
581                    }
582                    notified.await;
583                }
584                loop {
585                    let notified = self.release_first_notify.notified();
586                    if self.release_first.load(atomic::Ordering::SeqCst) {
587                        break;
588                    }
589                    notified.await;
590                }
591            } else if call_idx == 1 {
592                self.second_started.store(true, atomic::Ordering::SeqCst);
593                self.second_started_notify.notify_waiters();
594            }
595
596            Ok(ValueImpl::Scalar {
597                value: Some((call_idx as i64).into()),
598                capacity: input.capacity(),
599            })
600        }
601
602        async fn eval_row(&self, _input: &OwnedRow) -> expr::Result<Datum> {
603            unimplemented!()
604        }
605    }
606
607    #[derive(Debug)]
608    struct FirstProjectExprWaitsForStartedCount {
609        started_count: Arc<AtomicUsize>,
610        started_notify: Arc<Notify>,
611        unblock_first_at_started_count: usize,
612    }
613
614    #[async_trait::async_trait]
615    impl Expression for FirstProjectExprWaitsForStartedCount {
616        fn return_type(&self) -> DataType {
617            DataType::Int64
618        }
619
620        async fn eval_v2(&self, input: &DataChunk) -> expr::Result<ValueImpl> {
621            let call_idx = self.started_count.fetch_add(1, atomic::Ordering::SeqCst);
622            self.started_notify.notify_waiters();
623            if call_idx == 0 {
624                loop {
625                    let notified = self.started_notify.notified();
626                    if self.started_count.load(atomic::Ordering::SeqCst)
627                        >= self.unblock_first_at_started_count
628                    {
629                        break;
630                    }
631                    notified.await;
632                }
633            }
634
635            Ok(ValueImpl::Scalar {
636                value: Some((call_idx as i64).into()),
637                capacity: input.capacity(),
638            })
639        }
640
641        async fn eval_row(&self, _input: &OwnedRow) -> expr::Result<Datum> {
642            Ok(Some(0_i64.into()))
643        }
644    }
645
646    #[tokio::test]
647    async fn test_projection_evaluates_chunks_concurrently_before_barrier() {
648        let schema = Schema {
649            fields: vec![Field::unnamed(DataType::Int64)],
650        };
651        let (mut tx, source) = MockSource::channel();
652        let source = source.into_executor(schema, StreamKey::new());
653
654        let started_count = Arc::new(AtomicUsize::new(0));
655        let second_started = Arc::new(AtomicBool::new(false));
656        let second_started_notify = Arc::new(Notify::new());
657        let release_first = Arc::new(AtomicBool::new(false));
658        let release_first_notify = Arc::new(Notify::new());
659
660        let test_expr = NonStrictExpression::for_test(BlockingProjectExpr {
661            started_count,
662            second_started: second_started.clone(),
663            second_started_notify: second_started_notify.clone(),
664            release_first: release_first.clone(),
665            release_first_notify: release_first_notify.clone(),
666        });
667
668        let proj = ProjectExecutor::new(
669            actor_context_with_project_expr_concurrency(2),
670            source,
671            vec![test_expr],
672            MultiMap::new(),
673            vec![],
674            false,
675        );
676        let mut proj = proj.boxed().execute();
677
678        tx.push_barrier(test_epoch(1), false);
679        proj.expect_barrier().await;
680
681        tx.push_chunk(StreamChunk::from_pretty(
682            " I
683            + 1",
684        ));
685        tx.push_chunk(StreamChunk::from_pretty(
686            " I
687            + 2",
688        ));
689        tx.push_barrier(test_epoch(2), true);
690
691        let next_msg = proj.next();
692        pin_mut!(next_msg);
693        timeout(Duration::from_secs(5), async {
694            loop {
695                let notified = second_started_notify.notified();
696                if second_started.load(atomic::Ordering::SeqCst) {
697                    break;
698                }
699                tokio::select! {
700                    _ = notified => {}
701                    msg = &mut next_msg => {
702                        panic!("project executor emitted before the second chunk started: {msg:?}");
703                    }
704                }
705            }
706        })
707        .await
708        .expect("second chunk expression did not start");
709
710        release_first.store(true, atomic::Ordering::SeqCst);
711        release_first_notify.notify_waiters();
712
713        let msg = next_msg.await.unwrap().unwrap();
714        assert_eq!(
715            *msg.as_chunk().unwrap(),
716            StreamChunk::from_pretty(
717                " I
718                + 0"
719            )
720        );
721
722        let msg = proj.next().await.unwrap().unwrap();
723        assert_eq!(
724            *msg.as_chunk().unwrap(),
725            StreamChunk::from_pretty(
726                " I
727                + 1"
728            )
729        );
730
731        assert!(proj.next().await.unwrap().unwrap().is_stop());
732    }
733
734    #[tokio::test]
735    async fn test_projection_reuses_finished_inflight_permit_across_barrier() {
736        let schema = Schema {
737            fields: vec![Field::unnamed(DataType::Int64)],
738        };
739        let (mut tx, source) = MockSource::channel();
740        let source = source.into_executor(schema, StreamKey::new());
741
742        let started_count = Arc::new(AtomicUsize::new(0));
743        let started_notify = Arc::new(Notify::new());
744        let test_expr = NonStrictExpression::for_test(FirstProjectExprWaitsForStartedCount {
745            started_count: started_count.clone(),
746            started_notify: started_notify.clone(),
747            unblock_first_at_started_count: 3,
748        });
749
750        let proj = ProjectExecutor::new(
751            actor_context_with_project_expr_limits(3, 2),
752            source,
753            vec![test_expr],
754            MultiMap::new(),
755            vec![],
756            false,
757        );
758        let mut proj = proj.boxed().execute();
759
760        tx.push_barrier(test_epoch(1), false);
761        proj.expect_barrier().await;
762
763        tx.push_chunk(StreamChunk::from_pretty(
764            " I
765            + 1",
766        ));
767        tx.push_chunk(StreamChunk::from_pretty(
768            " I
769            + 2",
770        ));
771        tx.push_barrier(test_epoch(2), false);
772        tx.push_chunk(StreamChunk::from_pretty(
773            " I
774            + 3",
775        ));
776        tx.push_barrier(test_epoch(3), true);
777
778        let first_msg = timeout(Duration::from_secs(5), async {
779            let next_msg = proj.next();
780            pin_mut!(next_msg);
781            let mut first_msg = None;
782            loop {
783                let notified = started_notify.notified();
784                if started_count.load(atomic::Ordering::SeqCst) >= 3 {
785                    break;
786                }
787                tokio::select! {
788                    _ = notified => {}
789                    msg = &mut next_msg => {
790                        if started_count.load(atomic::Ordering::SeqCst) < 3 {
791                            panic!("project executor emitted before the post-barrier chunk started: {msg:?}");
792                        }
793                        first_msg = Some(msg.unwrap().unwrap());
794                        break;
795                    }
796                }
797            }
798            match first_msg {
799                Some(msg) => msg,
800                None => next_msg.await.unwrap().unwrap(),
801            }
802        })
803        .await
804        .expect("post-barrier chunk expression did not start");
805
806        assert_eq!(
807            *first_msg.as_chunk().unwrap(),
808            StreamChunk::from_pretty(
809                " I
810                + 0"
811            )
812        );
813        assert_eq!(
814            *proj.next().await.unwrap().unwrap().as_chunk().unwrap(),
815            StreamChunk::from_pretty(
816                " I
817                + 1"
818            )
819        );
820        proj.expect_barrier().await;
821        assert_eq!(
822            *proj.next().await.unwrap().unwrap().as_chunk().unwrap(),
823            StreamChunk::from_pretty(
824                " I
825                + 2"
826            )
827        );
828        assert!(proj.next().await.unwrap().unwrap().is_stop());
829    }
830
831    #[tokio::test]
832    async fn test_projection_reuses_finished_inflight_permit_across_watermark() {
833        let schema = Schema {
834            fields: vec![Field::unnamed(DataType::Int64)],
835        };
836        let (mut tx, source) = MockSource::channel();
837        let source = source.into_executor(schema, StreamKey::new());
838
839        let started_count = Arc::new(AtomicUsize::new(0));
840        let started_notify = Arc::new(Notify::new());
841        let test_expr = NonStrictExpression::for_test(FirstProjectExprWaitsForStartedCount {
842            started_count: started_count.clone(),
843            started_notify: started_notify.clone(),
844            unblock_first_at_started_count: 3,
845        });
846
847        let proj = ProjectExecutor::new(
848            actor_context_with_project_expr_limits(3, 2),
849            source,
850            vec![test_expr],
851            MultiMap::from_iter(vec![(0, 0)].into_iter()),
852            vec![],
853            false,
854        );
855        let mut proj = proj.boxed().execute();
856
857        tx.push_barrier(test_epoch(1), false);
858        proj.expect_barrier().await;
859
860        tx.push_chunk(StreamChunk::from_pretty(
861            " I
862            + 1",
863        ));
864        tx.push_chunk(StreamChunk::from_pretty(
865            " I
866            + 2",
867        ));
868        tx.push_int64_watermark(0, 100);
869        tx.push_chunk(StreamChunk::from_pretty(
870            " I
871            + 3",
872        ));
873        tx.push_barrier(test_epoch(2), true);
874
875        let first_msg = timeout(Duration::from_secs(5), async {
876            let next_msg = proj.next();
877            pin_mut!(next_msg);
878            let mut first_msg = None;
879            loop {
880                let notified = started_notify.notified();
881                if started_count.load(atomic::Ordering::SeqCst) >= 3 {
882                    break;
883                }
884                tokio::select! {
885                    _ = notified => {}
886                    msg = &mut next_msg => {
887                        if started_count.load(atomic::Ordering::SeqCst) < 3 {
888                            panic!("project executor emitted before the post-watermark chunk started: {msg:?}");
889                        }
890                        first_msg = Some(msg.unwrap().unwrap());
891                        break;
892                    }
893                }
894            }
895            match first_msg {
896                Some(msg) => msg,
897                None => next_msg.await.unwrap().unwrap(),
898            }
899        })
900        .await
901        .expect("post-watermark chunk expression did not start");
902
903        assert_eq!(
904            *first_msg.as_chunk().unwrap(),
905            StreamChunk::from_pretty(
906                " I
907                + 0"
908            )
909        );
910        assert_eq!(
911            *proj.next().await.unwrap().unwrap().as_chunk().unwrap(),
912            StreamChunk::from_pretty(
913                " I
914                + 1"
915            )
916        );
917        assert_eq!(
918            proj.expect_watermark().await,
919            Watermark {
920                col_idx: 0,
921                data_type: DataType::Int64,
922                val: ScalarImpl::Int64(0)
923            }
924        );
925        assert_eq!(
926            *proj.next().await.unwrap().unwrap().as_chunk().unwrap(),
927            StreamChunk::from_pretty(
928                " I
929                + 2"
930            )
931        );
932        assert!(proj.next().await.unwrap().unwrap().is_stop());
933    }
934
935    static DUMMY_COUNTER: AtomicI64 = AtomicI64::new(0);
936
937    #[derive(Debug)]
938    struct DummyNondecreasingExpr;
939
940    #[async_trait::async_trait]
941    impl Expression for DummyNondecreasingExpr {
942        fn return_type(&self) -> DataType {
943            DataType::Int64
944        }
945
946        async fn eval_v2(&self, input: &DataChunk) -> expr::Result<ValueImpl> {
947            let value = DUMMY_COUNTER.fetch_add(1, atomic::Ordering::SeqCst);
948            Ok(ValueImpl::Scalar {
949                value: Some(value.into()),
950                capacity: input.capacity(),
951            })
952        }
953
954        async fn eval_row(&self, _input: &OwnedRow) -> expr::Result<Datum> {
955            let value = DUMMY_COUNTER.fetch_add(1, atomic::Ordering::SeqCst);
956            Ok(Some(value.into()))
957        }
958    }
959
960    #[tokio::test]
961    async fn test_watermark_projection() {
962        let schema = Schema {
963            fields: vec![
964                Field::unnamed(DataType::Int64),
965                Field::unnamed(DataType::Int64),
966            ],
967        };
968        let (mut tx, source) = MockSource::channel();
969        let source = source.into_executor(schema, StreamKey::new());
970
971        let a_expr = build_from_pretty("(add:int8 $0:int8 1:int8)");
972        let b_expr = build_from_pretty("(subtract:int8 $0:int8 1:int8)");
973        let c_expr = NonStrictExpression::for_test(DummyNondecreasingExpr);
974
975        let proj = ProjectExecutor::new(
976            ActorContext::for_test(123),
977            source,
978            vec![a_expr, b_expr, c_expr],
979            MultiMap::from_iter(vec![(0, 0), (0, 1)].into_iter()),
980            vec![2],
981            false,
982        );
983        let mut proj = proj.boxed().execute();
984
985        tx.push_barrier(test_epoch(1), false);
986        tx.push_int64_watermark(0, 100);
987
988        proj.expect_barrier().await;
989        let w1 = proj.expect_watermark().await;
990        let w2 = proj.expect_watermark().await;
991        let (w1, w2) = if w1.col_idx < w2.col_idx {
992            (w1, w2)
993        } else {
994            (w2, w1)
995        };
996
997        assert_eq!(
998            w1,
999            Watermark {
1000                col_idx: 0,
1001                data_type: DataType::Int64,
1002                val: ScalarImpl::Int64(101)
1003            }
1004        );
1005        assert_eq!(
1006            w2,
1007            Watermark {
1008                col_idx: 1,
1009                data_type: DataType::Int64,
1010                val: ScalarImpl::Int64(99)
1011            }
1012        );
1013
1014        // just push some random chunks
1015        tx.push_chunk(StreamChunk::from_pretty(
1016            "   I I
1017            + 120 4
1018            + 146 5
1019            + 133 6",
1020        ));
1021        proj.expect_chunk().await;
1022        tx.push_chunk(StreamChunk::from_pretty(
1023            "   I I
1024            + 213 8
1025            - 133 6",
1026        ));
1027        proj.expect_chunk().await;
1028
1029        tx.push_barrier(test_epoch(2), false);
1030        let w3 = proj.expect_watermark().await;
1031        proj.expect_barrier().await;
1032
1033        tx.push_chunk(StreamChunk::from_pretty(
1034            "   I I
1035            + 100 3
1036            + 104 5
1037            + 187 3",
1038        ));
1039        proj.expect_chunk().await;
1040
1041        tx.push_barrier(test_epoch(3), false);
1042        let w4 = proj.expect_watermark().await;
1043        proj.expect_barrier().await;
1044
1045        assert_eq!(w3.col_idx, w4.col_idx);
1046        assert!(w3.val.default_cmp(&w4.val).is_le());
1047
1048        tx.push_int64_watermark(1, 100);
1049        tx.push_barrier(test_epoch(4), true);
1050
1051        assert!(proj.next().await.unwrap().unwrap().is_stop());
1052    }
1053}