async fn recovery( db: &DatabaseConnection, sink_id: SinkId, ) -> Result<(Option<u64>, Vec<EpochCommit>)>
Read every persisted row for this sink, recovering prev_committed_epoch and pending commits.
prev_committed_epoch