// risingwave_meta/hummock/manager/utils.rs

/// Persists several value transactions atomically through one meta-store transaction.
///
/// Expansion steps, in order:
/// 1. Begin a database transaction on `$meta_store.conn`.
/// 2. Call `apply_to_txn` on every `$val_txn`, stopping at the first error via `?`.
/// 3. Commit the database transaction.
/// 4. Call `commit()` on every `$val_txn`. This step is only reached when the database commit
///    succeeded.
///
/// The macro evaluates to a `Result` whose success value is `()`. Errors from beginning or
/// committing the database transaction are converted with `MetadataModelError::from` before `?`
/// is applied.
///
/// Usage notes:
/// - The expansion ends in `.await`, so the macro can only be invoked inside an async context.
/// - Each `$val_txn` expression is expanded twice (once for `apply_to_txn`, once for `commit`),
///   so pass a plain variable rather than an expression with side effects.
/// - `Result` is not imported by the macro itself; it is looked up where the macro is invoked,
///   which is what fixes the error type of the surrounding `async` block.
macro_rules! commit_multi_var {
    ($meta_store:expr, $($val_txn:expr),*) => {
        {
            // The `async` block confines `?` to the macro, so a failure becomes the value of the
            // macro expression instead of returning from the caller's function directly.
            async {
                use crate::model::{MetadataModelError, InMemValTransaction, ValTransaction};
                use sea_orm::TransactionTrait;
                let mut txn = $meta_store.conn.begin().await.map_err(MetadataModelError::from)?;
                $(
                    $val_txn.apply_to_txn(&mut txn).await?;
                )*
                txn.commit().await.map_err(MetadataModelError::from)?;
                // Only after the database commit succeeded are the value transactions committed.
                $(
                    $val_txn.commit();
                )*
                Result::Ok(())
            }.await
        }
    };
}
/// Variant of `commit_multi_var` that writes through a database transaction supplied by the
/// caller instead of beginning a new one.
///
/// Every `$var_txn` is applied to `$db_txn` with `apply_to_txn` (the first error short-circuits
/// via `?`), then `$db_txn` is committed, and only after that succeeds is `commit()` called on
/// each `$var_txn`. The macro evaluates to a `Result` whose success value is `()`.
///
/// Usage notes:
/// - `$db_txn` is borrowed with `&mut` and later has `.commit()` called on it, so it must name a
///   mutable transaction owned by the caller.
/// - Both `$db_txn` and each `$var_txn` are expanded more than once; pass plain variables.
/// - The expansion ends in `.await`, so the macro can only be invoked inside an async context.
macro_rules! commit_multi_var_with_provided_txn {
    ($db_txn:expr, $($var_txn:expr),*) => {{
        // The `async` block keeps `?` local to the macro expression.
        async {
            use crate::model::{InMemValTransaction, MetadataModelError, ValTransaction};
            $(
                $var_txn.apply_to_txn(&mut $db_txn).await?;
            )*
            $db_txn.commit().await.map_err(MetadataModelError::from)?;
            // Reached only when the database commit succeeded.
            $(
                $var_txn.commit();
            )*
            Result::Ok(())
        }
        .await
    }};
}
use risingwave_hummock_sdk::SstObjectIdRange;
pub(crate) use {commit_multi_var, commit_multi_var_with_provided_txn};
use crate::hummock::error::Result;
use crate::hummock::sequence::next_sstable_object_id;
use crate::hummock::HummockManager;
impl HummockManager {
    /// Test-only sanity check: asserts that the in-memory hummock state equals the state
    /// reloaded from the meta store.
    ///
    /// The write locks on compaction, versioning and context info are taken (in that order) and
    /// held for the whole check. A snapshot of selected fields is captured, the state is
    /// reloaded through `load_meta_store_state_impl`, a second snapshot is captured, and the two
    /// are compared.
    ///
    /// # Panics
    ///
    /// Panics if reloading from the meta store fails or if the two snapshots differ.
    #[cfg(test)]
    pub(super) async fn check_state_consistency(&self) {
        use crate::hummock::manager::compaction::Compaction;
        use crate::hummock::manager::context::ContextInfo;
        use crate::hummock::manager::versioning::Versioning;

        // Lock acquisition order: compaction -> versioning -> context info.
        let mut compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let mut context_info_guard = self.context_info.write().await;

        // Clones the fields that take part in the comparison.
        let snapshot =
            |compaction: &Compaction, versioning: &Versioning, context_info: &ContextInfo| {
                ((
                    compaction.compaction_statuses.clone(),
                    compaction.compact_task_assignment.clone(),
                    context_info.pinned_versions.clone(),
                    versioning.hummock_version_deltas.clone(),
                    versioning.version_stats.clone(),
                ),)
            };

        let before_reload = snapshot(&compaction_guard, &versioning_guard, &context_info_guard);

        self.load_meta_store_state_impl(
            &mut compaction_guard,
            &mut versioning_guard,
            &mut context_info_guard,
        )
        .await
        .expect("Failed to load state from meta store");

        let after_reload = snapshot(&compaction_guard, &versioning_guard, &context_info_guard);

        assert_eq!(
            before_reload, after_reload,
            "hummock in-mem state is inconsistent with meta store state",
        );
    }

    /// Requests `number` new SST object ids and returns them as an `SstObjectIdRange`.
    ///
    /// The start id comes from `next_sstable_object_id`; the range is constructed from that
    /// start id and `start id + number`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `next_sstable_object_id`.
    pub async fn get_new_sst_ids(&self, number: u32) -> Result<SstObjectIdRange> {
        let first_id = next_sstable_object_id(&self.env, number).await?;
        // `u32 -> u64` is lossless, so `From` expresses the widening without a cast.
        let end_id = first_id + u64::from(number);
        Ok(SstObjectIdRange::new(first_id, end_id))
    }
}