1use std::collections::{HashMap, HashSet, VecDeque};
16use std::iter;
17use std::pin::Pin;
18use std::task::{Context, Poll};
19
20use anyhow::Context as _;
21use futures::future::try_join_all;
22use itertools::Itertools;
23use pin_project::pin_project;
24use risingwave_common::catalog::Field;
25use risingwave_expr::expr::{EvalErrorReport, NonStrictExpression, build_non_strict_from_prost};
26use risingwave_pb::common::PbActorInfo;
27use risingwave_pb::expr::PbExprNode;
28use risingwave_pb::plan_common::PbField;
29use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
30use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
31use rw_futures_util::pending_on_none;
32use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
33
34use crate::executor::exchange::input::{Input, assert_equal_dispatcher_barrier, new_input};
35use crate::executor::prelude::*;
36use crate::executor::project::apply_project_exprs;
37use crate::executor::{
38 BarrierMutationType, BoxedMessageInput, DynamicReceivers, MergeExecutor, Message,
39};
40use crate::task::{ActorEvalErrorReport, FragmentId, LocalBarrierManager};
41
42type ProcessedMessageStream = impl Stream<Item = MessageStreamItem>;
43
44#[pin_project]
47pub struct SinkHandlerInput {
48 upstream_fragment_id: FragmentId,
50
51 #[pin]
53 processed_stream: ProcessedMessageStream,
54}
55
56impl SinkHandlerInput {
57 pub fn new(
58 upstream_fragment_id: FragmentId,
59 merge: Box<MergeExecutor>,
60 project_exprs: Vec<NonStrictExpression>,
61 ) -> Self {
62 let processed_stream = Self::apply_project_exprs_stream(merge, project_exprs);
63 Self {
64 upstream_fragment_id,
65 processed_stream,
66 }
67 }
68
69 #[define_opaque(ProcessedMessageStream)]
70 fn apply_project_exprs_stream(
71 merge: Box<MergeExecutor>,
72 project_exprs: Vec<NonStrictExpression>,
73 ) -> ProcessedMessageStream {
74 Self::apply_project_exprs_stream_impl(merge, project_exprs)
76 }
77
78 #[try_stream(ok = Message, error = StreamExecutorError)]
80 async fn apply_project_exprs_stream_impl(
81 merge: Box<MergeExecutor>,
82 project_exprs: Vec<NonStrictExpression>,
83 ) {
84 let merge_stream = merge.execute_inner();
85 pin_mut!(merge_stream);
86 while let Some(msg) = merge_stream.next().await {
87 let msg = msg?;
88 if let Message::Chunk(chunk) = msg {
89 let new_chunk = apply_project_exprs(&project_exprs, chunk).await?;
91 yield Message::Chunk(new_chunk);
92 } else {
93 yield msg;
94 }
95 }
96 }
97}
98
99impl Input for SinkHandlerInput {
100 type InputId = FragmentId;
101
102 fn id(&self) -> Self::InputId {
103 self.upstream_fragment_id
105 }
106}
107
108impl Stream for SinkHandlerInput {
109 type Item = MessageStreamItem;
110
111 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112 self.project().processed_stream.poll_next(cx)
113 }
114}
115
116#[derive(Debug)]
118pub struct UpstreamFragmentInfo {
119 pub upstream_fragment_id: FragmentId,
120 pub upstream_actors: Vec<PbActorInfo>,
121 pub merge_schema: Schema,
122 pub project_exprs: Vec<NonStrictExpression>,
123}
124
125impl UpstreamFragmentInfo {
126 pub fn new(
127 upstream_fragment_id: FragmentId,
128 initial_upstream_actors: &HashMap<FragmentId, UpstreamActors>,
129 sink_output_schema: &[PbField],
130 project_exprs: &[PbExprNode],
131 error_report: impl EvalErrorReport + 'static,
132 ) -> StreamResult<Self> {
133 let actors = initial_upstream_actors
134 .get(&upstream_fragment_id)
135 .ok_or_else(|| {
136 anyhow::anyhow!(
137 "upstream fragment {} not found in initial upstream actors",
138 upstream_fragment_id
139 )
140 })?;
141 let merge_schema = sink_output_schema.iter().map(Field::from).collect();
142 let project_exprs = project_exprs
143 .iter()
144 .map(|e| build_non_strict_from_prost(e, error_report.clone()))
145 .try_collect()
146 .map_err(|err| anyhow::anyhow!(err))?;
147 Ok(Self {
148 upstream_fragment_id,
149 upstream_actors: actors.actors.clone(),
150 merge_schema,
151 project_exprs,
152 })
153 }
154}
155
156type BoxedSinkInput = BoxedMessageInput<FragmentId, BarrierMutationType>;
157
158pub struct UpstreamSinkUnionExecutor {
170 actor_context: ActorContextRef,
172
173 executor_stats: Arc<StreamingMetrics>,
175
176 initial_inputs: Vec<BoxedSinkInput>,
178
179 upstream_sink_barrier_mgr: UpstreamSinkBarrierManager,
181}
182
183impl Debug for UpstreamSinkUnionExecutor {
184 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
185 f.debug_struct("UpstreamSinkUnionExecutor")
186 .field(
187 "initial_upstream_fragments",
188 &self.initial_inputs.iter().map(|i| i.id()).collect_vec(),
189 )
190 .finish()
191 }
192}
193
194impl Execute for UpstreamSinkUnionExecutor {
195 fn execute(self: Box<Self>) -> BoxedMessageStream {
196 self.execute_inner().boxed()
197 }
198}
199
200struct PendingSinkBarrier {
201 barrier: Barrier,
202 new_input: Option<BoxedSinkInput>,
203 removed_upstream_fragment_ids: HashSet<FragmentId>,
204}
205
206struct BuildSinkInputContext {
207 actor_context: ActorContextRef,
209
210 local_barrier_manager: LocalBarrierManager,
212
213 executor_stats: Arc<StreamingMetrics>,
215
216 chunk_size: usize,
218
219 eval_error_report: ActorEvalErrorReport,
221}
222
223struct UpstreamSinkBarrierManager {
224 build_input_ctx: BuildSinkInputContext,
226
227 barrier_rx: UnboundedReceiver<Barrier>,
229
230 barrier_tx_map: HashMap<FragmentId, UnboundedSender<Barrier>>,
232
233 pending_barriers: VecDeque<PendingSinkBarrier>,
235}
236
237impl UpstreamSinkUnionExecutor {
238 pub async fn new(
240 ctx: ActorContextRef,
241 local_barrier_manager: LocalBarrierManager,
242 executor_stats: Arc<StreamingMetrics>,
243 chunk_size: usize,
244 initial_upstream_infos: Vec<UpstreamFragmentInfo>,
245 eval_error_report: ActorEvalErrorReport,
246 ) -> StreamExecutorResult<Self> {
247 let barrier_rx = local_barrier_manager.subscribe_barrier(ctx.id);
248 let build_input_ctx = BuildSinkInputContext {
249 actor_context: ctx.clone(),
250 local_barrier_manager,
251 executor_stats: executor_stats.clone(),
252 chunk_size,
253 eval_error_report,
254 };
255 let mut upstream_sink_barrier_mgr =
256 UpstreamSinkBarrierManager::new(build_input_ctx, barrier_rx);
257
258 let mut initial_inputs = Vec::with_capacity(initial_upstream_infos.len());
259 for info in initial_upstream_infos {
260 let input = upstream_sink_barrier_mgr.new_sink_input_impl(info).await?;
261 initial_inputs.push(input);
262 }
263
264 let executor = Self {
265 actor_context: ctx,
266 executor_stats,
267 initial_inputs,
268 upstream_sink_barrier_mgr,
269 };
270
271 Ok(executor)
272 }
273
274 #[cfg(test)]
275 pub fn for_test(
276 actor_id: ActorId,
277 local_barrier_manager: LocalBarrierManager,
278 chunk_size: usize,
279 initial_inputs: Vec<BoxedSinkInput>,
280 ) -> Self {
281 let actor_ctx = ActorContext::for_test(actor_id);
282 let executor_stats: Arc<StreamingMetrics> = StreamingMetrics::unused().into();
283 let eval_error_report = ActorEvalErrorReport {
284 actor_context: actor_ctx.clone(),
285 identity: format!("UpstreamSinkUnionExecutor-{}", actor_id).into(),
286 };
287
288 let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
289 let build_input_ctx = BuildSinkInputContext {
290 actor_context: actor_ctx.clone(),
291 local_barrier_manager,
292 executor_stats: executor_stats.clone(),
293 chunk_size,
294 eval_error_report,
295 };
296 let upstream_sink_barrier_mgr =
297 UpstreamSinkBarrierManager::new(build_input_ctx, barrier_rx);
298
299 Self {
300 actor_context: actor_ctx,
301 executor_stats,
302 initial_inputs,
303 upstream_sink_barrier_mgr,
304 }
305 }
306
307 #[try_stream(ok = Message, error = StreamExecutorError)]
308 async fn execute_inner(self: Box<Self>) {
309 let actor_id = self.actor_context.id;
310 let fragment_id = self.actor_context.fragment_id;
311
312 let barrier_align = self
313 .executor_stats
314 .barrier_align_duration
315 .with_guarded_label_values(&[
316 actor_id.to_string().as_str(),
317 fragment_id.to_string().as_str(),
318 "",
319 "UpstreamSinkUnion",
320 ]);
321
322 let upstreams =
323 DynamicReceivers::new(self.initial_inputs, Some(barrier_align.clone()), None);
324 pin_mut!(upstreams);
325
326 let mut upstream_sink_barrier_mgr = self.upstream_sink_barrier_mgr;
327
328 loop {
329 let msg = upstream_sink_barrier_mgr
330 .await_next_message(&mut upstreams)
331 .await?;
332
333 if let Message::Barrier(barrier) = &msg {
334 let pending_barrier = upstream_sink_barrier_mgr.pop_pending_barrier();
335 assert_equal_dispatcher_barrier(barrier, &pending_barrier.barrier);
336
337 if let Some(mut new_sink_input) = pending_barrier.new_input {
338 let first_barrier = expect_first_barrier(&mut new_sink_input).await?;
339 assert_equal_dispatcher_barrier(&pending_barrier.barrier, &first_barrier);
340 upstreams.add_upstreams_from(iter::once(new_sink_input));
341 }
342
343 if !pending_barrier.removed_upstream_fragment_ids.is_empty() {
344 upstreams.remove_upstreams(&pending_barrier.removed_upstream_fragment_ids);
345 }
346 }
347
348 yield msg;
349 }
350 }
351}
352
353impl UpstreamSinkBarrierManager {
354 fn new(build_input_ctx: BuildSinkInputContext, barrier_rx: UnboundedReceiver<Barrier>) -> Self {
355 Self {
356 build_input_ctx,
357 barrier_rx,
358 barrier_tx_map: HashMap::new(),
359 pending_barriers: VecDeque::new(),
360 }
361 }
362
363 async fn await_next_message(
364 &mut self,
365 upstreams: &mut DynamicReceivers<FragmentId, BarrierMutationType>,
366 ) -> StreamExecutorResult<Message> {
367 loop {
368 if upstreams.is_empty() && !self.pending_barriers.is_empty() {
369 let barrier = self.pending_barriers.front().unwrap().barrier.clone();
370 return Ok(Message::Barrier(barrier));
371 }
372 tokio::select! {
373 biased;
374
375 msg = pending_on_none(upstreams.next()) => {
378 return msg;
379 }
380
381 barrier = self.barrier_rx.recv() => {
382 let barrier = barrier.context("Failed to receive barrier from barrier_rx")?;
386 let pending_barrier = self.partially_apply_barrier(barrier).await?;
387 self.pending_barriers.push_back(pending_barrier);
388 continue;
389 }
390 }
391 }
392 }
393
394 async fn partially_apply_barrier(
395 &mut self,
396 barrier: Barrier,
397 ) -> StreamExecutorResult<PendingSinkBarrier> {
398 let new_sink_input = if let Some(new_upstream_sink) =
399 barrier.as_new_upstream_sink(self.build_input_ctx.actor_context.fragment_id)
400 {
401 let new_input = self.new_sink_input(new_upstream_sink).await?;
403 Some(new_input)
404 } else {
405 None
406 };
407
408 self.forward_barrier_to_all_upstreams(&barrier)?;
409
410 let removed_upstream_fragment_ids = if let Some(dropped_upstream_sinks) =
411 barrier.as_dropped_upstream_sinks()
412 && !dropped_upstream_sinks.is_empty()
413 {
414 self.barrier_tx_map
416 .extract_if(|upstream_fragment_id, _| {
417 dropped_upstream_sinks.contains(upstream_fragment_id)
418 })
419 .map(|(upstream_fragment_id, _)| upstream_fragment_id)
420 .collect()
421 } else {
422 HashSet::new()
423 };
424
425 Ok(PendingSinkBarrier {
426 barrier,
427 new_input: new_sink_input,
428 removed_upstream_fragment_ids,
429 })
430 }
431
432 fn pop_pending_barrier(&mut self) -> PendingSinkBarrier {
433 self.pending_barriers
434 .pop_front()
435 .expect("should always have a pending barrier when receiving a barrier from upstreams")
436 }
437
438 fn forward_barrier_to_all_upstreams(&self, barrier: &Barrier) -> StreamExecutorResult<()> {
439 for tx in self.barrier_tx_map.values() {
440 tx.send(barrier.clone())
441 .map_err(|e| StreamExecutorError::from(anyhow::anyhow!(e)))?;
442 }
443 Ok(())
444 }
445
446 async fn new_sink_input(
447 &mut self,
448 pb_upstream_info: &PbNewUpstreamSink,
449 ) -> StreamExecutorResult<BoxedSinkInput> {
450 let info = pb_upstream_info.get_info().unwrap();
451 let merge_schema = info
452 .get_sink_output_schema()
453 .iter()
454 .map(Field::from)
455 .collect();
456 let project_exprs = info
457 .get_project_exprs()
458 .iter()
459 .map(|e| build_non_strict_from_prost(e, self.build_input_ctx.eval_error_report.clone()))
460 .try_collect()
461 .map_err(|err| anyhow::anyhow!(err))?;
462 let upstream_fragment_id = info.get_upstream_fragment_id();
463 self.new_sink_input_impl(UpstreamFragmentInfo {
464 upstream_fragment_id,
465 upstream_actors: pb_upstream_info.get_upstream_actors().clone(),
466 merge_schema,
467 project_exprs,
468 })
469 .await
470 }
471
472 async fn new_sink_input_impl(
473 &mut self,
474 UpstreamFragmentInfo {
475 upstream_fragment_id,
476 upstream_actors,
477 merge_schema,
478 project_exprs,
479 }: UpstreamFragmentInfo,
480 ) -> StreamExecutorResult<BoxedSinkInput> {
481 let merge_executor = self
482 .new_merge_executor(upstream_fragment_id, upstream_actors, merge_schema)
483 .await?;
484
485 Ok(SinkHandlerInput::new(
486 upstream_fragment_id,
487 Box::new(merge_executor),
488 project_exprs,
489 )
490 .boxed_input())
491 }
492
493 async fn new_merge_executor(
494 &mut self,
495 upstream_fragment_id: FragmentId,
496 upstream_actors: Vec<PbActorInfo>,
497 schema: Schema,
498 ) -> StreamExecutorResult<MergeExecutor> {
499 let ctx = &self.build_input_ctx;
500 let inputs = try_join_all(upstream_actors.iter().map(|actor| {
501 new_input(
502 &ctx.local_barrier_manager,
503 ctx.executor_stats.clone(),
504 ctx.actor_context.id,
505 ctx.actor_context.fragment_id,
506 actor,
507 upstream_fragment_id,
508 )
509 }))
510 .await?;
511
512 let (barrier_tx, barrier_rx) = unbounded_channel();
513 self.barrier_tx_map
514 .try_insert(upstream_fragment_id, barrier_tx)
515 .expect("non-duplicate");
516
517 let upstreams =
518 MergeExecutor::new_select_receiver(inputs, &ctx.executor_stats, &ctx.actor_context);
519
520 Ok(MergeExecutor::new(
521 ctx.actor_context.clone(),
522 ctx.actor_context.fragment_id,
523 upstream_fragment_id,
524 upstreams,
525 ctx.local_barrier_manager.clone(),
526 ctx.executor_stats.clone(),
527 barrier_rx,
528 ctx.chunk_size,
529 schema,
530 ))
531 }
532}
533
534#[cfg(test)]
535mod tests {
536 use std::collections::HashSet;
537
538 use futures::FutureExt;
539 use risingwave_common::array::{Op, StreamChunkTestExt};
540 use risingwave_common::catalog::Field;
541 use risingwave_common::util::epoch::test_epoch;
542 use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
543 use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
544
545 use super::*;
546 use crate::executor::exchange::permit::{Sender, channel_for_test};
547 use crate::executor::test_utils::expr::build_from_pretty;
548 use crate::executor::{AddMutation, MessageInner, StopMutation};
549 use crate::task::NewOutputRequest;
550 use crate::task::barrier_test_utils::LocalBarrierTestEnv;
551 use crate::task::test_utils::helper_make_local_actor;
552
553 #[tokio::test]
554 async fn test_sink_input() {
555 let test_env = LocalBarrierTestEnv::for_test().await;
556
557 let actor_id = 2;
558
559 let b1 = Barrier::with_prev_epoch_for_test(2, 1);
560
561 test_env.inject_barrier(&b1, [actor_id.into()]);
562 test_env.flush_all_events().await;
563
564 let schema = Schema {
565 fields: vec![
566 Field::unnamed(DataType::Int64),
567 Field::unnamed(DataType::Int64),
568 ],
569 };
570
571 let (tx1, rx1) = channel_for_test();
572 let (tx2, rx2) = channel_for_test();
573
574 let merge = MergeExecutor::for_test(
575 actor_id,
576 vec![rx1, rx2],
577 test_env.local_barrier_manager.clone(),
578 schema.clone(),
579 5,
580 None,
581 );
582
583 let test_expr = build_from_pretty("$1:int8");
584
585 let mut input = SinkHandlerInput::new(
586 1919.into(), Box::new(merge),
588 vec![test_expr],
589 )
590 .boxed_input();
591
592 let chunk1 = StreamChunk::from_pretty(
593 " I I
594 + 1 4
595 + 2 5
596 + 3 6",
597 );
598 let chunk2 = StreamChunk::from_pretty(
599 " I I
600 + 7 8
601 - 3 6",
602 );
603
604 tx1.send(MessageInner::Chunk(chunk1).into()).await.unwrap();
605 tx2.send(MessageInner::Chunk(chunk2).into()).await.unwrap();
606
607 let msg = input.next().await.unwrap().unwrap();
608 assert_eq!(
609 *msg.as_chunk().unwrap(),
610 StreamChunk::from_pretty(
611 " I
612 + 4
613 + 5
614 + 6
615 + 8
616 - 6"
617 )
618 );
619 }
620
621 fn new_input_for_test(
622 actor_id: ActorId,
623 local_barrier_manager: LocalBarrierManager,
624 ) -> (BoxedSinkInput, Sender, UnboundedSender<Barrier>) {
625 let (tx, rx) = channel_for_test();
626 let (barrier_tx, barrier_rx) = unbounded_channel();
627 let merge = MergeExecutor::for_test(
628 actor_id,
629 vec![rx],
630 local_barrier_manager,
631 Schema::new(vec![]),
632 10,
633 Some(barrier_rx),
634 );
635 let input = SinkHandlerInput::new(
636 FragmentId::new(actor_id.as_raw_id()),
637 Box::new(merge),
638 vec![],
639 )
640 .boxed_input();
641 (input, tx, barrier_tx)
642 }
643
644 fn build_test_chunk(size: u64) -> StreamChunk {
645 let ops = vec![Op::Insert; size as usize];
646 StreamChunk::new(ops, vec![])
647 }
648
649 #[tokio::test]
650 async fn test_fixed_upstreams() {
651 let test_env = LocalBarrierTestEnv::for_test().await;
652
653 let actor_id = 2.into();
654
655 let b1 = Barrier::with_prev_epoch_for_test(2, 1);
656
657 test_env.inject_barrier(&b1, [actor_id]);
658 test_env.flush_all_events().await;
659
660 let mut inputs = Vec::with_capacity(3);
661 let mut txs = Vec::with_capacity(3);
662 let mut barrier_txs = Vec::with_capacity(3);
663 for _ in 0..3 {
664 let (input, tx, barrier_tx) =
665 new_input_for_test(actor_id, test_env.local_barrier_manager.clone());
666 inputs.push(input);
667 txs.push(tx);
668 barrier_txs.push(barrier_tx);
669 }
670
671 let sink_union = UpstreamSinkUnionExecutor::for_test(
672 actor_id,
673 test_env.local_barrier_manager.clone(),
674 10,
675 inputs,
676 );
677 test_env.flush_all_events().await;
679 let mut sink_union = Box::new(sink_union).execute_inner().boxed();
680
681 for tx in txs {
682 tx.send(MessageInner::Chunk(build_test_chunk(10)).into())
683 .await
684 .unwrap();
685 tx.send(MessageInner::Chunk(build_test_chunk(10)).into())
686 .await
687 .unwrap();
688 tx.send(MessageInner::Barrier(b1.clone().into_dispatcher()).into())
689 .await
690 .unwrap();
691 }
692
693 for _ in 0..6 {
694 let msg = sink_union.next().await.unwrap().unwrap();
695 assert!(msg.is_chunk());
696 assert_eq!(msg.as_chunk().unwrap().ops().len(), 10);
697 }
698
699 assert!(sink_union.next().now_or_never().is_none());
701
702 for barrier_tx in barrier_txs {
703 barrier_tx.send(b1.clone()).unwrap();
704 }
705
706 let msg = sink_union.next().await.unwrap().unwrap();
707 assert!(msg.is_barrier());
708 let barrier = msg.as_barrier().unwrap();
709 assert_eq!(barrier.epoch, b1.epoch);
710 }
711
712 #[tokio::test]
713 async fn test_dynamic_upstreams() {
714 let test_env = LocalBarrierTestEnv::for_test().await;
715
716 let actor_id = 2.into();
717 let fragment_id = 0.into(); let upstream_fragment_id = 11.into();
719 let upstream_actor_id = 101.into();
720
721 let upstream_actor = helper_make_local_actor(upstream_actor_id);
722
723 let add_upstream = PbNewUpstreamSink {
724 info: Some(PbUpstreamSinkInfo {
725 upstream_fragment_id,
726 sink_output_schema: vec![],
727 project_exprs: vec![],
728 }),
729 upstream_actors: vec![upstream_actor],
730 };
731
732 let b1 = Barrier::new_test_barrier(test_epoch(1));
733 let b2 =
734 Barrier::new_test_barrier(test_epoch(2)).with_mutation(Mutation::Add(AddMutation {
735 new_upstream_sinks: HashMap::from([(fragment_id, add_upstream)]),
736 ..Default::default()
737 }));
738 let b3 = Barrier::new_test_barrier(test_epoch(3));
739 let b4 =
740 Barrier::new_test_barrier(test_epoch(4)).with_mutation(Mutation::Stop(StopMutation {
741 dropped_sink_fragments: HashSet::from([upstream_fragment_id]),
742 ..Default::default()
743 }));
744 for barrier in [&b1, &b2, &b3, &b4] {
745 test_env.inject_barrier(barrier, [actor_id]);
746 }
747 test_env.flush_all_events().await;
748
749 let executor = UpstreamSinkUnionExecutor::for_test(
750 actor_id,
751 test_env.local_barrier_manager.clone(),
752 10,
753 Vec::new(), );
755 test_env.flush_all_events().await;
757
758 let mut exec_stream = Box::new(executor).execute_inner().boxed();
760 let msg = exec_stream.next().await.unwrap().unwrap();
761 assert_eq!(msg.as_barrier().unwrap().epoch, b1.epoch);
762
763 assert!(exec_stream.next().now_or_never().is_none());
766
767 let mut output_req = test_env
768 .take_pending_new_output_requests(upstream_actor_id)
769 .await;
770 let (_, req) = output_req.pop().unwrap();
771 let tx = match req {
772 NewOutputRequest::Local(tx) => tx,
773 NewOutputRequest::Remote(_) => unreachable!(),
774 };
775
776 tx.send(MessageInner::Barrier(b2.clone().into_dispatcher()).into())
777 .await
778 .unwrap();
779 let msg = exec_stream.next().await.unwrap().unwrap();
781 assert_eq!(msg.as_barrier().unwrap().epoch, b2.epoch);
782
783 tx.send(MessageInner::Chunk(build_test_chunk(10)).into())
784 .await
785 .unwrap();
786 let msg = exec_stream.next().await.unwrap().unwrap();
787 assert!(msg.is_chunk());
788
789 tx.send(MessageInner::Barrier(b3.clone().into_dispatcher()).into())
790 .await
791 .unwrap();
792 let msg = exec_stream.next().await.unwrap().unwrap();
793 assert_eq!(msg.as_barrier().unwrap().epoch, b3.epoch);
794
795 tx.send(MessageInner::Barrier(b4.clone().into_dispatcher()).into())
797 .await
798 .unwrap();
799 let msg = exec_stream.next().await.unwrap().unwrap();
801 assert_eq!(msg.as_barrier().unwrap().epoch, b4.epoch);
802 }
803}