async fn make_snapshot_stream(
upstream_table: &BatchTable<impl StateStore>,
snapshot_epoch: u64,
backfill_state: &BackfillState<impl StateStore>,
rate_limit: RateLimit,
chunk_size: usize,
) -> StreamExecutorResult<VnodeStream<impl ChangeLogRowStream>>