// risingwave_meta/hummock/manager/checkpoint.rs

use std::collections::HashMap;
use std::ops::Bound::{Excluded, Included};
use std::ops::{Deref, DerefMut};
use std::sync::atomic::Ordering;

use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map;
use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion};
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects};
use risingwave_pb::hummock::{
PbHummockVersion, PbHummockVersionArchive, PbHummockVersionCheckpoint,
};
use thiserror_ext::AsReport;
use tracing::warn;

use crate::hummock::error::Result;
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::metrics_utils::{trigger_gc_stat, trigger_split_stat};
use crate::hummock::HummockManager;
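
/// A persisted snapshot of Hummock state: the checkpointed `HummockVersion` plus the
/// stale objects accumulated so far, keyed by the version id of the checkpoint that
/// recorded them as stale.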
#[derive(Default)]
pub struct HummockVersionCheckpoint {
pub version: HummockVersion,
pub stale_objects: HashMap<HummockVersionId, PbStaleObjects>,
}
impl HummockVersionCheckpoint {
pub fn from_protobuf(checkpoint: &PbHummockVersionCheckpoint) -> Self {
Self {
version: HummockVersion::from_persisted_protobuf(checkpoint.version.as_ref().unwrap()),
stale_objects: checkpoint
.stale_objects
.iter()
.map(|(version_id, objects)| (HummockVersionId::new(*version_id), objects.clone()))
.collect(),
}
}
pub fn to_protobuf(&self) -> PbHummockVersionCheckpoint {
PbHummockVersionCheckpoint {
version: Some(PbHummockVersion::from(&self.version)),
stale_objects: self
.stale_objects
.iter()
.map(|(version_id, objects)| (version_id.to_u64(), objects.clone()))
.collect(),
}
}
}
impl HummockManager {
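    /// Reads the checkpoint from `version_checkpoint_path` in the object store and decodes it.
    /// Returns `Ok(None)` if the checkpoint object does not exist yet.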
pub async fn try_read_checkpoint(&self) -> Result<Option<HummockVersionCheckpoint>> {
use prost::Message;
let data = match self
.object_store
.read(&self.version_checkpoint_path, ..)
.await
{
Ok(data) => data,
Err(e) => {
if e.is_object_not_found_error() {
return Ok(None);
}
return Err(e.into());
}
};
let ckpt = PbHummockVersionCheckpoint::decode(data).map_err(|e| anyhow::anyhow!(e))?;
Ok(Some(HummockVersionCheckpoint::from_protobuf(&ckpt)))
}
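
    /// Encodes the checkpoint as protobuf and uploads it to `version_checkpoint_path`,
    /// replacing any previously written checkpoint.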
pub(super) async fn write_checkpoint(
&self,
checkpoint: &HummockVersionCheckpoint,
) -> Result<()> {
use prost::Message;
let buf = checkpoint.to_protobuf().encode_to_vec();
self.object_store
.upload(&self.version_checkpoint_path, buf.into())
.await?;
Ok(())
}
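
    /// Encodes the archive as protobuf and uploads it under `version_archive_dir`,
    /// using the archived version id as the object name.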
pub(super) async fn write_version_archive(
&self,
archive: &PbHummockVersionArchive,
) -> Result<()> {
use prost::Message;
let buf = archive.encode_to_vec();
let archive_path = format!(
"{}/{}",
self.version_archive_dir,
archive.version.as_ref().unwrap().id
);
self.object_store.upload(&archive_path, buf.into()).await?;
Ok(())
}
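
    /// Creates a new checkpoint if the current version is at least `min_delta_log_num`
    /// versions ahead of the previous checkpoint. Returns how far the checkpoint advanced
    /// (`new_checkpoint_id - old_checkpoint_id`), or 0 if no checkpoint was written.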
pub async fn create_version_checkpoint(&self, min_delta_log_num: u64) -> Result<u64> {
let timer = self.metrics.version_checkpoint_latency.start_timer();
let versioning_guard = self.versioning.read().await;
let versioning: &Versioning = versioning_guard.deref();
let current_version: &HummockVersion = &versioning.current_version;
let old_checkpoint: &HummockVersionCheckpoint = &versioning.checkpoint;
let new_checkpoint_id = current_version.id;
let old_checkpoint_id = old_checkpoint.version.id;
if new_checkpoint_id < old_checkpoint_id + min_delta_log_num {
return Ok(0);
}
if cfg!(test) && new_checkpoint_id == old_checkpoint_id {
drop(versioning_guard);
let versioning = self.versioning.read().await;
let context_info = self.context_info.read().await;
let min_pinned_version_id = context_info.min_pinned_version_id();
trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
return Ok(0);
}
assert!(new_checkpoint_id > old_checkpoint_id);
let mut archive: Option<PbHummockVersionArchive> = None;
let mut stale_objects = old_checkpoint.stale_objects.clone();
let mut object_sizes = object_size_map(&old_checkpoint.version);
let mut versions_object_ids = old_checkpoint.version.get_object_ids();
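        // Walk the deltas between the old and new checkpoint, recording the size of every
        // newly added object and extending the set of object ids referenced since the old
        // checkpoint.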
for (_, version_delta) in versioning
.hummock_version_deltas
.range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
{
for group_deltas in version_delta.group_deltas.values() {
object_sizes.extend(
group_deltas
.group_deltas
.iter()
.flat_map(|delta| {
match delta {
GroupDeltaCommon::IntraLevel(level_delta) => {
Some(level_delta.inserted_table_infos.iter())
}
GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
Some(inserted_table_infos.iter())
}
GroupDeltaCommon::GroupConstruct(_)
| GroupDeltaCommon::GroupDestroy(_)
| GroupDeltaCommon::GroupMerge(_) => None,
}
.into_iter()
.flatten()
.map(|t| (t.object_id, t.file_size))
})
.chain(
version_delta
.change_log_delta
.values()
.flat_map(|change_log| {
let new_log = change_log.new_log.as_ref().unwrap();
new_log
.new_value
.iter()
.chain(new_log.old_value.iter())
.map(|t| (t.object_id, t.file_size))
}),
),
);
}
versions_object_ids.extend(version_delta.newly_added_object_ids());
}
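        // Objects that were referenced by the old checkpoint or its deltas but are no longer
        // referenced by the current version become stale under the new checkpoint id.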
        let removed_object_ids = &versions_object_ids - &current_version.get_object_ids();
let total_file_size = removed_object_ids
.iter()
.map(|t| {
object_sizes.get(t).copied().unwrap_or_else(|| {
warn!(object_id = t, "unable to get size of removed object id");
0
})
})
.sum::<u64>();
stale_objects.insert(
current_version.id,
StaleObjects {
id: removed_object_ids.into_iter().collect(),
total_file_size,
},
);
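        // Optionally archive the old checkpointed version together with the deltas that
        // advance it to the new checkpoint.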
if self.env.opts.enable_hummock_data_archive {
archive = Some(PbHummockVersionArchive {
version: Some(PbHummockVersion::from(&old_checkpoint.version)),
version_deltas: versioning
.hummock_version_deltas
.range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
.map(|(_, version_delta)| version_delta.into())
.collect(),
});
}
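        // Stale objects recorded under versions that are no longer pinned can be handed to
        // the GC manager as deletion candidates and dropped from the checkpoint.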
let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
let may_delete_object = stale_objects
.iter()
.filter_map(|(version_id, object_ids)| {
if *version_id >= min_pinned_version_id {
return None;
}
Some(object_ids.id.clone())
})
.flatten();
self.gc_manager.add_may_delete_object_ids(may_delete_object);
stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id);
let new_checkpoint = HummockVersionCheckpoint {
version: current_version.clone(),
stale_objects,
};
drop(versioning_guard);
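        // Persist the new checkpoint (and archive, if any) to the object store without
        // holding the versioning lock.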
self.write_checkpoint(&new_checkpoint).await?;
if let Some(archive) = archive {
if let Err(e) = self.write_version_archive(&archive).await {
tracing::warn!(
error = %e.as_report(),
"failed to write version archive {}",
archive.version.as_ref().unwrap().id
);
}
}
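        // Publish the new checkpoint to the in-memory state and refresh GC / split metrics.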
let mut versioning_guard = self.versioning.write().await;
let versioning = versioning_guard.deref_mut();
assert!(new_checkpoint.version.id > versioning.checkpoint.version.id);
versioning.checkpoint = new_checkpoint;
let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
trigger_split_stat(&self.metrics, &versioning.current_version);
drop(versioning_guard);
timer.observe_duration();
self.metrics
.checkpoint_version_id
.set(new_checkpoint_id.to_u64() as i64);
Ok(new_checkpoint_id - old_checkpoint_id)
}
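
    /// Pauses version checkpointing by setting the shared pause flag.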
pub fn pause_version_checkpoint(&self) {
self.pause_version_checkpoint.store(true, Ordering::Relaxed);
tracing::info!("hummock version checkpoint is paused.");
}
pub fn resume_version_checkpoint(&self) {
self.pause_version_checkpoint
.store(false, Ordering::Relaxed);
tracing::info!("hummock version checkpoint is resumed.");
}
pub fn is_version_checkpoint_paused(&self) -> bool {
self.pause_version_checkpoint.load(Ordering::Relaxed)
}
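
    /// Returns a clone of the most recently checkpointed version.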
pub async fn get_checkpoint_version(&self) -> HummockVersion {
let versioning_guard = self.versioning.read().await;
versioning_guard.checkpoint.version.clone()
}
}