async fn receive_next_barrier( barrier_rx: &mut UnboundedReceiver<Barrier>, ) -> StreamExecutorResult<Barrier>