1use std::collections::{HashMap, HashSet, VecDeque};
16use std::iter;
17use std::pin::Pin;
18use std::task::{Context, Poll};
19
20use anyhow::Context as _;
21use futures::future::try_join_all;
22use itertools::Itertools;
23use pin_project::pin_project;
24use risingwave_common::catalog::Field;
25use risingwave_expr::expr::{EvalErrorReport, NonStrictExpression, build_non_strict_from_prost};
26use risingwave_pb::common::PbActorInfo;
27use risingwave_pb::expr::PbExprNode;
28use risingwave_pb::plan_common::PbField;
29use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
30use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
31use rw_futures_util::pending_on_none;
32use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
33
34use crate::executor::exchange::input::{Input, assert_equal_dispatcher_barrier, new_input};
35use crate::executor::prelude::*;
36use crate::executor::project::apply_project_exprs;
37use crate::executor::{
38 BarrierMutationType, BoxedMessageInput, DynamicReceivers, MergeExecutor, Message,
39};
40use crate::task::{ActorEvalErrorReport, FragmentId, LocalBarrierManager};
41
/// Opaque type of the message stream built by
/// `SinkHandlerInput::apply_project_exprs_stream`, which is its defining use.
type ProcessedMessageStream = impl Stream<Item = MessageStreamItem>;
43
/// An [`Input`] identified by an upstream fragment id.
///
/// Its stream is the output of a [`MergeExecutor`] with a list of projection
/// expressions applied to every chunk; all other messages pass through
/// unchanged.
#[pin_project]
pub struct SinkHandlerInput {
    /// Id of the upstream fragment; returned as this input's id.
    upstream_fragment_id: FragmentId,

    /// The merged upstream messages, with chunks already projected.
    #[pin]
    processed_stream: ProcessedMessageStream,
}
55
56impl SinkHandlerInput {
57 pub fn new(
58 upstream_fragment_id: FragmentId,
59 merge: Box<MergeExecutor>,
60 project_exprs: Vec<NonStrictExpression>,
61 ) -> Self {
62 let processed_stream = Self::apply_project_exprs_stream(merge, project_exprs);
63 Self {
64 upstream_fragment_id,
65 processed_stream,
66 }
67 }
68
69 #[define_opaque(ProcessedMessageStream)]
70 fn apply_project_exprs_stream(
71 merge: Box<MergeExecutor>,
72 project_exprs: Vec<NonStrictExpression>,
73 ) -> ProcessedMessageStream {
74 Self::apply_project_exprs_stream_impl(merge, project_exprs)
76 }
77
78 #[try_stream(ok = Message, error = StreamExecutorError)]
80 async fn apply_project_exprs_stream_impl(
81 merge: Box<MergeExecutor>,
82 project_exprs: Vec<NonStrictExpression>,
83 ) {
84 let merge_stream = merge.execute_inner();
85 pin_mut!(merge_stream);
86 while let Some(msg) = merge_stream.next().await {
87 let msg = msg?;
88 if let Message::Chunk(chunk) = msg {
89 let new_chunk = apply_project_exprs(&project_exprs, chunk).await?;
91 yield Message::Chunk(new_chunk);
92 } else {
93 yield msg;
94 }
95 }
96 }
97}
98
/// A sink handler input is identified by the id of the upstream fragment it
/// reads from.
impl Input for SinkHandlerInput {
    type InputId = FragmentId;

    /// Returns the upstream fragment id given at construction.
    fn id(&self) -> Self::InputId {
        self.upstream_fragment_id
    }
}
107
108impl Stream for SinkHandlerInput {
109 type Item = MessageStreamItem;
110
111 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112 self.project().processed_stream.poll_next(cx)
113 }
114}
115
/// Everything needed to build a sink input for one upstream fragment.
#[derive(Debug)]
pub struct UpstreamFragmentInfo {
    /// Id of the upstream fragment.
    pub upstream_fragment_id: FragmentId,
    /// Actors of the upstream fragment; one input is created per actor.
    pub upstream_actors: Vec<PbActorInfo>,
    /// Schema handed to the merge executor that reads from the upstream actors.
    pub merge_schema: Schema,
    /// Expressions applied to every chunk coming out of the merge executor.
    pub project_exprs: Vec<NonStrictExpression>,
}
124
125impl UpstreamFragmentInfo {
126 pub fn new(
127 upstream_fragment_id: FragmentId,
128 initial_upstream_actors: &HashMap<FragmentId, UpstreamActors>,
129 sink_output_schema: &[PbField],
130 project_exprs: &[PbExprNode],
131 error_report: impl EvalErrorReport + 'static,
132 ) -> StreamResult<Self> {
133 let actors = initial_upstream_actors
134 .get(&upstream_fragment_id)
135 .ok_or_else(|| {
136 anyhow::anyhow!(
137 "upstream fragment {} not found in initial upstream actors",
138 upstream_fragment_id
139 )
140 })?;
141 let merge_schema = sink_output_schema.iter().map(Field::from).collect();
142 let project_exprs = project_exprs
143 .iter()
144 .map(|e| build_non_strict_from_prost(e, error_report.clone()))
145 .try_collect()
146 .map_err(|err| anyhow::anyhow!(err))?;
147 Ok(Self {
148 upstream_fragment_id,
149 upstream_actors: actors.actors.clone(),
150 merge_schema,
151 project_exprs,
152 })
153 }
154}
155
/// Boxed input keyed by the id of the upstream fragment it reads from.
type BoxedSinkInput = BoxedMessageInput<FragmentId, BarrierMutationType>;
157
/// Executor that unions the messages of a changing set of upstream sink
/// fragments.
///
/// Upstreams can be added or removed at barriers: barriers received from the
/// local barrier manager are inspected for new and dropped upstream sinks, and
/// the set of receivers is updated once the matching barrier has come out of
/// the current upstreams.
pub struct UpstreamSinkUnionExecutor {
    /// Context of the actor this executor belongs to.
    actor_context: ActorContextRef,

    /// Streaming metrics; used for the barrier-align duration metric.
    executor_stats: Arc<StreamingMetrics>,

    /// Inputs that exist when the executor starts running.
    initial_inputs: Vec<BoxedSinkInput>,

    /// Receives barriers, builds inputs for new upstreams and tracks the
    /// barriers that have not yet come out of the upstreams.
    upstream_sink_barrier_mgr: UpstreamSinkBarrierManager,
}
182
183impl Debug for UpstreamSinkUnionExecutor {
184 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
185 f.debug_struct("UpstreamSinkUnionExecutor")
186 .field(
187 "initial_upstream_fragments",
188 &self.initial_inputs.iter().map(|i| i.id()).collect_vec(),
189 )
190 .finish()
191 }
192}
193
impl Execute for UpstreamSinkUnionExecutor {
    /// Consumes the executor and returns its message stream, boxed.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
199
/// A barrier received from the barrier channel that has not yet been emitted
/// by the executor, together with the upstream changes it carries.
struct PendingSinkBarrier {
    /// The barrier as received from the barrier channel.
    barrier: Barrier,
    /// Input built for the upstream sink added by this barrier, if any. It is
    /// added to the receivers when this barrier is emitted.
    new_input: Option<BoxedSinkInput>,
    /// Upstream fragments dropped by this barrier. They are removed from the
    /// receivers when this barrier is emitted.
    removed_upstream_fragment_ids: HashSet<FragmentId>,
}
205
/// Shared state needed to build the input for an upstream sink fragment.
struct BuildSinkInputContext {
    /// Context of the actor that owns the executor.
    actor_context: ActorContextRef,

    /// Used to create inputs from upstream actors and passed to each merge
    /// executor.
    local_barrier_manager: LocalBarrierManager,

    /// Streaming metrics passed to the created inputs and merge executors.
    executor_stats: Arc<StreamingMetrics>,

    /// Chunk size passed to each merge executor.
    chunk_size: usize,

    /// Error reporter cloned into every projection expression built from a
    /// barrier mutation.
    eval_error_report: ActorEvalErrorReport,
}
222
/// Receives barriers for the executor, forwards them to the merge executor of
/// each upstream fragment, and keeps the barriers that are still waiting to
/// come out of the upstreams.
struct UpstreamSinkBarrierManager {
    /// State used to build inputs for new upstream fragments.
    build_input_ctx: BuildSinkInputContext,

    /// Barriers for this actor, obtained from `subscribe_barrier`.
    barrier_rx: UnboundedReceiver<Barrier>,

    /// Per upstream fragment: sender side of the barrier channel of that
    /// fragment's merge executor.
    barrier_tx_map: HashMap<FragmentId, UnboundedSender<Barrier>>,

    /// Barriers received from `barrier_rx` and not yet emitted, oldest first.
    pending_barriers: VecDeque<PendingSinkBarrier>,
}
236
237impl UpstreamSinkUnionExecutor {
238 pub async fn new(
240 ctx: ActorContextRef,
241 local_barrier_manager: LocalBarrierManager,
242 executor_stats: Arc<StreamingMetrics>,
243 chunk_size: usize,
244 initial_upstream_infos: Vec<UpstreamFragmentInfo>,
245 eval_error_report: ActorEvalErrorReport,
246 ) -> StreamExecutorResult<Self> {
247 let barrier_rx = local_barrier_manager.subscribe_barrier(ctx.id);
248 let build_input_ctx = BuildSinkInputContext {
249 actor_context: ctx.clone(),
250 local_barrier_manager,
251 executor_stats: executor_stats.clone(),
252 chunk_size,
253 eval_error_report,
254 };
255 let mut upstream_sink_barrier_mgr =
256 UpstreamSinkBarrierManager::new(build_input_ctx, barrier_rx);
257
258 let mut initial_inputs = Vec::with_capacity(initial_upstream_infos.len());
259 for info in initial_upstream_infos {
260 let input = upstream_sink_barrier_mgr.new_sink_input_impl(info).await?;
261 initial_inputs.push(input);
262 }
263
264 let executor = Self {
265 actor_context: ctx,
266 executor_stats,
267 initial_inputs,
268 upstream_sink_barrier_mgr,
269 };
270
271 Ok(executor)
272 }
273
274 #[cfg(test)]
275 pub fn for_test(
276 actor_id: ActorId,
277 local_barrier_manager: LocalBarrierManager,
278 chunk_size: usize,
279 initial_inputs: Vec<BoxedSinkInput>,
280 ) -> Self {
281 let actor_ctx = ActorContext::for_test(actor_id);
282 let executor_stats: Arc<StreamingMetrics> = StreamingMetrics::unused().into();
283 let eval_error_report = ActorEvalErrorReport {
284 actor_context: actor_ctx.clone(),
285 identity: format!("UpstreamSinkUnionExecutor-{}", actor_id).into(),
286 };
287
288 let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
289 let build_input_ctx = BuildSinkInputContext {
290 actor_context: actor_ctx.clone(),
291 local_barrier_manager,
292 executor_stats: executor_stats.clone(),
293 chunk_size,
294 eval_error_report,
295 };
296 let upstream_sink_barrier_mgr =
297 UpstreamSinkBarrierManager::new(build_input_ctx, barrier_rx);
298
299 Self {
300 actor_context: actor_ctx,
301 executor_stats,
302 initial_inputs,
303 upstream_sink_barrier_mgr,
304 }
305 }
306
307 #[try_stream(ok = Message, error = StreamExecutorError)]
308 async fn execute_inner(self: Box<Self>) {
309 let actor_id = self.actor_context.id;
310 let fragment_id = self.actor_context.fragment_id;
311
312 let barrier_align = self
313 .executor_stats
314 .barrier_align_duration
315 .with_guarded_label_values(&[
316 actor_id.to_string().as_str(),
317 fragment_id.to_string().as_str(),
318 "",
319 "UpstreamSinkUnion",
320 ]);
321
322 let upstreams =
323 DynamicReceivers::new(self.initial_inputs, Some(barrier_align.clone()), None);
324 pin_mut!(upstreams);
325
326 let mut upstream_sink_barrier_mgr = self.upstream_sink_barrier_mgr;
327
328 loop {
329 let msg = upstream_sink_barrier_mgr
330 .await_next_message(&mut upstreams)
331 .await?;
332
333 if let Message::Barrier(barrier) = &msg {
334 let pending_barrier = upstream_sink_barrier_mgr.pop_pending_barrier();
335 assert_equal_dispatcher_barrier(barrier, &pending_barrier.barrier);
336
337 if let Some(mut new_sink_input) = pending_barrier.new_input {
338 let first_barrier = expect_first_barrier(&mut new_sink_input).await?;
339 assert_equal_dispatcher_barrier(&pending_barrier.barrier, &first_barrier);
340 upstreams.add_upstreams_from(iter::once(new_sink_input));
341 }
342
343 if !pending_barrier.removed_upstream_fragment_ids.is_empty() {
344 upstreams.remove_upstreams(&pending_barrier.removed_upstream_fragment_ids);
345 }
346 }
347
348 yield msg;
349 }
350 }
351}
352
353impl UpstreamSinkBarrierManager {
354 fn new(build_input_ctx: BuildSinkInputContext, barrier_rx: UnboundedReceiver<Barrier>) -> Self {
355 Self {
356 build_input_ctx,
357 barrier_rx,
358 barrier_tx_map: HashMap::new(),
359 pending_barriers: VecDeque::new(),
360 }
361 }
362
363 async fn await_next_message(
364 &mut self,
365 upstreams: &mut DynamicReceivers<FragmentId, BarrierMutationType>,
366 ) -> StreamExecutorResult<Message> {
367 loop {
368 if upstreams.is_empty() && !self.pending_barriers.is_empty() {
369 let barrier = self.pending_barriers.front().unwrap().barrier.clone();
370 return Ok(Message::Barrier(barrier));
371 }
372 tokio::select! {
373 biased;
374
375 msg = pending_on_none(upstreams.next()) => {
378 return msg;
379 }
380
381 barrier = self.barrier_rx.recv() => {
382 let barrier = barrier.context("Failed to receive barrier from barrier_rx")?;
386 let pending_barrier = self.partially_apply_barrier(barrier).await?;
387 self.pending_barriers.push_back(pending_barrier);
388 continue;
389 }
390 }
391 }
392 }
393
394 async fn partially_apply_barrier(
395 &mut self,
396 barrier: Barrier,
397 ) -> StreamExecutorResult<PendingSinkBarrier> {
398 let new_sink_input = if let Some(new_upstream_sink) =
399 barrier.as_new_upstream_sink(self.build_input_ctx.actor_context.fragment_id)
400 {
401 let new_input = self.new_sink_input(new_upstream_sink).await?;
403 Some(new_input)
404 } else {
405 None
406 };
407
408 self.forward_barrier_to_all_upstreams(&barrier)?;
409
410 let removed_upstream_fragment_ids = if let Some(dropped_upstream_sinks) =
411 barrier.as_dropped_upstream_sinks()
412 && !dropped_upstream_sinks.is_empty()
413 {
414 self.barrier_tx_map
416 .extract_if(|upstream_fragment_id, _| {
417 dropped_upstream_sinks.contains(upstream_fragment_id)
418 })
419 .map(|(upstream_fragment_id, _)| upstream_fragment_id)
420 .collect()
421 } else {
422 HashSet::new()
423 };
424
425 Ok(PendingSinkBarrier {
426 barrier,
427 new_input: new_sink_input,
428 removed_upstream_fragment_ids,
429 })
430 }
431
432 fn pop_pending_barrier(&mut self) -> PendingSinkBarrier {
433 self.pending_barriers
434 .pop_front()
435 .expect("should always have a pending barrier when receiving a barrier from upstreams")
436 }
437
438 fn forward_barrier_to_all_upstreams(&self, barrier: &Barrier) -> StreamExecutorResult<()> {
439 for tx in self.barrier_tx_map.values() {
440 tx.send(barrier.clone())
441 .map_err(|e| StreamExecutorError::from(anyhow::anyhow!(e)))?;
442 }
443 Ok(())
444 }
445
446 async fn new_sink_input(
447 &mut self,
448 pb_upstream_info: &PbNewUpstreamSink,
449 ) -> StreamExecutorResult<BoxedSinkInput> {
450 let info = pb_upstream_info.get_info().unwrap();
451 let merge_schema = info
452 .get_sink_output_schema()
453 .iter()
454 .map(Field::from)
455 .collect();
456 let project_exprs = info
457 .get_project_exprs()
458 .iter()
459 .map(|e| build_non_strict_from_prost(e, self.build_input_ctx.eval_error_report.clone()))
460 .try_collect()
461 .map_err(|err| anyhow::anyhow!(err))?;
462 let upstream_fragment_id = info.get_upstream_fragment_id();
463 self.new_sink_input_impl(UpstreamFragmentInfo {
464 upstream_fragment_id,
465 upstream_actors: pb_upstream_info.get_upstream_actors().clone(),
466 merge_schema,
467 project_exprs,
468 })
469 .await
470 }
471
472 async fn new_sink_input_impl(
473 &mut self,
474 UpstreamFragmentInfo {
475 upstream_fragment_id,
476 upstream_actors,
477 merge_schema,
478 project_exprs,
479 }: UpstreamFragmentInfo,
480 ) -> StreamExecutorResult<BoxedSinkInput> {
481 let merge_executor = self
482 .new_merge_executor(upstream_fragment_id, upstream_actors, merge_schema)
483 .await?;
484
485 Ok(SinkHandlerInput::new(
486 upstream_fragment_id,
487 Box::new(merge_executor),
488 project_exprs,
489 )
490 .boxed_input())
491 }
492
493 async fn new_merge_executor(
494 &mut self,
495 upstream_fragment_id: FragmentId,
496 upstream_actors: Vec<PbActorInfo>,
497 schema: Schema,
498 ) -> StreamExecutorResult<MergeExecutor> {
499 let ctx = &self.build_input_ctx;
500 let inputs = try_join_all(upstream_actors.iter().map(|actor| {
501 new_input(
502 &ctx.local_barrier_manager,
503 ctx.executor_stats.clone(),
504 ctx.actor_context.id,
505 ctx.actor_context.fragment_id,
506 actor,
507 upstream_fragment_id,
508 )
509 }))
510 .await?;
511
512 let (barrier_tx, barrier_rx) = unbounded_channel();
513 self.barrier_tx_map
514 .try_insert(upstream_fragment_id, barrier_tx)
515 .expect("non-duplicate");
516
517 let upstreams =
518 MergeExecutor::new_select_receiver(inputs, &ctx.executor_stats, &ctx.actor_context);
519
520 Ok(MergeExecutor::new(
521 ctx.actor_context.clone(),
522 ctx.actor_context.fragment_id,
523 upstream_fragment_id,
524 upstreams,
525 ctx.local_barrier_manager.clone(),
526 ctx.executor_stats.clone(),
527 barrier_rx,
528 ctx.chunk_size,
529 schema,
530 ))
531 }
532}
533
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use futures::FutureExt;
    use risingwave_common::array::{Op, StreamChunkTestExt};
    use risingwave_common::catalog::Field;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
    use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;

    use super::*;
    use crate::executor::exchange::permit::{Sender, channel_for_test};
    use crate::executor::test_utils::expr::build_from_pretty;
    use crate::executor::{AddMutation, MessageInner, StopMutation};
    use crate::task::NewOutputRequest;
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;
    use crate::task::test_utils::helper_make_local_actor;

    /// A `SinkHandlerInput` applies its projection to the chunks produced by
    /// the merge executor: here only the second column is kept.
    #[tokio::test]
    async fn test_sink_input() {
        let test_env = LocalBarrierTestEnv::for_test().await;

        let actor_id = 2;

        let b1 = Barrier::with_prev_epoch_for_test(2, 1);

        test_env.inject_barrier(&b1, [actor_id]);
        test_env.flush_all_events().await;

        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };

        let (tx1, rx1) = channel_for_test();
        let (tx2, rx2) = channel_for_test();

        let merge = MergeExecutor::for_test(
            actor_id,
            vec![rx1, rx2],
            test_env.local_barrier_manager.clone(),
            schema.clone(),
            5,
            None,
        );

        // Projects the second input column.
        let test_expr = build_from_pretty("$1:int8");

        let mut input = SinkHandlerInput::new(
            1919,
            Box::new(merge),
            vec![test_expr],
        )
        .boxed_input();

        let chunk1 = StreamChunk::from_pretty(
            " I I
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " I I
            + 7 8
            - 3 6",
        );

        tx1.send(MessageInner::Chunk(chunk1).into()).await.unwrap();
        tx2.send(MessageInner::Chunk(chunk2).into()).await.unwrap();

        // The rows of both upstream chunks arrive as one projected chunk.
        let msg = input.next().await.unwrap().unwrap();
        assert_eq!(
            *msg.as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 4
                + 5
                + 6
                + 8
                - 6"
            )
        );
    }

    /// Builds a sink input over a single-upstream merge executor with an empty
    /// schema and no projection. Returns the input, the sender feeding the
    /// upstream channel, and the sender feeding the merge executor's barrier
    /// channel.
    fn new_input_for_test(
        actor_id: ActorId,
        local_barrier_manager: LocalBarrierManager,
    ) -> (BoxedSinkInput, Sender, UnboundedSender<Barrier>) {
        let (tx, rx) = channel_for_test();
        let (barrier_tx, barrier_rx) = unbounded_channel();
        let merge = MergeExecutor::for_test(
            actor_id,
            vec![rx],
            local_barrier_manager,
            Schema::new(vec![]),
            10,
            Some(barrier_rx),
        );
        let input = SinkHandlerInput::new(actor_id, Box::new(merge), vec![]).boxed_input();
        (input, tx, barrier_tx)
    }

    /// Builds a chunk of `size` insert rows with no columns.
    fn build_test_chunk(size: u64) -> StreamChunk {
        let ops = vec![Op::Insert; size as usize];
        StreamChunk::new(ops, vec![])
    }

    /// With a fixed set of three upstreams, all chunks are forwarded and the
    /// barrier is emitted only after it has been sent on every barrier channel.
    #[tokio::test]
    async fn test_fixed_upstreams() {
        let test_env = LocalBarrierTestEnv::for_test().await;

        let actor_id = 2;

        let b1 = Barrier::with_prev_epoch_for_test(2, 1);

        test_env.inject_barrier(&b1, [actor_id]);
        test_env.flush_all_events().await;

        let mut inputs = Vec::with_capacity(3);
        let mut txs = Vec::with_capacity(3);
        let mut barrier_txs = Vec::with_capacity(3);
        for _ in 0..3 {
            let (input, tx, barrier_tx) =
                new_input_for_test(actor_id, test_env.local_barrier_manager.clone());
            inputs.push(input);
            txs.push(tx);
            barrier_txs.push(barrier_tx);
        }

        let sink_union = UpstreamSinkUnionExecutor::for_test(
            actor_id,
            test_env.local_barrier_manager.clone(),
            10,
            inputs,
        );
        test_env.flush_all_events().await;
        let mut sink_union = Box::new(sink_union).execute_inner().boxed();

        // Every upstream sends two chunks followed by the barrier.
        for tx in txs {
            tx.send(MessageInner::Chunk(build_test_chunk(10)).into())
                .await
                .unwrap();
            tx.send(MessageInner::Chunk(build_test_chunk(10)).into())
                .await
                .unwrap();
            tx.send(MessageInner::Barrier(b1.clone().into_dispatcher()).into())
                .await
                .unwrap();
        }

        // 3 upstreams x 2 chunks.
        for _ in 0..6 {
            let msg = sink_union.next().await.unwrap().unwrap();
            assert!(msg.is_chunk());
            assert_eq!(msg.as_chunk().unwrap().ops().len(), 10);
        }

        // Nothing is ready yet: the barrier has not been sent on the barrier
        // channels of the merge executors.
        assert!(sink_union.next().now_or_never().is_none());

        for barrier_tx in barrier_txs {
            barrier_tx.send(b1.clone()).unwrap();
        }

        let msg = sink_union.next().await.unwrap().unwrap();
        assert!(msg.is_barrier());
        let barrier = msg.as_barrier().unwrap();
        assert_eq!(barrier.epoch, b1.epoch);
    }

    /// Starting with no upstream, an upstream is added by the mutation of `b2`
    /// and dropped by the mutation of `b4`.
    #[tokio::test]
    async fn test_dynamic_upstreams() {
        let test_env = LocalBarrierTestEnv::for_test().await;

        let actor_id = 2;
        let fragment_id = 0;
        let upstream_fragment_id = 11;
        let upstream_actor_id = 101;

        let upstream_actor = helper_make_local_actor(upstream_actor_id);

        let add_upstream = PbNewUpstreamSink {
            info: Some(PbUpstreamSinkInfo {
                upstream_fragment_id,
                sink_output_schema: vec![],
                project_exprs: vec![],
            }),
            upstream_actors: vec![upstream_actor],
        };

        let b1 = Barrier::new_test_barrier(test_epoch(1));
        let b2 =
            Barrier::new_test_barrier(test_epoch(2)).with_mutation(Mutation::Add(AddMutation {
                new_upstream_sinks: HashMap::from([(fragment_id, add_upstream)]),
                ..Default::default()
            }));
        let b3 = Barrier::new_test_barrier(test_epoch(3));
        let b4 =
            Barrier::new_test_barrier(test_epoch(4)).with_mutation(Mutation::Stop(StopMutation {
                dropped_sink_fragments: HashSet::from([upstream_fragment_id]),
                ..Default::default()
            }));
        for barrier in [&b1, &b2, &b3, &b4] {
            test_env.inject_barrier(barrier, [actor_id]);
        }
        test_env.flush_all_events().await;

        // No initial upstream.
        let executor = UpstreamSinkUnionExecutor::for_test(
            actor_id,
            test_env.local_barrier_manager.clone(),
            10,
            Vec::new(),
        );
        test_env.flush_all_events().await;

        let mut exec_stream = Box::new(executor).execute_inner().boxed();
        // With no upstream, b1 is emitted straight from the pending queue.
        let msg = exec_stream.next().await.unwrap().unwrap();
        assert_eq!(msg.as_barrier().unwrap().epoch, b1.epoch);

        // Presumably blocked on b2: the new upstream has not delivered its
        // first barrier yet.
        assert!(exec_stream.next().now_or_never().is_none());

        // Take the sender through which the upstream actor feeds the new input.
        let mut output_req = test_env
            .take_pending_new_output_requests(upstream_actor_id)
            .await;
        let (_, req) = output_req.pop().unwrap();
        let tx = match req {
            NewOutputRequest::Local(tx) => tx,
            NewOutputRequest::Remote(_) => unreachable!(),
        };

        tx.send(MessageInner::Barrier(b2.clone().into_dispatcher()).into())
            .await
            .unwrap();
        let msg = exec_stream.next().await.unwrap().unwrap();
        assert_eq!(msg.as_barrier().unwrap().epoch, b2.epoch);

        // Data from the newly added upstream is forwarded.
        tx.send(MessageInner::Chunk(build_test_chunk(10)).into())
            .await
            .unwrap();
        let msg = exec_stream.next().await.unwrap().unwrap();
        assert!(msg.is_chunk());

        tx.send(MessageInner::Barrier(b3.clone().into_dispatcher()).into())
            .await
            .unwrap();
        let msg = exec_stream.next().await.unwrap().unwrap();
        assert_eq!(msg.as_barrier().unwrap().epoch, b3.epoch);

        // b4 drops the upstream; the barrier itself is still emitted.
        tx.send(MessageInner::Barrier(b4.clone().into_dispatcher()).into())
            .await
            .unwrap();
        let msg = exec_stream.next().await.unwrap().unwrap();
        assert_eq!(msg.as_barrier().unwrap().epoch, b4.epoch);
    }
}