// risingwave_storage/hummock/hummock_meta_client.rs
use std::sync::Arc;
use async_trait::async_trait;
use futures::stream::BoxStream;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{SstObjectIdRange, SyncResult};
use risingwave_pb::hummock::{PbHummockVersion, SubscribeCompactionEventRequest};
use risingwave_rpc_client::error::Result;
use risingwave_rpc_client::{CompactionEventItem, HummockMetaClient, MetaClient};
use tokio::sync::mpsc::UnboundedSender;
use crate::hummock::{HummockEpoch, HummockVersionId};
use crate::monitor::HummockMetrics;
/// A wrapper around [`MetaClient`] that carries a shared [`HummockMetrics`]
/// handle alongside the client it wraps.
pub struct MonitoredHummockMetaClient {
    /// The wrapped meta client.
    meta_client: MetaClient,
    /// Shared metrics handle.
    stats: Arc<HummockMetrics>,
}
impl MonitoredHummockMetaClient {
pub fn new(meta_client: MetaClient, stats: Arc<HummockMetrics>) -> MonitoredHummockMetaClient {
MonitoredHummockMetaClient { meta_client, stats }
}
pub fn get_inner(&self) -> &MetaClient {
&self.meta_client
}
}
// `HummockMetaClient` implementation that delegates to the wrapped `MetaClient`.
//
// Only `get_new_sst_ids` touches `stats`; `commit_epoch` never reaches the
// wrapped client and panics unconditionally.
#[async_trait]
impl HummockMetaClient for MonitoredHummockMetaClient {
    /// Forwards the request to the wrapped client and returns its result.
    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
        let client = &self.meta_client;
        client.unpin_version_before(unpin_version_before).await
    }

    /// Forwards the request to the wrapped client and returns its result.
    async fn get_current_version(&self) -> Result<HummockVersion> {
        let client = &self.meta_client;
        client.get_current_version().await
    }

    /// Requests `number` new SST ids from the wrapped client.
    ///
    /// Increments `get_new_sst_ids_counts` before the call and observes the
    /// `get_new_sst_ids_latency` timer once the call has completed, whether it
    /// succeeded or failed.
    async fn get_new_sst_ids(&self, number: u32) -> Result<SstObjectIdRange> {
        let metrics = &self.stats;
        metrics.get_new_sst_ids_counts.inc();

        let latency_timer = metrics.get_new_sst_ids_latency.start_timer();
        let id_range = self.meta_client.get_new_sst_ids(number).await;
        latency_timer.observe_duration();

        id_range
    }

    /// Not supported on this client.
    ///
    /// # Panics
    ///
    /// Always panics; all arguments are ignored.
    async fn commit_epoch(
        &self,
        _epoch: HummockEpoch,
        _sync_result: SyncResult,
        _is_log_store: bool,
    ) -> Result<()> {
        panic!("Only meta service can commit_epoch in production.")
    }

    /// Forwards the request, with all arguments unchanged, to the wrapped client.
    async fn trigger_manual_compaction(
        &self,
        compaction_group_id: u64,
        table_id: u32,
        level: u32,
        sst_ids: Vec<u64>,
    ) -> Result<()> {
        let client = &self.meta_client;
        client
            .trigger_manual_compaction(compaction_group_id, table_id, level, sst_ids)
            .await
    }

    /// Forwards the request, with all arguments unchanged, to the wrapped client.
    async fn trigger_full_gc(
        &self,
        sst_retention_time_sec: u64,
        prefix: Option<String>,
    ) -> Result<()> {
        let client = &self.meta_client;
        client.trigger_full_gc(sst_retention_time_sec, prefix).await
    }

    /// Forwards the request to the wrapped client and returns the
    /// sender/stream pair it produces.
    async fn subscribe_compaction_event(
        &self,
    ) -> Result<(
        UnboundedSender<SubscribeCompactionEventRequest>,
        BoxStream<'static, CompactionEventItem>,
    )> {
        let client = &self.meta_client;
        client.subscribe_compaction_event().await
    }

    /// Forwards the request, with `epoch` and `table_id` unchanged, to the
    /// wrapped client.
    async fn get_version_by_epoch(
        &self,
        epoch: HummockEpoch,
        table_id: u32,
    ) -> Result<PbHummockVersion> {
        let client = &self.meta_client;
        client.get_version_by_epoch(epoch, table_id).await
    }
}