risingwave_storage/hummock/
hummock_meta_client.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use futures::stream::BoxStream;
20use risingwave_hummock_sdk::change_log::TableChangeLogs;
21use risingwave_hummock_sdk::version::HummockVersion;
22use risingwave_hummock_sdk::{CompactionGroupId, ObjectIdRange, SyncResult};
23use risingwave_pb::hummock::{PbHummockVersion, SubscribeCompactionEventRequest};
24use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
25use risingwave_pb::id::{HummockSstableId, JobId, TableId};
26use risingwave_rpc_client::error::Result;
27use risingwave_rpc_client::{
28    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
29    IcebergCompactionEventItem, MetaClient,
30};
31use tokio::sync::mpsc::UnboundedSender;
32
33use crate::hummock::{HummockEpoch, HummockVersionId};
34use crate::monitor::HummockMetrics;
35
36pub struct MonitoredHummockMetaClient {
37    meta_client: MetaClient,
38
39    stats: Arc<HummockMetrics>,
40}
41
42impl MonitoredHummockMetaClient {
43    pub fn new(meta_client: MetaClient, stats: Arc<HummockMetrics>) -> MonitoredHummockMetaClient {
44        MonitoredHummockMetaClient { meta_client, stats }
45    }
46
47    pub fn get_inner(&self) -> &MetaClient {
48        &self.meta_client
49    }
50}
51
52#[async_trait]
53impl HummockMetaClient for MonitoredHummockMetaClient {
54    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
55        self.meta_client
56            .unpin_version_before(unpin_version_before)
57            .await
58    }
59
60    async fn get_current_version(&self) -> Result<HummockVersion> {
61        self.meta_client.get_current_version().await
62    }
63
64    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
65        self.stats.get_new_sst_ids_counts.inc();
66        let timer = self.stats.get_new_sst_ids_latency.start_timer();
67        let res = self.meta_client.get_new_object_ids(number).await;
68        timer.observe_duration();
69        res
70    }
71
72    async fn commit_epoch_with_change_log(
73        &self,
74        _epoch: HummockEpoch,
75        _sync_result: SyncResult,
76        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
77    ) -> Result<()> {
78        panic!("Only meta service can commit_epoch in production.")
79    }
80
81    async fn trigger_manual_compaction(
82        &self,
83        compaction_group_id: CompactionGroupId,
84        table_id: JobId,
85        level: u32,
86        target_level: Option<u32>,
87        sst_ids: Vec<HummockSstableId>,
88        exclusive: bool,
89    ) -> Result<bool> {
90        self.meta_client
91            .trigger_manual_compaction(
92                compaction_group_id,
93                table_id,
94                level,
95                target_level,
96                sst_ids,
97                exclusive,
98            )
99            .await
100    }
101
102    async fn trigger_full_gc(
103        &self,
104        sst_retention_time_sec: u64,
105        prefix: Option<String>,
106    ) -> Result<()> {
107        self.meta_client
108            .trigger_full_gc(sst_retention_time_sec, prefix)
109            .await
110    }
111
112    async fn subscribe_compaction_event(
113        &self,
114    ) -> Result<(
115        UnboundedSender<SubscribeCompactionEventRequest>,
116        BoxStream<'static, CompactionEventItem>,
117    )> {
118        self.meta_client.subscribe_compaction_event().await
119    }
120
121    async fn get_version_by_epoch(
122        &self,
123        epoch: HummockEpoch,
124        table_id: TableId,
125    ) -> Result<PbHummockVersion> {
126        self.meta_client.get_version_by_epoch(epoch, table_id).await
127    }
128
129    async fn subscribe_iceberg_compaction_event(
130        &self,
131    ) -> Result<(
132        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
133        BoxStream<'static, IcebergCompactionEventItem>,
134    )> {
135        self.meta_client.subscribe_iceberg_compaction_event().await
136    }
137
138    async fn get_table_change_logs(
139        &self,
140        epoch_only: bool,
141        start_epoch_inclusive: Option<u64>,
142        end_epoch_inclusive: Option<u64>,
143        table_ids: Option<HashSet<TableId>>,
144        exclude_empty: bool,
145        limit: Option<u32>,
146    ) -> Result<TableChangeLogs> {
147        self.meta_client
148            .get_table_change_logs(
149                epoch_only,
150                start_epoch_inclusive,
151                end_epoch_inclusive,
152                table_ids,
153                exclude_empty,
154                limit,
155            )
156            .await
157    }
158}