async fn make_snapshot_stream(
upstream_table: &BatchTable<impl StateStore>,
snapshot_epoch: u64,
start_pk: Option<OwnedRow>,
rate_limit: RateLimit,
chunk_size: usize,
) -> StreamExecutorResult<VnodeStream<impl ChangeLogRowStream>>