risingwave_storage/hummock/
hummock_meta_client.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use futures::stream::BoxStream;
20use risingwave_hummock_sdk::change_log::TableChangeLogs;
21use risingwave_hummock_sdk::version::HummockVersion;
22use risingwave_hummock_sdk::{CompactionGroupId, ObjectIdRange, SyncResult};
23use risingwave_pb::hummock::{PbHummockVersion, SubscribeCompactionEventRequest};
24use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
25use risingwave_pb::id::{HummockSstableId, JobId, TableId};
26use risingwave_rpc_client::error::Result;
27use risingwave_rpc_client::{
28    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
29    IcebergCompactionEventItem, MetaClient,
30};
31use tokio::sync::mpsc::UnboundedSender;
32
33use crate::hummock::{HummockEpoch, HummockVersionId};
34use crate::monitor::HummockMetrics;
35
36pub struct MonitoredHummockMetaClient {
37    meta_client: MetaClient,
38
39    stats: Arc<HummockMetrics>,
40}
41
42impl MonitoredHummockMetaClient {
43    pub fn new(meta_client: MetaClient, stats: Arc<HummockMetrics>) -> MonitoredHummockMetaClient {
44        MonitoredHummockMetaClient { meta_client, stats }
45    }
46
47    pub fn get_inner(&self) -> &MetaClient {
48        &self.meta_client
49    }
50}
51
52#[async_trait]
53impl HummockMetaClient for MonitoredHummockMetaClient {
54    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
55        self.meta_client
56            .unpin_version_before(unpin_version_before)
57            .await
58    }
59
60    async fn get_current_version(&self) -> Result<HummockVersion> {
61        self.meta_client.get_current_version().await
62    }
63
64    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
65        self.stats.get_new_sst_ids_counts.inc();
66        let timer = self.stats.get_new_sst_ids_latency.start_timer();
67        let res = self.meta_client.get_new_object_ids(number).await;
68        timer.observe_duration();
69        res
70    }
71
72    async fn commit_epoch_with_change_log(
73        &self,
74        _epoch: HummockEpoch,
75        _sync_result: SyncResult,
76        _change_log_info: Option<HummockMetaClientChangeLogInfo>,
77    ) -> Result<()> {
78        panic!("Only meta service can commit_epoch in production.")
79    }
80
81    async fn trigger_manual_compaction(
82        &self,
83        compaction_group_id: CompactionGroupId,
84        table_id: JobId,
85        level: u32,
86        sst_ids: Vec<HummockSstableId>,
87        exclusive: bool,
88    ) -> Result<bool> {
89        self.meta_client
90            .trigger_manual_compaction(compaction_group_id, table_id, level, sst_ids, exclusive)
91            .await
92    }
93
94    async fn trigger_full_gc(
95        &self,
96        sst_retention_time_sec: u64,
97        prefix: Option<String>,
98    ) -> Result<()> {
99        self.meta_client
100            .trigger_full_gc(sst_retention_time_sec, prefix)
101            .await
102    }
103
104    async fn subscribe_compaction_event(
105        &self,
106    ) -> Result<(
107        UnboundedSender<SubscribeCompactionEventRequest>,
108        BoxStream<'static, CompactionEventItem>,
109    )> {
110        self.meta_client.subscribe_compaction_event().await
111    }
112
113    async fn get_version_by_epoch(
114        &self,
115        epoch: HummockEpoch,
116        table_id: TableId,
117    ) -> Result<PbHummockVersion> {
118        self.meta_client.get_version_by_epoch(epoch, table_id).await
119    }
120
121    async fn subscribe_iceberg_compaction_event(
122        &self,
123    ) -> Result<(
124        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
125        BoxStream<'static, IcebergCompactionEventItem>,
126    )> {
127        self.meta_client.subscribe_iceberg_compaction_event().await
128    }
129
130    async fn get_table_change_logs(
131        &self,
132        epoch_only: bool,
133        start_epoch_inclusive: Option<u64>,
134        end_epoch_inclusive: Option<u64>,
135        table_ids: Option<HashSet<TableId>>,
136        exclude_empty: bool,
137        limit: Option<u32>,
138    ) -> Result<TableChangeLogs> {
139        self.meta_client
140            .get_table_change_logs(
141                epoch_only,
142                start_epoch_inclusive,
143                end_epoch_inclusive,
144                table_ids,
145                exclude_empty,
146                limit,
147            )
148            .await
149    }
150}