fn make_consume_snapshot_stream<'a, S: StateStore>(
upstream_table: &'a StorageTable<S>,
snapshot_epoch: u64,
chunk_size: usize,
rate_limit: Option<usize>,
barrier_rx: &'a mut UnboundedReceiver<Barrier>,
output_indices: &'a [usize],
progress: &'a mut CreateMviewProgressReporter,
first_recv_barrier: Barrier,
) -> impl Stream<Item = Result<Message, StreamExecutorError>> + 'a