risingwave_stream/executor/
upstream_sink_union.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet, VecDeque};
16use std::iter;
17use std::pin::Pin;
18use std::task::{Context, Poll};
19
20use anyhow::Context as _;
21use futures::future::try_join_all;
22use itertools::Itertools;
23use pin_project::pin_project;
24use risingwave_common::catalog::Field;
25use risingwave_expr::expr::{EvalErrorReport, NonStrictExpression, build_non_strict_from_prost};
26use risingwave_pb::common::PbActorInfo;
27use risingwave_pb::expr::PbExprNode;
28use risingwave_pb::plan_common::PbField;
29use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
30use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
31use rw_futures_util::pending_on_none;
32use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
33
34use crate::executor::exchange::input::{Input, assert_equal_dispatcher_barrier, new_input};
35use crate::executor::prelude::*;
36use crate::executor::project::apply_project_exprs;
37use crate::executor::{
38    BarrierMutationType, BoxedMessageInput, DynamicReceivers, MergeExecutor, Message,
39};
40use crate::task::{ActorEvalErrorReport, FragmentId, LocalBarrierManager};
41
/// Opaque stream type yielding `MessageStreamItem`s; its concrete type is defined by
/// `SinkHandlerInput::apply_project_exprs_stream` (via `#[define_opaque]`).
type ProcessedMessageStream = impl Stream<Item = MessageStreamItem>;
43
/// A wrapper that merges data from a single upstream fragment and applies projection expressions.
/// Each `SinkHandlerInput` represents one upstream fragment with its own merge executor and projection logic.
///
/// It implements both `Input` (identified by the upstream fragment ID) and `Stream`.
#[pin_project]
pub struct SinkHandlerInput {
    /// The ID of the upstream fragment that this input is associated with.
    /// Also used as the identifier returned by `Input::id`.
    upstream_fragment_id: FragmentId,

    /// The output of the merge executor with the projection expressions applied to every chunk;
    /// non-chunk messages are passed through unchanged.
    #[pin]
    processed_stream: ProcessedMessageStream,
}
55
56impl SinkHandlerInput {
57    pub fn new(
58        upstream_fragment_id: FragmentId,
59        merge: Box<MergeExecutor>,
60        project_exprs: Vec<NonStrictExpression>,
61    ) -> Self {
62        let processed_stream = Self::apply_project_exprs_stream(merge, project_exprs);
63        Self {
64            upstream_fragment_id,
65            processed_stream,
66        }
67    }
68
69    #[define_opaque(ProcessedMessageStream)]
70    fn apply_project_exprs_stream(
71        merge: Box<MergeExecutor>,
72        project_exprs: Vec<NonStrictExpression>,
73    ) -> ProcessedMessageStream {
74        // Apply the projection expressions to the output of the merge executor.
75        Self::apply_project_exprs_stream_impl(merge, project_exprs)
76    }
77
78    /// Applies a projection to the output of a merge executor.
79    #[try_stream(ok = Message, error = StreamExecutorError)]
80    async fn apply_project_exprs_stream_impl(
81        merge: Box<MergeExecutor>,
82        project_exprs: Vec<NonStrictExpression>,
83    ) {
84        let merge_stream = merge.execute_inner();
85        pin_mut!(merge_stream);
86        while let Some(msg) = merge_stream.next().await {
87            let msg = msg?;
88            if let Message::Chunk(chunk) = msg {
89                // Apply the projection expressions to the chunk.
90                let new_chunk = apply_project_exprs(&project_exprs, chunk).await?;
91                yield Message::Chunk(new_chunk);
92            } else {
93                yield msg;
94            }
95        }
96    }
97}
98
99impl Input for SinkHandlerInput {
100    type InputId = FragmentId;
101
102    fn id(&self) -> Self::InputId {
103        // Return a unique identifier for this input, e.g., based on the upstream fragment ID
104        self.upstream_fragment_id
105    }
106}
107
108impl Stream for SinkHandlerInput {
109    type Item = MessageStreamItem;
110
111    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112        self.project().processed_stream.poll_next(cx)
113    }
114}
115
/// Information about an upstream fragment including its schema and projection expressions.
///
/// This is the input consumed by `UpstreamSinkBarrierManager::new_sink_input_impl` to build
/// one `SinkHandlerInput`.
#[derive(Debug)]
pub struct UpstreamFragmentInfo {
    /// ID of the upstream fragment.
    pub upstream_fragment_id: FragmentId,
    /// Actors of the upstream fragment; one exchange input is created per actor.
    pub upstream_actors: Vec<PbActorInfo>,
    /// Schema handed to the merge executor, built from the sink output schema fields.
    pub merge_schema: Schema,
    /// Projection expressions applied to every chunk coming out of the merge executor.
    pub project_exprs: Vec<NonStrictExpression>,
}
124
125impl UpstreamFragmentInfo {
126    pub fn new(
127        upstream_fragment_id: FragmentId,
128        initial_upstream_actors: &HashMap<FragmentId, UpstreamActors>,
129        sink_output_schema: &[PbField],
130        project_exprs: &[PbExprNode],
131        error_report: impl EvalErrorReport + 'static,
132    ) -> StreamResult<Self> {
133        let actors = initial_upstream_actors
134            .get(&upstream_fragment_id)
135            .ok_or_else(|| {
136                anyhow::anyhow!(
137                    "upstream fragment {} not found in initial upstream actors",
138                    upstream_fragment_id
139                )
140            })?;
141        let merge_schema = sink_output_schema.iter().map(Field::from).collect();
142        let project_exprs = project_exprs
143            .iter()
144            .map(|e| build_non_strict_from_prost(e, error_report.clone()))
145            .try_collect()
146            .map_err(|err| anyhow::anyhow!(err))?;
147        Ok(Self {
148            upstream_fragment_id,
149            upstream_actors: actors.actors.clone(),
150            merge_schema,
151            project_exprs,
152        })
153    }
154}
155
/// A boxed message input identified by the `FragmentId` of its upstream fragment.
type BoxedSinkInput = BoxedMessageInput<FragmentId, BarrierMutationType>;
157
/// `UpstreamSinkUnionExecutor` merges data from multiple upstream fragments, where each fragment
/// has its own merge logic and projection expressions. This executor is specifically designed for
/// sink operations that need to union data from different upstream sources.
///
/// Unlike a simple union that just merges streams, this executor:
/// 1. Creates a separate `MergeExecutor` for each upstream fragment
/// 2. Applies fragment-specific projection expressions to each stream
/// 3. Unions all the processed streams into a single output stream
///
/// This is useful for sink operators that need to collect data from multiple upstream fragments
/// with potentially different schemas or processing requirements.
///
/// Upstream fragments can also be added or removed while the executor is running; this is
/// driven by barriers received through `upstream_sink_barrier_mgr`.
pub struct UpstreamSinkUnionExecutor {
    /// The context of the actor.
    actor_context: ActorContextRef,

    /// Streaming metrics.
    executor_stats: Arc<StreamingMetrics>,

    /// The initial inputs to the executor. They are handed to a `DynamicReceivers` when
    /// execution starts.
    initial_inputs: Vec<BoxedSinkInput>,

    /// For asynchronous processing barriers from `LocalBarrierManager`: it forwards each
    /// barrier to the per-fragment merge executors and tracks the barriers that have not been
    /// emitted yet.
    upstream_sink_barrier_mgr: UpstreamSinkBarrierManager,
}
182
183impl Debug for UpstreamSinkUnionExecutor {
184    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
185        f.debug_struct("UpstreamSinkUnionExecutor")
186            .field(
187                "initial_upstream_fragments",
188                &self.initial_inputs.iter().map(|i| i.id()).collect_vec(),
189            )
190            .finish()
191    }
192}
193
194impl Execute for UpstreamSinkUnionExecutor {
195    fn execute(self: Box<Self>) -> BoxedMessageStream {
196        self.execute_inner().boxed()
197    }
198}
199
/// A barrier received from the barrier channel that has been forwarded to the upstream merge
/// executors but has not yet been emitted by the union executor, together with the upstream
/// changes that must be applied once it is emitted.
struct PendingSinkBarrier {
    /// The barrier as received from the barrier channel.
    barrier: Barrier,
    /// Input for a newly added upstream sink, if the barrier carries one for this fragment.
    /// It is added to the upstreams after the barrier has been emitted by the existing ones.
    new_input: Option<BoxedSinkInput>,
    /// Upstream fragments dropped by this barrier. Their barrier senders have already been
    /// removed; the inputs themselves are removed once the barrier is emitted.
    removed_upstream_fragment_ids: HashSet<FragmentId>,
}
205
/// Everything needed to build the merge executor and projection for a new upstream fragment.
struct BuildSinkInputContext {
    /// The context of the actor.
    actor_context: ActorContextRef,

    /// Used to create merge executors.
    local_barrier_manager: LocalBarrierManager,

    /// Streaming metrics.
    executor_stats: Arc<StreamingMetrics>,

    /// The size of the chunks to be processed. Passed to the merge upstream of every merge
    /// executor that is created.
    chunk_size: usize,

    /// The error report for evaluation errors. Cloned into every projection expression built
    /// for a newly added upstream sink.
    eval_error_report: ActorEvalErrorReport,
}
222
/// Receives barriers for the union executor, fans them out to the merge executors of all
/// upstream fragments, and builds the inputs for upstream fragments that are added at runtime.
struct UpstreamSinkBarrierManager {
    /// Context needed to build new sink inputs.
    build_input_ctx: BuildSinkInputContext,

    /// Used to get barriers directly from the `BarrierManager`.
    barrier_rx: UnboundedReceiver<Barrier>,

    /// Used to send barriers to the merge executors of upstream fragments.
    /// An entry is inserted when a merge executor is created and removed when a barrier drops
    /// the corresponding upstream fragment.
    barrier_tx_map: HashMap<FragmentId, UnboundedSender<Barrier>>,

    /// Barriers that have been received from the `barrier_rx` but not yet fully processed.
    /// Kept in arrival order; the front entry is popped when the matching barrier is emitted.
    pending_barriers: VecDeque<PendingSinkBarrier>,
}
236
237impl UpstreamSinkUnionExecutor {
238    // Need to wait for establishing stream-connections to upstream actors, so async.
239    pub async fn new(
240        ctx: ActorContextRef,
241        local_barrier_manager: LocalBarrierManager,
242        executor_stats: Arc<StreamingMetrics>,
243        chunk_size: usize,
244        initial_upstream_infos: Vec<UpstreamFragmentInfo>,
245        eval_error_report: ActorEvalErrorReport,
246    ) -> StreamExecutorResult<Self> {
247        let barrier_rx = local_barrier_manager.subscribe_barrier(ctx.id);
248        let build_input_ctx = BuildSinkInputContext {
249            actor_context: ctx.clone(),
250            local_barrier_manager,
251            executor_stats: executor_stats.clone(),
252            chunk_size,
253            eval_error_report,
254        };
255        let mut upstream_sink_barrier_mgr =
256            UpstreamSinkBarrierManager::new(build_input_ctx, barrier_rx);
257
258        let mut initial_inputs = Vec::with_capacity(initial_upstream_infos.len());
259        for info in initial_upstream_infos {
260            let input = upstream_sink_barrier_mgr.new_sink_input_impl(info).await?;
261            initial_inputs.push(input);
262        }
263
264        let executor = Self {
265            actor_context: ctx,
266            executor_stats,
267            initial_inputs,
268            upstream_sink_barrier_mgr,
269        };
270
271        Ok(executor)
272    }
273
274    #[cfg(test)]
275    pub fn for_test(
276        actor_id: ActorId,
277        local_barrier_manager: LocalBarrierManager,
278        chunk_size: usize,
279        initial_inputs: Vec<BoxedSinkInput>,
280    ) -> Self {
281        let actor_ctx = ActorContext::for_test(actor_id);
282        let executor_stats: Arc<StreamingMetrics> = StreamingMetrics::unused().into();
283        let eval_error_report = ActorEvalErrorReport {
284            actor_context: actor_ctx.clone(),
285            identity: format!("UpstreamSinkUnionExecutor-{}", actor_id).into(),
286        };
287
288        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
289        let build_input_ctx = BuildSinkInputContext {
290            actor_context: actor_ctx.clone(),
291            local_barrier_manager,
292            executor_stats: executor_stats.clone(),
293            chunk_size,
294            eval_error_report,
295        };
296        let upstream_sink_barrier_mgr =
297            UpstreamSinkBarrierManager::new(build_input_ctx, barrier_rx);
298
299        Self {
300            actor_context: actor_ctx,
301            executor_stats,
302            initial_inputs,
303            upstream_sink_barrier_mgr,
304        }
305    }
306
    /// Main loop of the executor.
    ///
    /// Messages are pulled through `UpstreamSinkBarrierManager::await_next_message` and yielded
    /// downstream. Whenever a barrier comes out, the matching pending barrier is popped and the
    /// upstream changes recorded on it are applied before the barrier is yielded:
    /// a newly built input is added (after consuming its first barrier), and dropped upstream
    /// fragments are removed.
    ///
    /// The loop has no exit of its own; the stream ends only when an error is propagated.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self: Box<Self>) {
        let actor_id = self.actor_context.id;
        let fragment_id = self.actor_context.fragment_id;

        // Barrier-align duration metric, labeled with the actor id, the fragment id, an empty
        // label and the executor name.
        let barrier_align = self
            .executor_stats
            .barrier_align_duration
            .with_guarded_label_values(&[
                actor_id.to_string().as_str(),
                fragment_id.to_string().as_str(),
                "",
                "UpstreamSinkUnion",
            ]);

        // The set of upstream inputs; it starts with the initial inputs and is modified below
        // as barriers add or drop upstream fragments.
        let upstreams =
            DynamicReceivers::new(self.initial_inputs, Some(barrier_align.clone()), None);
        pin_mut!(upstreams);

        let mut upstream_sink_barrier_mgr = self.upstream_sink_barrier_mgr;

        loop {
            let msg = upstream_sink_barrier_mgr
                .await_next_message(&mut upstreams)
                .await?;

            if let Message::Barrier(barrier) = &msg {
                // Every emitted barrier must correspond to the oldest pending barrier.
                let pending_barrier = upstream_sink_barrier_mgr.pop_pending_barrier();
                assert_equal_dispatcher_barrier(barrier, &pending_barrier.barrier);

                if let Some(mut new_sink_input) = pending_barrier.new_input {
                    // The new input must start with this very barrier; consume it here so the
                    // input is in step with the existing upstreams before it is added.
                    let first_barrier = expect_first_barrier(&mut new_sink_input).await?;
                    assert_equal_dispatcher_barrier(&pending_barrier.barrier, &first_barrier);
                    upstreams.add_upstreams_from(iter::once(new_sink_input));
                }

                if !pending_barrier.removed_upstream_fragment_ids.is_empty() {
                    upstreams.remove_upstreams(&pending_barrier.removed_upstream_fragment_ids);
                }
            }

            // Upstream changes are applied before the message is passed downstream.
            yield msg;
        }
    }
351}
352
353impl UpstreamSinkBarrierManager {
354    fn new(build_input_ctx: BuildSinkInputContext, barrier_rx: UnboundedReceiver<Barrier>) -> Self {
355        Self {
356            build_input_ctx,
357            barrier_rx,
358            barrier_tx_map: HashMap::new(),
359            pending_barriers: VecDeque::new(),
360        }
361    }
362
    /// Waits for the next message to emit downstream.
    ///
    /// Messages normally come from `upstreams`. Barriers arriving on `barrier_rx` are not
    /// returned directly: they are forwarded to the upstream merge executors and queued in
    /// `pending_barriers`. Only when there is no upstream at all is the oldest pending barrier
    /// returned as-is. In that case the barrier is cloned and left in the queue; the caller is
    /// expected to pop it.
    ///
    /// # Errors
    ///
    /// Fails if `barrier_rx` is closed, if applying a received barrier fails, or if an
    /// upstream yields an error.
    async fn await_next_message(
        &mut self,
        upstreams: &mut DynamicReceivers<FragmentId, BarrierMutationType>,
    ) -> StreamExecutorResult<Message> {
        loop {
            // Without any upstream nobody else would emit the barrier, so emit it ourselves.
            if upstreams.is_empty() && !self.pending_barriers.is_empty() {
                let barrier = self.pending_barriers.front().unwrap().barrier.clone();
                return Ok(Message::Barrier(barrier));
            }
            tokio::select! {
                // Branches are polled in declaration order: upstream messages come first.
                biased;

                // If None is returned, it means upstreams is empty, which means we should continue pending and wait on
                // the second branch.
                msg = pending_on_none(upstreams.next()) => {
                    return msg;
                }

                barrier = self.barrier_rx.recv() => {
                    // Here, if there's no upstream, the cached barrier here will be sent out immediately in the next
                    // loop. Otherwise, we need to forward the barrier to the current upstreams, cache this barrier, and
                    // then wait in the first branch until the upstreams have processed the barrier.
                    let barrier = barrier.context("Failed to receive barrier from barrier_rx")?;
                    let pending_barrier = self.partially_apply_barrier(barrier).await?;
                    self.pending_barriers.push_back(pending_barrier);
                    continue;
                }
            }
        }
    }
393
394    async fn partially_apply_barrier(
395        &mut self,
396        barrier: Barrier,
397    ) -> StreamExecutorResult<PendingSinkBarrier> {
398        let new_sink_input = if let Some(new_upstream_sink) =
399            barrier.as_new_upstream_sink(self.build_input_ctx.actor_context.fragment_id)
400        {
401            // Create new inputs for the newly added upstream sinks.
402            let new_input = self.new_sink_input(new_upstream_sink).await?;
403            Some(new_input)
404        } else {
405            None
406        };
407
408        self.forward_barrier_to_all_upstreams(&barrier)?;
409
410        let removed_upstream_fragment_ids = if let Some(dropped_upstream_sinks) =
411            barrier.as_dropped_upstream_sinks()
412            && !dropped_upstream_sinks.is_empty()
413        {
414            // Remove the upstream sinks that are no longer needed.
415            self.barrier_tx_map
416                .extract_if(|upstream_fragment_id, _| {
417                    dropped_upstream_sinks.contains(upstream_fragment_id)
418                })
419                .map(|(upstream_fragment_id, _)| upstream_fragment_id)
420                .collect()
421        } else {
422            HashSet::new()
423        };
424
425        Ok(PendingSinkBarrier {
426            barrier,
427            new_input: new_sink_input,
428            removed_upstream_fragment_ids,
429        })
430    }
431
432    fn pop_pending_barrier(&mut self) -> PendingSinkBarrier {
433        self.pending_barriers
434            .pop_front()
435            .expect("should always have a pending barrier when receiving a barrier from upstreams")
436    }
437
438    fn forward_barrier_to_all_upstreams(&self, barrier: &Barrier) -> StreamExecutorResult<()> {
439        for tx in self.barrier_tx_map.values() {
440            tx.send(barrier.clone())
441                .map_err(|e| StreamExecutorError::from(anyhow::anyhow!(e)))?;
442        }
443        Ok(())
444    }
445
446    async fn new_sink_input(
447        &mut self,
448        pb_upstream_info: &PbNewUpstreamSink,
449    ) -> StreamExecutorResult<BoxedSinkInput> {
450        let info = pb_upstream_info.get_info().unwrap();
451        let merge_schema = info
452            .get_sink_output_schema()
453            .iter()
454            .map(Field::from)
455            .collect();
456        let project_exprs = info
457            .get_project_exprs()
458            .iter()
459            .map(|e| build_non_strict_from_prost(e, self.build_input_ctx.eval_error_report.clone()))
460            .try_collect()
461            .map_err(|err| anyhow::anyhow!(err))?;
462        let upstream_fragment_id = info.get_upstream_fragment_id();
463        self.new_sink_input_impl(UpstreamFragmentInfo {
464            upstream_fragment_id,
465            upstream_actors: pb_upstream_info.get_upstream_actors().clone(),
466            merge_schema,
467            project_exprs,
468        })
469        .await
470    }
471
472    async fn new_sink_input_impl(
473        &mut self,
474        UpstreamFragmentInfo {
475            upstream_fragment_id,
476            upstream_actors,
477            merge_schema,
478            project_exprs,
479        }: UpstreamFragmentInfo,
480    ) -> StreamExecutorResult<BoxedSinkInput> {
481        let merge_executor = self
482            .new_merge_executor(upstream_fragment_id, upstream_actors, merge_schema)
483            .await?;
484
485        Ok(SinkHandlerInput::new(
486            upstream_fragment_id,
487            Box::new(merge_executor),
488            project_exprs,
489        )
490        .boxed_input())
491    }
492
493    async fn new_merge_executor(
494        &mut self,
495        upstream_fragment_id: FragmentId,
496        upstream_actors: Vec<PbActorInfo>,
497        schema: Schema,
498    ) -> StreamExecutorResult<MergeExecutor> {
499        let ctx = &self.build_input_ctx;
500        let inputs = try_join_all(upstream_actors.iter().map(|actor| {
501            new_input(
502                &ctx.local_barrier_manager,
503                ctx.executor_stats.clone(),
504                ctx.actor_context.id,
505                ctx.actor_context.fragment_id,
506                actor,
507                upstream_fragment_id,
508                ctx.actor_context.config.clone(),
509            )
510        }))
511        .await?;
512
513        let (barrier_tx, barrier_rx) = unbounded_channel();
514        self.barrier_tx_map
515            .try_insert(upstream_fragment_id, barrier_tx)
516            .expect("non-duplicate");
517
518        let upstreams = MergeExecutor::new_merge_upstream(
519            inputs,
520            &ctx.executor_stats,
521            &ctx.actor_context,
522            ctx.chunk_size,
523            schema,
524        );
525
526        Ok(MergeExecutor::new(
527            ctx.actor_context.clone(),
528            ctx.actor_context.fragment_id,
529            upstream_fragment_id,
530            upstreams,
531            ctx.local_barrier_manager.clone(),
532            ctx.executor_stats.clone(),
533            barrier_rx,
534        ))
535    }
536}
537
538#[cfg(test)]
539mod tests {
540    use std::collections::HashSet;
541
542    use futures::FutureExt;
543    use risingwave_common::array::{Op, StreamChunkTestExt};
544    use risingwave_common::catalog::Field;
545    use risingwave_common::util::epoch::test_epoch;
546    use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
547    use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
548
549    use super::*;
550    use crate::executor::exchange::permit::{Sender, channel_for_test};
551    use crate::executor::test_utils::expr::build_from_pretty;
552    use crate::executor::{AddMutation, MessageInner, StopMutation};
553    use crate::task::NewOutputRequest;
554    use crate::task::barrier_test_utils::LocalBarrierTestEnv;
555    use crate::task::test_utils::helper_make_local_actor;
556
    /// A `SinkHandlerInput` over a two-channel merge executor must apply its projection
    /// (`$1:int8`, i.e. the second column) to the chunks it receives.
    #[tokio::test]
    async fn test_sink_input() {
        let test_env = LocalBarrierTestEnv::for_test().await;

        let actor_id = 2;

        let b1 = Barrier::with_prev_epoch_for_test(2, 1);

        test_env.inject_barrier(&b1, [actor_id.into()]);
        test_env.flush_all_events().await;

        // Both input columns are int8; the projection keeps only the second one.
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };

        let (tx1, rx1) = channel_for_test();
        let (tx2, rx2) = channel_for_test();

        let merge = MergeExecutor::for_test(
            actor_id,
            vec![rx1, rx2],
            test_env.local_barrier_manager.clone(),
            schema.clone(),
            5,
            None,
        );

        let test_expr = build_from_pretty("$1:int8");

        let mut input = SinkHandlerInput::new(
            1919.into(), // from MergeExecutor::for_test()
            Box::new(merge),
            vec![test_expr],
        )
        .boxed_input();

        let chunk1 = StreamChunk::from_pretty(
            " I I
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " I I
            + 7 8
            - 3 6",
        );

        tx1.send(MessageInner::Chunk(chunk1).into()).await.unwrap();
        tx2.send(MessageInner::Chunk(chunk2).into()).await.unwrap();

        // The first message is expected to be a single chunk holding the second column of
        // both input chunks, with the ops preserved.
        let msg = input.next().await.unwrap().unwrap();
        assert_eq!(
            *msg.as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 4
                + 5
                + 6
                + 8
                - 6"
            )
        );
    }
624
625    fn new_input_for_test(
626        actor_id: ActorId,
627        local_barrier_manager: LocalBarrierManager,
628    ) -> (BoxedSinkInput, Sender, UnboundedSender<Barrier>) {
629        let (tx, rx) = channel_for_test();
630        let (barrier_tx, barrier_rx) = unbounded_channel();
631        let merge = MergeExecutor::for_test(
632            actor_id,
633            vec![rx],
634            local_barrier_manager,
635            Schema::new(vec![]),
636            10,
637            Some(barrier_rx),
638        );
639        let input = SinkHandlerInput::new(
640            FragmentId::new(actor_id.as_raw_id()),
641            Box::new(merge),
642            vec![],
643        )
644        .boxed_input();
645        (input, tx, barrier_tx)
646    }
647
648    fn build_test_chunk(size: u64) -> StreamChunk {
649        let ops = vec![Op::Insert; size as usize];
650        StreamChunk::new(ops, vec![])
651    }
652
    /// With a fixed set of three upstreams, all chunks must be forwarded, and the barrier must
    /// only come out after it has also been delivered through the barrier channels of the
    /// merge executors.
    #[tokio::test]
    async fn test_fixed_upstreams() {
        let test_env = LocalBarrierTestEnv::for_test().await;

        let actor_id = 2.into();

        let b1 = Barrier::with_prev_epoch_for_test(2, 1);

        test_env.inject_barrier(&b1, [actor_id]);
        test_env.flush_all_events().await;

        // Three pre-built inputs, each with its own data sender and barrier sender.
        let mut inputs = Vec::with_capacity(3);
        let mut txs = Vec::with_capacity(3);
        let mut barrier_txs = Vec::with_capacity(3);
        for _ in 0..3 {
            let (input, tx, barrier_tx) =
                new_input_for_test(actor_id, test_env.local_barrier_manager.clone());
            inputs.push(input);
            txs.push(tx);
            barrier_txs.push(barrier_tx);
        }

        let sink_union = UpstreamSinkUnionExecutor::for_test(
            actor_id,
            test_env.local_barrier_manager.clone(),
            10,
            inputs,
        );
        // Flush subscribe_barrier events to ensure the executor is ready.
        test_env.flush_all_events().await;
        let mut sink_union = Box::new(sink_union).execute_inner().boxed();

        // Every upstream sends two chunks followed by the barrier on its data channel.
        for tx in txs {
            tx.send(MessageInner::Chunk(build_test_chunk(10)).into())
                .await
                .unwrap();
            tx.send(MessageInner::Chunk(build_test_chunk(10)).into())
                .await
                .unwrap();
            tx.send(MessageInner::Barrier(b1.clone().into_dispatcher()).into())
                .await
                .unwrap();
        }

        // 3 upstreams x 2 chunks.
        for _ in 0..6 {
            let msg = sink_union.next().await.unwrap().unwrap();
            assert!(msg.is_chunk());
            assert_eq!(msg.as_chunk().unwrap().ops().len(), 10);
        }

        // Because the barrier has not been emitted yet, it should not be received.
        assert!(sink_union.next().now_or_never().is_none());

        // The executor built by `for_test` has no barrier senders registered, so the test
        // delivers the barrier to the merge executors directly.
        for barrier_tx in barrier_txs {
            barrier_tx.send(b1.clone()).unwrap();
        }

        let msg = sink_union.next().await.unwrap().unwrap();
        assert!(msg.is_barrier());
        let barrier = msg.as_barrier().unwrap();
        assert_eq!(barrier.epoch, b1.epoch);
    }
715
    /// Starting without any upstream, the executor must forward barriers on its own, pick up
    /// an upstream added by an `Add` mutation, forward its data, and keep emitting barriers
    /// when the upstream is dropped by a `Stop` mutation.
    #[tokio::test]
    async fn test_dynamic_upstreams() {
        let test_env = LocalBarrierTestEnv::for_test().await;

        let actor_id = 2.into();
        let fragment_id = 0.into(); // from ActorContext::for_test
        let upstream_fragment_id = 11.into();
        let upstream_actor_id = 101.into();

        let upstream_actor = helper_make_local_actor(upstream_actor_id);

        // Description of the upstream sink to add: no columns and no projection.
        let add_upstream = PbNewUpstreamSink {
            info: Some(PbUpstreamSinkInfo {
                upstream_fragment_id,
                sink_output_schema: vec![],
                project_exprs: vec![],
            }),
            upstream_actors: vec![upstream_actor],
        };

        // b1: plain barrier, b2: adds the upstream, b3: plain barrier, b4: drops the upstream.
        let b1 = Barrier::new_test_barrier(test_epoch(1));
        let b2 =
            Barrier::new_test_barrier(test_epoch(2)).with_mutation(Mutation::Add(AddMutation {
                new_upstream_sinks: HashMap::from([(fragment_id, add_upstream)]),
                ..Default::default()
            }));
        let b3 = Barrier::new_test_barrier(test_epoch(3));
        let b4 =
            Barrier::new_test_barrier(test_epoch(4)).with_mutation(Mutation::Stop(StopMutation {
                dropped_sink_fragments: HashSet::from([upstream_fragment_id]),
                ..Default::default()
            }));
        for barrier in [&b1, &b2, &b3, &b4] {
            test_env.inject_barrier(barrier, [actor_id]);
        }
        test_env.flush_all_events().await;

        let executor = UpstreamSinkUnionExecutor::for_test(
            actor_id,
            test_env.local_barrier_manager.clone(),
            10,
            Vec::new(), // no initial inputs
        );
        // Flush subscribe_barrier events to ensure the executor is ready.
        test_env.flush_all_events().await;

        // No upstream, but should still forward the barrier.
        let mut exec_stream = Box::new(executor).execute_inner().boxed();
        let msg = exec_stream.next().await.unwrap().unwrap();
        assert_eq!(msg.as_barrier().unwrap().epoch, b1.epoch);

        // Add new upstream.
        // The barrier should not be emitted because the executor is waiting for new upstream.
        assert!(exec_stream.next().now_or_never().is_none());

        // Grab the sender the executor requested for the new upstream actor.
        let mut output_req = test_env
            .take_pending_new_output_requests(upstream_actor_id)
            .await;
        let (_, req) = output_req.pop().unwrap();
        let tx = match req {
            NewOutputRequest::Local(tx) => tx,
            NewOutputRequest::Remote(_) => unreachable!(),
        };

        tx.send(MessageInner::Barrier(b2.clone().into_dispatcher()).into())
            .await
            .unwrap();
        // Now the executor should emit the barrier.
        let msg = exec_stream.next().await.unwrap().unwrap();
        assert_eq!(msg.as_barrier().unwrap().epoch, b2.epoch);

        // Data from the new upstream is forwarded.
        tx.send(MessageInner::Chunk(build_test_chunk(10)).into())
            .await
            .unwrap();
        let msg = exec_stream.next().await.unwrap().unwrap();
        assert!(msg.is_chunk());

        tx.send(MessageInner::Barrier(b3.clone().into_dispatcher()).into())
            .await
            .unwrap();
        let msg = exec_stream.next().await.unwrap().unwrap();
        assert_eq!(msg.as_barrier().unwrap().epoch, b3.epoch);

        // Remove upstream.
        tx.send(MessageInner::Barrier(b4.clone().into_dispatcher()).into())
            .await
            .unwrap();
        // The executor should emit the barrier with the removal update.
        let msg = exec_stream.next().await.unwrap().unwrap();
        assert_eq!(msg.as_barrier().unwrap().epoch, b4.epoch);
    }
807}