// risingwave_meta/hummock/manager/time_travel.rs
use std::collections::{HashMap, HashSet, VecDeque};
use anyhow::anyhow;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::time_travel::{
refill_version, IncompleteHummockVersion, IncompleteHummockVersionDelta,
};
use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
CompactionGroupId, HummockEpoch, HummockSstableId, HummockSstableObjectId,
};
use risingwave_meta_model::hummock_sstable_info::SstableInfoV2Backend;
use risingwave_meta_model::{
hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta,
hummock_time_travel_version,
};
use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
use sea_orm::ActiveValue::Set;
use sea_orm::{
ColumnTrait, Condition, DatabaseTransaction, EntityTrait, PaginatorTrait, QueryFilter,
QueryOrder, QuerySelect, TransactionTrait,
};
use crate::hummock::error::{Error, Result};
use crate::hummock::HummockManager;
impl HummockManager {
/// Re-initializes the in-memory time travel state from the meta store.
///
/// Calls `mark_next_time_travel_version_snapshot` on the versioning state, clears
/// `last_time_travel_snapshot_sst_ids`, and then, if any time travel version has been
/// persisted, replaces that set with the SST ids of the persisted version that has the
/// largest version id.
///
/// # Errors
///
/// Propagates any error returned by the meta store query.
pub(crate) async fn init_time_travel_state(&self) -> Result<()> {
    let sql_store = self.env.meta_store_ref();
    let mut versioning = self.versioning.write().await;
    versioning.mark_next_time_travel_version_snapshot();
    // Reset first so the state is an empty set when nothing is persisted (or the query fails).
    versioning.last_time_travel_snapshot_sst_ids = HashSet::new();

    let newest_persisted = hummock_time_travel_version::Entity::find()
        .order_by_desc(hummock_time_travel_version::Column::VersionId)
        .one(&sql_store.conn)
        .await?;
    if let Some(model) = newest_persisted {
        let snapshot =
            IncompleteHummockVersion::from_persisted_protobuf(&model.version.to_protobuf());
        versioning.last_time_travel_snapshot_sst_ids = snapshot.get_sst_ids();
    }
    Ok(())
}
/// Truncates persisted time travel metadata below `epoch_watermark`, using a single
/// database transaction for all reads and deletes.
///
/// In order, this:
/// 1. deletes `hummock_epoch_to_version` rows whose epoch is less than `epoch_watermark`;
/// 2. selects the "latest valid version": the persisted time travel version with the largest
///    id that is `<=` both the version id mapped to the watermark and the minimum pinned
///    version id;
/// 3. deletes `hummock_sstable_info` rows for SSTs introduced by older deltas or held by
///    older versions, as computed by the set differences below;
/// 4. hands the collected object ids to `gc_manager.add_may_delete_object_ids`;
/// 5. deletes time travel versions and deltas whose id is less than the latest valid
///    version id.
///
/// Returns early (after committing the transaction) when no epoch mapping lies below the
/// watermark or when no latest valid version exists.
///
/// # Errors
///
/// Propagates meta store errors. Returns `Error::TimeTravel` when a version or delta id that
/// was just listed for deletion cannot be loaded.
///
/// # Panics
///
/// Panics if `epoch_watermark` or the minimum pinned version id cannot be converted to the
/// meta model's integer types (`unwrap` on `try_from` / `try_into`).
pub(crate) async fn truncate_time_travel_metadata(
&self,
epoch_watermark: HummockEpoch,
) -> Result<()> {
let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
let sql_store = self.env.meta_store_ref();
let txn = sql_store.conn.begin().await?;
// The mapping row with the largest epoch strictly below the watermark; among rows with the
// same epoch the smallest version id is preferred.
let version_watermark = hummock_epoch_to_version::Entity::find()
.filter(
hummock_epoch_to_version::Column::Epoch
.lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
)
.order_by_desc(hummock_epoch_to_version::Column::Epoch)
.order_by_asc(hummock_epoch_to_version::Column::VersionId)
.one(&txn)
.await?;
// Nothing is mapped below the watermark, so there is nothing to truncate.
let Some(version_watermark) = version_watermark else {
txn.commit().await?;
return Ok(());
};
// Never truncate past the minimum pinned version id.
let watermark_version_id = std::cmp::min(
version_watermark.version_id,
min_pinned_version_id.to_u64().try_into().unwrap(),
);
let res = hummock_epoch_to_version::Entity::delete_many()
.filter(
hummock_epoch_to_version::Column::Epoch
.lt(risingwave_meta_model::Epoch::try_from(epoch_watermark).unwrap()),
)
.exec(&txn)
.await?;
tracing::debug!(
epoch_watermark,
"delete {} rows from hummock_epoch_to_version",
res.rows_affected
);
// The newest persisted version at or below the watermark version id. It is kept; every
// version and delta with a smaller id is removed below.
let latest_valid_version = hummock_time_travel_version::Entity::find()
.filter(hummock_time_travel_version::Column::VersionId.lte(watermark_version_id))
.order_by_desc(hummock_time_travel_version::Column::VersionId)
.one(&txn)
.await?
.map(|m| IncompleteHummockVersion::from_persisted_protobuf(&m.version.to_protobuf()));
// Note: the `hummock_epoch_to_version` deletion above is still committed on this path.
let Some(latest_valid_version) = latest_valid_version else {
txn.commit().await?;
return Ok(());
};
let (
latest_valid_version_id,
latest_valid_version_sst_ids,
latest_valid_version_object_ids,
) = {
(
latest_valid_version.id,
latest_valid_version.get_sst_ids(),
latest_valid_version.get_object_ids(),
)
};
let mut object_ids_to_delete: HashSet<_> = HashSet::default();
// Only the ids are fetched here; each version is loaded one at a time in the loop below.
// Descending order matters: the loop diffs every version against the next newer one.
let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
hummock_time_travel_version::Entity::find()
.select_only()
.column(hummock_time_travel_version::Column::VersionId)
.filter(
hummock_time_travel_version::Column::VersionId
.lt(latest_valid_version_id.to_u64()),
)
.order_by_desc(hummock_time_travel_version::Column::VersionId)
.into_tuple()
.all(&txn)
.await?;
// Likewise only ids; each delta is loaded individually below.
let delta_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
hummock_time_travel_delta::Entity::find()
.select_only()
.column(hummock_time_travel_delta::Column::VersionId)
.filter(
hummock_time_travel_delta::Column::VersionId
.lt(latest_valid_version_id.to_u64()),
)
.into_tuple()
.all(&txn)
.await?;
for delta_id_to_delete in delta_ids_to_delete {
let delta_to_delete = hummock_time_travel_delta::Entity::find_by_id(delta_id_to_delete)
.one(&txn)
.await?
.ok_or_else(|| {
Error::TimeTravel(anyhow!(format!(
"version delta {} not found",
delta_id_to_delete
)))
})?;
let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf(
&delta_to_delete.version_delta.to_protobuf(),
);
let new_sst_ids = delta_to_delete.newly_added_sst_ids();
// SSTs this delta added that the latest valid version does not contain.
let sst_ids_to_delete = &new_sst_ids - &latest_valid_version_sst_ids;
let res = hummock_sstable_info::Entity::delete_many()
.filter(hummock_sstable_info::Column::SstId.is_in(sst_ids_to_delete))
.exec(&txn)
.await?;
let new_object_ids = delta_to_delete.newly_added_object_ids();
object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
tracing::debug!(
delta_id = delta_to_delete.id.to_u64(),
"delete {} rows from hummock_sstable_info",
res.rows_affected
);
}
// Walk the old versions from newest to oldest, starting the comparison at the latest
// valid version.
let mut next_version_sst_ids = latest_valid_version_sst_ids;
for prev_version_id in version_ids_to_delete {
let prev_version = {
let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id)
.one(&txn)
.await?
.ok_or_else(|| {
Error::TimeTravel(anyhow!(format!(
"prev_version {} not found",
prev_version_id
)))
})?;
IncompleteHummockVersion::from_persisted_protobuf(
&prev_version.version.to_protobuf(),
)
};
let sst_ids = prev_version.get_sst_ids();
// SSTs present in this version but absent from the next newer version.
let sst_ids_to_delete = &sst_ids - &next_version_sst_ids;
let res = hummock_sstable_info::Entity::delete_many()
.filter(hummock_sstable_info::Column::SstId.is_in(sst_ids_to_delete))
.exec(&txn)
.await?;
let new_object_ids = prev_version.get_object_ids();
// Unlike the SST ids, object ids are always diffed against the latest valid version.
object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
tracing::debug!(
prev_version_id,
"delete {} rows from hummock_sstable_info",
res.rows_affected
);
next_version_sst_ids = sst_ids;
}
if !object_ids_to_delete.is_empty() {
// NOTE(review): the ids are handed to the GC manager before the transaction commits, and
// the "may delete" name suggests the consumer re-validates them (see
// `filter_out_objects_by_time_travel`) - confirm.
self.gc_manager
.add_may_delete_object_ids(object_ids_to_delete.into_iter());
}
let res = hummock_time_travel_version::Entity::delete_many()
.filter(
hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id.to_u64()),
)
.exec(&txn)
.await?;
tracing::debug!(
epoch_watermark_version_id = ?watermark_version_id,
?latest_valid_version_id,
"delete {} rows from hummock_time_travel_version",
res.rows_affected
);
let res = hummock_time_travel_delta::Entity::delete_many()
.filter(
hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id.to_u64()),
)
.exec(&txn)
.await?;
tracing::debug!(
epoch_watermark_version_id = ?watermark_version_id,
?latest_valid_version_id,
"delete {} rows from hummock_time_travel_delta",
res.rows_affected
);
txn.commit().await?;
Ok(())
}
/// Returns the subset of `objects` that has no row in `hummock_sstable_info`.
///
/// The candidate ids are looked up in the meta store in batches of `FILTER_BATCH_SIZE`;
/// every object id found there is dropped from the returned set.
///
/// # Errors
///
/// Propagates any error returned by the meta store query.
///
/// # Panics
///
/// Panics if an object id read from the meta store cannot be converted back into
/// `HummockSstableObjectId`.
pub(crate) async fn filter_out_objects_by_time_travel(
    &self,
    objects: impl Iterator<Item = HummockSstableObjectId>,
) -> Result<HashSet<HummockSstableObjectId>> {
    const FILTER_BATCH_SIZE: usize = 1000;

    let mut survivors: HashSet<_> = objects.collect();
    // Snapshot the candidates so `survivors` can be mutated while batches are processed.
    let candidates: Vec<_> = survivors.iter().copied().collect();
    for batch in candidates.chunks(FILTER_BATCH_SIZE) {
        let referenced: Vec<risingwave_meta_model::HummockSstableObjectId> =
            hummock_sstable_info::Entity::find()
                .filter(hummock_sstable_info::Column::ObjectId.is_in(batch.iter().copied()))
                .select_only()
                .column(hummock_sstable_info::Column::ObjectId)
                .into_tuple()
                .all(&self.env.meta_store_ref().conn)
                .await?;
        for referenced_id in referenced {
            let object_id = HummockSstableObjectId::try_from(referenced_id).unwrap();
            survivors.remove(&object_id);
        }
    }
    Ok(survivors)
}
/// Returns the number of rows currently stored in `hummock_sstable_info`.
///
/// # Errors
///
/// Propagates any error returned by the meta store query.
pub(crate) async fn time_travel_pinned_object_count(&self) -> Result<u64> {
    let sql_store = self.env.meta_store_ref();
    let row_count = hummock_sstable_info::Entity::find()
        .count(&sql_store.conn)
        .await?;
    Ok(row_count)
}
/// Rebuilds the `HummockVersion` to be used by a time travel query on `table_id` at
/// `query_epoch`.
///
/// Steps:
/// 1. Pick the `hummock_epoch_to_version` row with the largest epoch `<= query_epoch` among
///    rows whose table id is `table_id` or `0`.
/// 2. Load the persisted time travel version with the largest id `<=` the mapped version
///    id, and replay onto it the persisted deltas with ids in `(base id, mapped id]`.
/// 3. Fetch the `SstableInfo` of every SST in the replayed version from
///    `hummock_sstable_info` (in batches) and pass them to `refill_version`.
///
/// # Errors
///
/// Fails when the inflight time travel query limit is reached, when the meta store returns
/// an error, and with `Error::TimeTravel` when no epoch mapping or base version exists, or
/// when some `SstableInfo` rows are missing.
///
/// # Panics
///
/// Panics if `query_epoch` cannot be converted to the meta model's epoch type.
pub async fn epoch_to_version(
    &self,
    query_epoch: HummockEpoch,
    table_id: u32,
) -> Result<HummockVersion> {
    let sql_store = self.env.meta_store_ref();
    // The permit is held until this function returns.
    let _permit = self.inflight_time_travel_query.try_acquire().map_err(|_| {
        anyhow!(format!(
            "too many inflight time travel queries, max_inflight_time_travel_query={}",
            self.env.opts.max_inflight_time_travel_query
        ))
    })?;

    // NOTE(review): rows with table id 0 are accepted as well - presumably legacy rows
    // that are not bound to a table; confirm.
    let table_filter = Condition::any()
        .add(hummock_epoch_to_version::Column::TableId.eq(i64::from(table_id)))
        .add(hummock_epoch_to_version::Column::TableId.eq(0));
    let epoch_upper_bound = risingwave_meta_model::Epoch::try_from(query_epoch).unwrap();
    let mapping = hummock_epoch_to_version::Entity::find()
        .filter(table_filter)
        .filter(hummock_epoch_to_version::Column::Epoch.lte(epoch_upper_bound))
        .order_by_desc(hummock_epoch_to_version::Column::Epoch)
        .one(&sql_store.conn)
        .await?;
    let Some(mapping) = mapping else {
        return Err(Error::TimeTravel(anyhow!(format!(
            "version not found for epoch {}",
            query_epoch
        ))));
    };

    let replay_timer = self
        .metrics
        .time_travel_version_replay_latency
        .start_timer();
    let target_version_id = mapping.version_id;
    tracing::debug!(
        query_epoch,
        query_tz = ?(Epoch(query_epoch).as_timestamptz()),
        actual_epoch = mapping.epoch,
        actual_tz = ?(Epoch(u64::try_from(mapping.epoch).unwrap()).as_timestamptz()),
        actual_version_id = target_version_id,
        "convert query epoch"
    );

    // Newest persisted version at or below the target version id; the replay starts here.
    let base_snapshot = hummock_time_travel_version::Entity::find()
        .filter(hummock_time_travel_version::Column::VersionId.lte(target_version_id))
        .order_by_desc(hummock_time_travel_version::Column::VersionId)
        .one(&sql_store.conn)
        .await?;
    let Some(base_snapshot) = base_snapshot else {
        return Err(Error::TimeTravel(anyhow!(format!(
            "no replay version found for epoch {}, version {}",
            query_epoch, target_version_id,
        ))));
    };

    // Deltas in (base version id, target version id], oldest first.
    let delta_models = hummock_time_travel_delta::Entity::find()
        .filter(hummock_time_travel_delta::Column::VersionId.gt(base_snapshot.version_id))
        .filter(hummock_time_travel_delta::Column::VersionId.lte(target_version_id))
        .order_by_asc(hummock_time_travel_delta::Column::VersionId)
        .all(&sql_store.conn)
        .await?;
    let mut replayed_version = replay_archive(
        base_snapshot.version.to_protobuf(),
        delta_models
            .into_iter()
            .map(|model| model.version_delta.to_protobuf()),
    );

    // Look up the persisted `SstableInfo` of every SST id in the replayed version.
    let mut pending_sst_ids: VecDeque<_> = replayed_version.get_sst_ids().into_iter().collect();
    let expected_sst_count = pending_sst_ids.len();
    let mut sst_id_to_info = HashMap::with_capacity(expected_sst_count);
    let fetch_batch_size = self.env.opts.hummock_time_travel_sst_info_fetch_batch_size;
    while !pending_sst_ids.is_empty() {
        let take = std::cmp::min(fetch_batch_size, pending_sst_ids.len());
        let rows = hummock_sstable_info::Entity::find()
            .filter(hummock_sstable_info::Column::SstId.is_in(pending_sst_ids.drain(..take)))
            .all(&sql_store.conn)
            .await?;
        sst_id_to_info.extend(rows.into_iter().map(|row| {
            let info: SstableInfo = row.sstable_info.to_protobuf().into();
            (info.sst_id, info)
        }));
    }
    // Every SST id of the replayed version must have been found in the meta store.
    if sst_id_to_info.len() != expected_sst_count {
        return Err(Error::TimeTravel(anyhow!(format!(
            "some SstableInfos not found for epoch {}, version {}",
            query_epoch, target_version_id,
        ))));
    }
    refill_version(&mut replayed_version, &sst_id_to_info, table_id);
    replay_timer.observe_duration();
    Ok(replayed_version)
}
/// Persists time travel metadata for `delta` (and optionally a full `version`) inside `txn`.
///
/// Only compaction groups whose root group (resolved through `group_parents`) is not
/// ignored by `should_ignore_group` are considered. In order, this writes:
/// 1. one `hummock_epoch_to_version` row per entry of `tables_to_commit` in a selected group,
///    mapping the committed epoch to `delta.id`;
/// 2. when `version` is given: its SST infos (except `skip_sst_ids`) and the version itself;
/// 3. the SST infos newly added by `delta` (except `skip_sst_ids`), and the delta itself if
///    at least one such SST info was visited.
///
/// Returns the SST ids of the selected groups of `version`, or `None` when `version` is
/// `None`.
///
/// # Errors
///
/// Propagates any error returned by the meta store.
///
/// # Panics
///
/// Panics if an epoch, version id, SST id or object id does not fit the meta model's integer
/// types.
pub(crate) async fn write_time_travel_metadata(
    &self,
    txn: &DatabaseTransaction,
    version: Option<&HummockVersion>,
    delta: HummockVersionDelta,
    group_parents: &HashMap<CompactionGroupId, CompactionGroupId>,
    skip_sst_ids: &HashSet<HummockSstableId>,
    tables_to_commit: impl Iterator<Item = (&TableId, &CompactionGroupId, u64)>,
) -> Result<Option<HashSet<HummockSstableId>>> {
    /// Inserts one `hummock_sstable_info` row per SST, leaving conflicting rows untouched,
    /// and returns the number of SSTs visited.
    async fn write_sstable_infos(
        sst_infos: impl Iterator<Item = &SstableInfo>,
        txn: &DatabaseTransaction,
    ) -> Result<usize> {
        let mut visited = 0;
        for info in sst_infos {
            let row = hummock_sstable_info::ActiveModel {
                sst_id: Set(info.sst_id.try_into().unwrap()),
                object_id: Set(info.object_id.try_into().unwrap()),
                sstable_info: Set(SstableInfoV2Backend::from(&info.to_protobuf())),
            };
            hummock_sstable_info::Entity::insert(row)
                .on_conflict_do_nothing()
                .exec(txn)
                .await?;
            visited += 1;
        }
        Ok(visited)
    }

    // Groups whose root group is not ignored.
    let select_groups: HashSet<CompactionGroupId> = group_parents
        .keys()
        .copied()
        .filter(|cg_id| !should_ignore_group(find_root_group(*cg_id, group_parents)))
        .collect();

    // Map every committed epoch of a selected group to the id of this delta.
    let delta_version_id: u64 = delta.id.to_u64();
    let selected_commits =
        tables_to_commit.filter(|(_, cg_id, _)| select_groups.contains(*cg_id));
    for (table_id, _, committed_epoch) in selected_commits {
        let row = hummock_epoch_to_version::ActiveModel {
            epoch: Set(committed_epoch.try_into().unwrap()),
            table_id: Set(table_id.table_id.into()),
            version_id: Set(delta_version_id.try_into().unwrap()),
        };
        hummock_epoch_to_version::Entity::insert(row)
            .exec(txn)
            .await?;
    }

    let version_sst_ids = match version {
        Some(version) => {
            let all_sst_ids: HashSet<HummockSstableId> = version
                .get_sst_infos_from_groups(&select_groups)
                .map(|s| s.sst_id)
                .collect();
            let unskipped = version
                .get_sst_infos_from_groups(&select_groups)
                .filter(|s| !skip_sst_ids.contains(&s.sst_id));
            write_sstable_infos(unskipped, txn).await?;

            let version_id =
                risingwave_meta_model::HummockVersionId::try_from(version.id.to_u64()).unwrap();
            let version_pb =
                IncompleteHummockVersion::from((version, &select_groups)).to_protobuf();
            let row = hummock_time_travel_version::ActiveModel {
                version_id: Set(version_id),
                version: Set((&version_pb).into()),
            };
            hummock_time_travel_version::Entity::insert(row)
                .on_conflict_do_nothing()
                .exec(txn)
                .await?;
            Some(all_sst_ids)
        }
        None => None,
    };

    let newly_added = delta
        .newly_added_sst_infos(Some(&select_groups))
        .filter(|s| !skip_sst_ids.contains(&s.sst_id));
    let written = write_sstable_infos(newly_added, txn).await?;
    // The delta is persisted only when at least one SST info was visited above.
    if written == 0 {
        return Ok(version_sst_ids);
    }

    let delta_id =
        risingwave_meta_model::HummockVersionId::try_from(delta.id.to_u64()).unwrap();
    let delta_pb = IncompleteHummockVersionDelta::from((&delta, &select_groups)).to_protobuf();
    let row = hummock_time_travel_delta::ActiveModel {
        version_id: Set(delta_id),
        version_delta: Set((&delta_pb).into()),
    };
    hummock_time_travel_delta::Entity::insert(row)
        .on_conflict_do_nothing()
        .exec(txn)
        .await?;
    Ok(version_sst_ids)
}
}
/// Rebuilds a `HummockVersion` by applying `deltas`, in iteration order, on top of the
/// persisted `version`.
///
/// In debug builds, asserts that no replayed delta satisfies
/// `should_mark_next_time_travel_version_snapshot`.
fn replay_archive(
    version: PbHummockVersion,
    deltas: impl Iterator<Item = PbHummockVersionDelta>,
) -> HummockVersion {
    let mut last_version = HummockVersion::from_persisted_protobuf(&version);
    for d in deltas {
        let d = HummockVersionDelta::from_persisted_protobuf(&d);
        debug_assert!(
            !should_mark_next_time_travel_version_snapshot(&d),
            "unexpected time travel delta {:?}",
            d
        );
        // The replayed deltas need not be consecutive, so the current id may lag behind the
        // delta's `prev_id`. Jump straight to `prev_id` rather than stepping one id at a
        // time; the id is never moved backwards.
        if last_version.id < d.prev_id {
            last_version.id = d.prev_id;
        }
        last_version.apply_version_delta(&d);
    }
    last_version
}
fn find_root_group(
group_id: CompactionGroupId,
parents: &HashMap<CompactionGroupId, CompactionGroupId>,
) -> CompactionGroupId {
let mut root = group_id;
while let Some(parent) = parents.get(&root)
&& *parent != 0
{
root = *parent;
}
root
}
/// Returns `true` when `root_group_id` is `StaticCompactionGroupId::StateDefault`.
fn should_ignore_group(root_group_id: CompactionGroupId) -> bool {
    let state_default = StaticCompactionGroupId::StateDefault as CompactionGroupId;
    state_default == root_group_id
}
pub fn require_sql_meta_store_err() -> Error {
Error::TimeTravel(anyhow!("require SQL meta store"))
}
/// Returns `true` when `delta` contains at least one group delta that is not a
/// `GroupDeltaCommon::NewL0SubLevel`; a delta with no group deltas yields `false`.
pub fn should_mark_next_time_travel_version_snapshot(delta: &HummockVersionDelta) -> bool {
    let only_new_l0_sub_levels = delta
        .group_deltas
        .iter()
        .flat_map(|(_, deltas)| deltas.group_deltas.iter())
        .all(|d| matches!(d, GroupDeltaCommon::NewL0SubLevel(_)));
    !only_new_l0_sub_levels
}