risingwave_stream/executor/
upstream_sink_union.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet, VecDeque};
16use std::iter;
17use std::pin::Pin;
18use std::task::{Context, Poll};
19
20use anyhow::Context as _;
21use futures::future::try_join_all;
22use itertools::Itertools;
23use pin_project::pin_project;
24use risingwave_common::catalog::Field;
25use risingwave_expr::expr::{EvalErrorReport, NonStrictExpression, build_non_strict_from_prost};
26use risingwave_pb::common::PbActorInfo;
27use risingwave_pb::expr::PbExprNode;
28use risingwave_pb::plan_common::PbField;
29use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
30use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
31use rw_futures_util::pending_on_none;
32use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
33
34use crate::executor::exchange::input::{Input, assert_equal_dispatcher_barrier, new_input};
35use crate::executor::prelude::*;
36use crate::executor::project::apply_project_exprs;
37use crate::executor::{
38    BarrierMutationType, BoxedMessageInput, DynamicReceivers, MergeExecutor, Message,
39};
40use crate::task::{ActorEvalErrorReport, FragmentId, LocalBarrierManager};
41
42type ProcessedMessageStream = impl Stream<Item = MessageStreamItem>;
43
44/// A wrapper that merges data from a single upstream fragment and applies projection expressions.
45/// Each `SinkHandlerInput` represents one upstream fragment with its own merge executor and projection logic.
46#[pin_project]
47pub struct SinkHandlerInput {
48    /// The ID of the upstream fragment that this input is associated with.
49    upstream_fragment_id: FragmentId,
50
51    /// The stream of messages from the upstream fragment.
52    #[pin]
53    processed_stream: ProcessedMessageStream,
54}
55
56impl SinkHandlerInput {
57    pub fn new(
58        upstream_fragment_id: FragmentId,
59        merge: Box<MergeExecutor>,
60        project_exprs: Vec<NonStrictExpression>,
61    ) -> Self {
62        let processed_stream = Self::apply_project_exprs_stream(merge, project_exprs);
63        Self {
64            upstream_fragment_id,
65            processed_stream,
66        }
67    }
68
69    #[define_opaque(ProcessedMessageStream)]
70    fn apply_project_exprs_stream(
71        merge: Box<MergeExecutor>,
72        project_exprs: Vec<NonStrictExpression>,
73    ) -> ProcessedMessageStream {
74        // Apply the projection expressions to the output of the merge executor.
75        Self::apply_project_exprs_stream_impl(merge, project_exprs)
76    }
77
78    /// Applies a projection to the output of a merge executor.
79    #[try_stream(ok = Message, error = StreamExecutorError)]
80    async fn apply_project_exprs_stream_impl(
81        merge: Box<MergeExecutor>,
82        project_exprs: Vec<NonStrictExpression>,
83    ) {
84        let merge_stream = merge.execute_inner();
85        pin_mut!(merge_stream);
86        while let Some(msg) = merge_stream.next().await {
87            let msg = msg?;
88            if let Message::Chunk(chunk) = msg {
89                // Apply the projection expressions to the chunk.
90                let new_chunk = apply_project_exprs(&project_exprs, chunk).await?;
91                yield Message::Chunk(new_chunk);
92            } else {
93                yield msg;
94            }
95        }
96    }
97}
98
99impl Input for SinkHandlerInput {
100    type InputId = FragmentId;
101
102    fn id(&self) -> Self::InputId {
103        // Return a unique identifier for this input, e.g., based on the upstream fragment ID
104        self.upstream_fragment_id
105    }
106}
107
108impl Stream for SinkHandlerInput {
109    type Item = MessageStreamItem;
110
111    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112        self.project().processed_stream.poll_next(cx)
113    }
114}
115
116/// Information about an upstream fragment including its schema and projection expressions.
117#[derive(Debug)]
118pub struct UpstreamFragmentInfo {
119    pub upstream_fragment_id: FragmentId,
120    pub upstream_actors: Vec<PbActorInfo>,
121    pub merge_schema: Schema,
122    pub project_exprs: Vec<NonStrictExpression>,
123}
124
125impl UpstreamFragmentInfo {
126    pub fn new(
127        upstream_fragment_id: FragmentId,
128        initial_upstream_actors: &HashMap<FragmentId, UpstreamActors>,
129        sink_output_schema: &[PbField],
130        project_exprs: &[PbExprNode],
131        error_report: impl EvalErrorReport + 'static,
132    ) -> StreamResult<Self> {
133        let actors = initial_upstream_actors
134            .get(&upstream_fragment_id)
135            .ok_or_else(|| {
136                anyhow::anyhow!(
137                    "upstream fragment {} not found in initial upstream actors",
138                    upstream_fragment_id
139                )
140            })?;
141        let merge_schema = sink_output_schema.iter().map(Field::from).collect();
142        let project_exprs = project_exprs
143            .iter()
144            .map(|e| build_non_strict_from_prost(e, error_report.clone()))
145            .try_collect()
146            .map_err(|err| anyhow::anyhow!(err))?;
147        Ok(Self {
148            upstream_fragment_id,
149            upstream_actors: actors.actors.clone(),
150            merge_schema,
151            project_exprs,
152        })
153    }
154}
155
156type BoxedSinkInput = BoxedMessageInput<FragmentId, BarrierMutationType>;
157
158/// `UpstreamSinkUnionExecutor` merges data from multiple upstream fragments, where each fragment
159/// has its own merge logic and projection expressions. This executor is specifically designed for
160/// sink operations that need to union data from different upstream sources.
161///
162/// Unlike a simple union that just merges streams, this executor:
163/// 1. Creates a separate `MergeExecutor` for each upstream fragment
164/// 2. Applies fragment-specific projection expressions to each stream
165/// 3. Unions all the processed streams into a single output stream
166///
167/// This is useful for sink operators that need to collect data from multiple upstream fragments
168/// with potentially different schemas or processing requirements.
169pub struct UpstreamSinkUnionExecutor {
170    /// The context of the actor.
171    actor_context: ActorContextRef,
172
173    /// Streaming metrics.
174    executor_stats: Arc<StreamingMetrics>,
175
176    /// The initial inputs to the executor.
177    initial_inputs: Vec<BoxedSinkInput>,
178
179    /// For asynchronous processing barriers from `LocalBarrierManager`.
180    upstream_sink_barrier_mgr: UpstreamSinkBarrierManager,
181}
182
183impl Debug for UpstreamSinkUnionExecutor {
184    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
185        f.debug_struct("UpstreamSinkUnionExecutor")
186            .field(
187                "initial_upstream_fragments",
188                &self.initial_inputs.iter().map(|i| i.id()).collect_vec(),
189            )
190            .finish()
191    }
192}
193
194impl Execute for UpstreamSinkUnionExecutor {
195    fn execute(self: Box<Self>) -> BoxedMessageStream {
196        self.execute_inner().boxed()
197    }
198}
199
200struct PendingSinkBarrier {
201    barrier: Barrier,
202    new_input: Option<BoxedSinkInput>,
203    removed_upstream_fragment_ids: HashSet<FragmentId>,
204}
205
206struct BuildSinkInputContext {
207    /// The context of the actor.
208    actor_context: ActorContextRef,
209
210    /// Used to create merge executors.
211    local_barrier_manager: LocalBarrierManager,
212
213    /// Streaming metrics.
214    executor_stats: Arc<StreamingMetrics>,
215
216    /// The size of the chunks to be processed.
217    chunk_size: usize,
218
219    /// The error report for evaluation errors.
220    eval_error_report: ActorEvalErrorReport,
221}
222
223struct UpstreamSinkBarrierManager {
224    /// Context needed to build new sink inputs.
225    build_input_ctx: BuildSinkInputContext,
226
227    /// Used to get barriers directly from the `BarrierManager`.
228    barrier_rx: UnboundedReceiver<Barrier>,
229
230    /// Used to send barriers to the merge executors of upstream fragments.
231    barrier_tx_map: HashMap<FragmentId, UnboundedSender<Barrier>>,
232
233    /// Barriers that have been received from the `barrier_rx` but not yet fully processed.
234    pending_barriers: VecDeque<PendingSinkBarrier>,
235}
236
237impl UpstreamSinkUnionExecutor {
238    // Need to wait for establishing stream-connections to upstream actors, so async.
239    pub async fn new(
240        ctx: ActorContextRef,
241        local_barrier_manager: LocalBarrierManager,
242        executor_stats: Arc<StreamingMetrics>,
243        chunk_size: usize,
244        initial_upstream_infos: Vec<UpstreamFragmentInfo>,
245        eval_error_report: ActorEvalErrorReport,
246    ) -> StreamExecutorResult<Self> {
247        let barrier_rx = local_barrier_manager.subscribe_barrier(ctx.id);
248        let build_input_ctx = BuildSinkInputContext {
249            actor_context: ctx.clone(),
250            local_barrier_manager,
251            executor_stats: executor_stats.clone(),
252            chunk_size,
253            eval_error_report,
254        };
255        let mut upstream_sink_barrier_mgr =
256            UpstreamSinkBarrierManager::new(build_input_ctx, barrier_rx);
257
258        let mut initial_inputs = Vec::with_capacity(initial_upstream_infos.len());
259        for info in initial_upstream_infos {
260            let input = upstream_sink_barrier_mgr.new_sink_input_impl(info).await?;
261            initial_inputs.push(input);
262        }
263
264        let executor = Self {
265            actor_context: ctx,
266            executor_stats,
267            initial_inputs,
268            upstream_sink_barrier_mgr,
269        };
270
271        Ok(executor)
272    }
273
274    #[cfg(test)]
275    pub fn for_test(
276        actor_id: ActorId,
277        local_barrier_manager: LocalBarrierManager,
278        chunk_size: usize,
279        initial_inputs: Vec<BoxedSinkInput>,
280    ) -> Self {
281        let actor_ctx = ActorContext::for_test(actor_id);
282        let executor_stats: Arc<StreamingMetrics> = StreamingMetrics::unused().into();
283        let eval_error_report = ActorEvalErrorReport {
284            actor_context: actor_ctx.clone(),
285            identity: format!("UpstreamSinkUnionExecutor-{}", actor_id).into(),
286        };
287
288        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
289        let build_input_ctx = BuildSinkInputContext {
290            actor_context: actor_ctx.clone(),
291            local_barrier_manager,
292            executor_stats: executor_stats.clone(),
293            chunk_size,
294            eval_error_report,
295        };
296        let upstream_sink_barrier_mgr =
297            UpstreamSinkBarrierManager::new(build_input_ctx, barrier_rx);
298
299        Self {
300            actor_context: actor_ctx,
301            executor_stats,
302            initial_inputs,
303            upstream_sink_barrier_mgr,
304        }
305    }
306
307    #[try_stream(ok = Message, error = StreamExecutorError)]
308    async fn execute_inner(self: Box<Self>) {
309        let actor_id = self.actor_context.id;
310        let fragment_id = self.actor_context.fragment_id;
311
312        let barrier_align = self
313            .executor_stats
314            .barrier_align_duration
315            .with_guarded_label_values(&[
316                actor_id.to_string().as_str(),
317                fragment_id.to_string().as_str(),
318                "",
319                "UpstreamSinkUnion",
320            ]);
321
322        let upstreams =
323            DynamicReceivers::new(self.initial_inputs, Some(barrier_align.clone()), None);
324        pin_mut!(upstreams);
325
326        let mut upstream_sink_barrier_mgr = self.upstream_sink_barrier_mgr;
327
328        loop {
329            let msg = upstream_sink_barrier_mgr
330                .await_next_message(&mut upstreams)
331                .await?;
332
333            if let Message::Barrier(barrier) = &msg {
334                let pending_barrier = upstream_sink_barrier_mgr.pop_pending_barrier();
335                assert_equal_dispatcher_barrier(barrier, &pending_barrier.barrier);
336
337                if let Some(mut new_sink_input) = pending_barrier.new_input {
338                    let first_barrier = expect_first_barrier(&mut new_sink_input).await?;
339                    assert_equal_dispatcher_barrier(&pending_barrier.barrier, &first_barrier);
340                    upstreams.add_upstreams_from(iter::once(new_sink_input));
341                }
342
343                if !pending_barrier.removed_upstream_fragment_ids.is_empty() {
344                    upstreams.remove_upstreams(&pending_barrier.removed_upstream_fragment_ids);
345                }
346            }
347
348            yield msg;
349        }
350    }
351}
352
353impl UpstreamSinkBarrierManager {
354    fn new(build_input_ctx: BuildSinkInputContext, barrier_rx: UnboundedReceiver<Barrier>) -> Self {
355        Self {
356            build_input_ctx,
357            barrier_rx,
358            barrier_tx_map: HashMap::new(),
359            pending_barriers: VecDeque::new(),
360        }
361    }
362
363    async fn await_next_message(
364        &mut self,
365        upstreams: &mut DynamicReceivers<FragmentId, BarrierMutationType>,
366    ) -> StreamExecutorResult<Message> {
367        loop {
368            if upstreams.is_empty() && !self.pending_barriers.is_empty() {
369                let barrier = self.pending_barriers.front().unwrap().barrier.clone();
370                return Ok(Message::Barrier(barrier));
371            }
372            tokio::select! {
373                biased;
374
375                // If None is returned, it means upstreams is empty, which means we should continue pending and wait on
376                // the second branch.
377                msg = pending_on_none(upstreams.next()) => {
378                    return msg;
379                }
380
381                barrier = self.barrier_rx.recv() => {
382                    // Here, if there's no upstream, the cached barrier here will be sent out immediately in the next
383                    // loop. Otherwise, we need to forward the barrier to the current upstreams, cache this barrier, and
384                    // then wait in the first branch until the upstreams have processed the barrier.
385                    let barrier = barrier.context("Failed to receive barrier from barrier_rx")?;
386                    let pending_barrier = self.partially_apply_barrier(barrier).await?;
387                    self.pending_barriers.push_back(pending_barrier);
388                    continue;
389                }
390            }
391        }
392    }
393
394    async fn partially_apply_barrier(
395        &mut self,
396        barrier: Barrier,
397    ) -> StreamExecutorResult<PendingSinkBarrier> {
398        let new_sink_input = if let Some(new_upstream_sink) =
399            barrier.as_new_upstream_sink(self.build_input_ctx.actor_context.fragment_id)
400        {
401            // Create new inputs for the newly added upstream sinks.
402            let new_input = self.new_sink_input(new_upstream_sink).await?;
403            Some(new_input)
404        } else {
405            None
406        };
407
408        self.forward_barrier_to_all_upstreams(&barrier)?;
409
410        let removed_upstream_fragment_ids = if let Some(dropped_upstream_sinks) =
411            barrier.as_dropped_upstream_sinks()
412            && !dropped_upstream_sinks.is_empty()
413        {
414            // Remove the upstream sinks that are no longer needed.
415            self.barrier_tx_map
416                .extract_if(|upstream_fragment_id, _| {
417                    dropped_upstream_sinks.contains(upstream_fragment_id)
418                })
419                .map(|(upstream_fragment_id, _)| upstream_fragment_id)
420                .collect()
421        } else {
422            HashSet::new()
423        };
424
425        Ok(PendingSinkBarrier {
426            barrier,
427            new_input: new_sink_input,
428            removed_upstream_fragment_ids,
429        })
430    }
431
432    fn pop_pending_barrier(&mut self) -> PendingSinkBarrier {
433        self.pending_barriers
434            .pop_front()
435            .expect("should always have a pending barrier when receiving a barrier from upstreams")
436    }
437
438    fn forward_barrier_to_all_upstreams(&self, barrier: &Barrier) -> StreamExecutorResult<()> {
439        for tx in self.barrier_tx_map.values() {
440            tx.send(barrier.clone())
441                .map_err(|e| StreamExecutorError::from(anyhow::anyhow!(e)))?;
442        }
443        Ok(())
444    }
445
446    async fn new_sink_input(
447        &mut self,
448        pb_upstream_info: &PbNewUpstreamSink,
449    ) -> StreamExecutorResult<BoxedSinkInput> {
450        let info = pb_upstream_info.get_info().unwrap();
451        let merge_schema = info
452            .get_sink_output_schema()
453            .iter()
454            .map(Field::from)
455            .collect();
456        let project_exprs = info
457            .get_project_exprs()
458            .iter()
459            .map(|e| build_non_strict_from_prost(e, self.build_input_ctx.eval_error_report.clone()))
460            .try_collect()
461            .map_err(|err| anyhow::anyhow!(err))?;
462        let upstream_fragment_id = info.get_upstream_fragment_id();
463        self.new_sink_input_impl(UpstreamFragmentInfo {
464            upstream_fragment_id,
465            upstream_actors: pb_upstream_info.get_upstream_actors().clone(),
466            merge_schema,
467            project_exprs,
468        })
469        .await
470    }
471
472    async fn new_sink_input_impl(
473        &mut self,
474        UpstreamFragmentInfo {
475            upstream_fragment_id,
476            upstream_actors,
477            merge_schema,
478            project_exprs,
479        }: UpstreamFragmentInfo,
480    ) -> StreamExecutorResult<BoxedSinkInput> {
481        let merge_executor = self
482            .new_merge_executor(upstream_fragment_id, upstream_actors, merge_schema)
483            .await?;
484
485        Ok(SinkHandlerInput::new(
486            upstream_fragment_id,
487            Box::new(merge_executor),
488            project_exprs,
489        )
490        .boxed_input())
491    }
492
493    async fn new_merge_executor(
494        &mut self,
495        upstream_fragment_id: FragmentId,
496        upstream_actors: Vec<PbActorInfo>,
497        schema: Schema,
498    ) -> StreamExecutorResult<MergeExecutor> {
499        let ctx = &self.build_input_ctx;
500        let inputs = try_join_all(upstream_actors.iter().map(|actor| {
501            new_input(
502                &ctx.local_barrier_manager,
503                ctx.executor_stats.clone(),
504                ctx.actor_context.id,
505                ctx.actor_context.fragment_id,
506                actor,
507                upstream_fragment_id,
508            )
509        }))
510        .await?;
511
512        let (barrier_tx, barrier_rx) = unbounded_channel();
513        self.barrier_tx_map
514            .try_insert(upstream_fragment_id, barrier_tx)
515            .expect("non-duplicate");
516
517        let upstreams =
518            MergeExecutor::new_select_receiver(inputs, &ctx.executor_stats, &ctx.actor_context);
519
520        Ok(MergeExecutor::new(
521            ctx.actor_context.clone(),
522            ctx.actor_context.fragment_id,
523            upstream_fragment_id,
524            upstreams,
525            ctx.local_barrier_manager.clone(),
526            ctx.executor_stats.clone(),
527            barrier_rx,
528            ctx.chunk_size,
529            schema,
530        ))
531    }
532}
533
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use futures::FutureExt;
    use risingwave_common::array::{Op, StreamChunkTestExt};
    use risingwave_common::catalog::Field;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
    use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;

    use super::*;
    use crate::executor::exchange::permit::{Sender, channel_for_test};
    use crate::executor::test_utils::expr::build_from_pretty;
    use crate::executor::{AddMutation, MessageInner, StopMutation};
    use crate::task::NewOutputRequest;
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;
    use crate::task::test_utils::helper_make_local_actor;

    /// A single `SinkHandlerInput` over a two-channel merge must emit the projected column only.
    #[tokio::test]
    async fn test_sink_input() {
        let test_env = LocalBarrierTestEnv::for_test().await;

        let actor_id = 2;

        let first_barrier = Barrier::with_prev_epoch_for_test(2, 1);

        test_env.inject_barrier(&first_barrier, [actor_id]);
        test_env.flush_all_events().await;

        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };

        let (tx1, rx1) = channel_for_test();
        let (tx2, rx2) = channel_for_test();

        let merge = MergeExecutor::for_test(
            actor_id,
            vec![rx1, rx2],
            test_env.local_barrier_manager.clone(),
            schema.clone(),
            5,
            None,
        );

        // Project the second input column.
        let second_column = build_from_pretty("$1:int8");

        let mut input = SinkHandlerInput::new(
            1919, // from MergeExecutor::for_test()
            Box::new(merge),
            vec![second_column],
        )
        .boxed_input();

        let chunk1 = StreamChunk::from_pretty(
            " I I
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " I I
            + 7 8
            - 3 6",
        );

        tx1.send(MessageInner::Chunk(chunk1).into()).await.unwrap();
        tx2.send(MessageInner::Chunk(chunk2).into()).await.unwrap();

        let expected = StreamChunk::from_pretty(
            " I
                + 4
                + 5
                + 6
                + 8
                - 6",
        );
        let msg = input.next().await.unwrap().unwrap();
        assert_eq!(*msg.as_chunk().unwrap(), expected);
    }

    /// Builds a projection-less input over a one-channel test merge executor. Returns the input,
    /// the sender for its data channel, and the sender for its barrier channel.
    fn new_input_for_test(
        actor_id: ActorId,
        local_barrier_manager: LocalBarrierManager,
    ) -> (BoxedSinkInput, Sender, UnboundedSender<Barrier>) {
        let (data_tx, data_rx) = channel_for_test();
        let (barrier_tx, barrier_rx) = unbounded_channel();
        let merge = MergeExecutor::for_test(
            actor_id,
            vec![data_rx],
            local_barrier_manager,
            Schema::new(vec![]),
            10,
            Some(barrier_rx),
        );
        let input = SinkHandlerInput::new(actor_id, Box::new(merge), vec![]).boxed_input();
        (input, data_tx, barrier_tx)
    }

    /// Builds a column-less chunk consisting of `size` insert operations.
    fn build_test_chunk(size: u64) -> StreamChunk {
        StreamChunk::new(vec![Op::Insert; size as usize], vec![])
    }

    /// With a fixed set of three upstreams, all chunks come through first and the barrier is
    /// only emitted after it has been sent on the barrier channels.
    #[tokio::test]
    async fn test_fixed_upstreams() {
        let test_env = LocalBarrierTestEnv::for_test().await;

        let actor_id = 2;

        let b1 = Barrier::with_prev_epoch_for_test(2, 1);

        test_env.inject_barrier(&b1, [actor_id]);
        test_env.flush_all_events().await;

        const UPSTREAM_COUNT: usize = 3;
        let mut inputs = Vec::with_capacity(UPSTREAM_COUNT);
        let mut data_txs = Vec::with_capacity(UPSTREAM_COUNT);
        let mut barrier_txs = Vec::with_capacity(UPSTREAM_COUNT);
        for _ in 0..UPSTREAM_COUNT {
            let (input, data_tx, barrier_tx) =
                new_input_for_test(actor_id, test_env.local_barrier_manager.clone());
            inputs.push(input);
            data_txs.push(data_tx);
            barrier_txs.push(barrier_tx);
        }

        let sink_union = UpstreamSinkUnionExecutor::for_test(
            actor_id,
            test_env.local_barrier_manager.clone(),
            10,
            inputs,
        );
        // Flush subscribe_barrier events to ensure the executor is ready.
        test_env.flush_all_events().await;
        let mut sink_union = Box::new(sink_union).execute_inner().boxed();

        // Two chunks followed by the barrier on every data channel.
        for data_tx in data_txs {
            data_tx
                .send(MessageInner::Chunk(build_test_chunk(10)).into())
                .await
                .unwrap();
            data_tx
                .send(MessageInner::Chunk(build_test_chunk(10)).into())
                .await
                .unwrap();
            data_tx
                .send(MessageInner::Barrier(b1.clone().into_dispatcher()).into())
                .await
                .unwrap();
        }

        for _ in 0..6 {
            let msg = sink_union.next().await.unwrap().unwrap();
            assert!(msg.is_chunk());
            assert_eq!(msg.as_chunk().unwrap().ops().len(), 10);
        }

        // Because the barrier has not been emitted yet, it should not be received.
        assert!(sink_union.next().now_or_never().is_none());

        for barrier_tx in barrier_txs {
            barrier_tx.send(b1.clone()).unwrap();
        }

        let msg = sink_union.next().await.unwrap().unwrap();
        assert!(msg.is_barrier());
        assert_eq!(msg.as_barrier().unwrap().epoch, b1.epoch);
    }

    /// Starts without upstreams, adds one through an `Add` mutation and drops it again through a
    /// `Stop` mutation, checking the barrier emitted at every step.
    #[tokio::test]
    async fn test_dynamic_upstreams() {
        let test_env = LocalBarrierTestEnv::for_test().await;

        let actor_id = 2;
        let fragment_id = 0; // from ActorContext::for_test
        let upstream_fragment_id = 11;
        let upstream_actor_id = 101;

        let upstream_actor = helper_make_local_actor(upstream_actor_id);

        let add_upstream = PbNewUpstreamSink {
            info: Some(PbUpstreamSinkInfo {
                upstream_fragment_id,
                sink_output_schema: vec![],
                project_exprs: vec![],
            }),
            upstream_actors: vec![upstream_actor],
        };

        let b1 = Barrier::new_test_barrier(test_epoch(1));
        let b2 =
            Barrier::new_test_barrier(test_epoch(2)).with_mutation(Mutation::Add(AddMutation {
                new_upstream_sinks: HashMap::from([(fragment_id, add_upstream)]),
                ..Default::default()
            }));
        let b3 = Barrier::new_test_barrier(test_epoch(3));
        let b4 =
            Barrier::new_test_barrier(test_epoch(4)).with_mutation(Mutation::Stop(StopMutation {
                dropped_sink_fragments: HashSet::from([upstream_fragment_id]),
                ..Default::default()
            }));
        for barrier in [&b1, &b2, &b3, &b4] {
            test_env.inject_barrier(barrier, [actor_id]);
        }
        test_env.flush_all_events().await;

        let executor = UpstreamSinkUnionExecutor::for_test(
            actor_id,
            test_env.local_barrier_manager.clone(),
            10,
            Vec::new(), // no initial inputs
        );
        // Flush subscribe_barrier events to ensure the executor is ready.
        test_env.flush_all_events().await;

        // No upstream, but should still forward the barrier.
        let mut exec_stream = Box::new(executor).execute_inner().boxed();
        let msg = exec_stream.next().await.unwrap().unwrap();
        assert_eq!(msg.as_barrier().unwrap().epoch, b1.epoch);

        // Add new upstream.
        // The barrier should not be emitted because the executor is waiting for new upstream.
        assert!(exec_stream.next().now_or_never().is_none());

        let mut output_requests = test_env
            .take_pending_new_output_requests(upstream_actor_id)
            .await;
        let (_, request) = output_requests.pop().unwrap();
        let NewOutputRequest::Local(tx) = request else {
            unreachable!()
        };

        tx.send(MessageInner::Barrier(b2.clone().into_dispatcher()).into())
            .await
            .unwrap();
        // Now the executor should emit the barrier.
        let msg = exec_stream.next().await.unwrap().unwrap();
        assert_eq!(msg.as_barrier().unwrap().epoch, b2.epoch);

        tx.send(MessageInner::Chunk(build_test_chunk(10)).into())
            .await
            .unwrap();
        let msg = exec_stream.next().await.unwrap().unwrap();
        assert!(msg.is_chunk());

        tx.send(MessageInner::Barrier(b3.clone().into_dispatcher()).into())
            .await
            .unwrap();
        let msg = exec_stream.next().await.unwrap().unwrap();
        assert_eq!(msg.as_barrier().unwrap().epoch, b3.epoch);

        // Remove upstream.
        tx.send(MessageInner::Barrier(b4.clone().into_dispatcher()).into())
            .await
            .unwrap();
        // The executor should emit the barrier with the removal update.
        let msg = exec_stream.next().await.unwrap().unwrap();
        assert_eq!(msg.as_barrier().unwrap().epoch, b4.epoch);
    }
}