risingwave_storage/hummock/
hummock_meta_client.rs1use std::collections::HashSet;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use futures::stream::BoxStream;
20use risingwave_hummock_sdk::change_log::TableChangeLogs;
21use risingwave_hummock_sdk::version::HummockVersion;
22use risingwave_hummock_sdk::{CompactionGroupId, ObjectIdRange, SyncResult};
23use risingwave_pb::hummock::{PbHummockVersion, SubscribeCompactionEventRequest};
24use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
25use risingwave_pb::id::{HummockSstableId, JobId, TableId};
26use risingwave_rpc_client::error::Result;
27use risingwave_rpc_client::{
28 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
29 IcebergCompactionEventItem, MetaClient,
30};
31use tokio::sync::mpsc::UnboundedSender;
32
33use crate::hummock::{HummockEpoch, HummockVersionId};
34use crate::monitor::HummockMetrics;
35
36pub struct MonitoredHummockMetaClient {
37 meta_client: MetaClient,
38
39 stats: Arc<HummockMetrics>,
40}
41
42impl MonitoredHummockMetaClient {
43 pub fn new(meta_client: MetaClient, stats: Arc<HummockMetrics>) -> MonitoredHummockMetaClient {
44 MonitoredHummockMetaClient { meta_client, stats }
45 }
46
47 pub fn get_inner(&self) -> &MetaClient {
48 &self.meta_client
49 }
50}
51
52#[async_trait]
53impl HummockMetaClient for MonitoredHummockMetaClient {
54 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
55 self.meta_client
56 .unpin_version_before(unpin_version_before)
57 .await
58 }
59
60 async fn get_current_version(&self) -> Result<HummockVersion> {
61 self.meta_client.get_current_version().await
62 }
63
64 async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
65 self.stats.get_new_sst_ids_counts.inc();
66 let timer = self.stats.get_new_sst_ids_latency.start_timer();
67 let res = self.meta_client.get_new_object_ids(number).await;
68 timer.observe_duration();
69 res
70 }
71
72 async fn commit_epoch_with_change_log(
73 &self,
74 _epoch: HummockEpoch,
75 _sync_result: SyncResult,
76 _change_log_info: Option<HummockMetaClientChangeLogInfo>,
77 ) -> Result<()> {
78 panic!("Only meta service can commit_epoch in production.")
79 }
80
81 async fn trigger_manual_compaction(
82 &self,
83 compaction_group_id: CompactionGroupId,
84 table_id: JobId,
85 level: u32,
86 sst_ids: Vec<HummockSstableId>,
87 exclusive: bool,
88 ) -> Result<bool> {
89 self.meta_client
90 .trigger_manual_compaction(compaction_group_id, table_id, level, sst_ids, exclusive)
91 .await
92 }
93
94 async fn trigger_full_gc(
95 &self,
96 sst_retention_time_sec: u64,
97 prefix: Option<String>,
98 ) -> Result<()> {
99 self.meta_client
100 .trigger_full_gc(sst_retention_time_sec, prefix)
101 .await
102 }
103
104 async fn subscribe_compaction_event(
105 &self,
106 ) -> Result<(
107 UnboundedSender<SubscribeCompactionEventRequest>,
108 BoxStream<'static, CompactionEventItem>,
109 )> {
110 self.meta_client.subscribe_compaction_event().await
111 }
112
113 async fn get_version_by_epoch(
114 &self,
115 epoch: HummockEpoch,
116 table_id: TableId,
117 ) -> Result<PbHummockVersion> {
118 self.meta_client.get_version_by_epoch(epoch, table_id).await
119 }
120
121 async fn subscribe_iceberg_compaction_event(
122 &self,
123 ) -> Result<(
124 UnboundedSender<SubscribeIcebergCompactionEventRequest>,
125 BoxStream<'static, IcebergCompactionEventItem>,
126 )> {
127 self.meta_client.subscribe_iceberg_compaction_event().await
128 }
129
130 async fn get_table_change_logs(
131 &self,
132 epoch_only: bool,
133 start_epoch_inclusive: Option<u64>,
134 end_epoch_inclusive: Option<u64>,
135 table_ids: Option<HashSet<TableId>>,
136 exclude_empty: bool,
137 limit: Option<u32>,
138 ) -> Result<TableChangeLogs> {
139 self.meta_client
140 .get_table_change_logs(
141 epoch_only,
142 start_epoch_inclusive,
143 end_epoch_inclusive,
144 table_ids,
145 exclude_empty,
146 limit,
147 )
148 .await
149 }
150}