risingwave_meta/hummock/
mock_hummock_meta_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeSet;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU32, Ordering};
18use std::time::SystemTime;
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use fail::fail_point;
23use futures::stream::BoxStream;
24use futures::{Stream, StreamExt};
25use itertools::Itertools;
26use risingwave_common::catalog::TableId;
27use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
28use risingwave_hummock_sdk::compact_task::CompactTask;
29use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
30use risingwave_hummock_sdk::sstable_info::SstableInfo;
31use risingwave_hummock_sdk::version::HummockVersion;
32use risingwave_hummock_sdk::{
33    HummockContextId, HummockEpoch, HummockVersionId, LocalSstableInfo, SstObjectIdRange,
34    SyncResult,
35};
36use risingwave_pb::common::{HostAddress, WorkerType};
37use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
38use risingwave_pb::hummock::subscribe_compaction_event_request::{Event, ReportTask};
39use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
40use risingwave_pb::hummock::{
41    PbHummockVersion, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse,
42    compact_task,
43};
44use risingwave_rpc_client::error::{Result, RpcError};
45use risingwave_rpc_client::{
46    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
47};
48use thiserror_ext::AsReport;
49use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
50use tokio::task::JoinHandle;
51use tokio_stream::wrappers::UnboundedReceiverStream;
52
53use crate::hummock::compaction::selector::{
54    CompactionSelector, SpaceReclaimCompactionSelector, default_compaction_selector,
55};
56use crate::hummock::error::Error;
57use crate::hummock::{CommitEpochInfo, HummockManager, NewTableFragmentInfo};
58
59pub struct MockHummockMetaClient {
60    hummock_manager: Arc<HummockManager>,
61    context_id: HummockContextId,
62    compact_context_id: AtomicU32,
63    // used for hummock replay to avoid collision with existing sst files
64    sst_offset: u64,
65}
66
67impl MockHummockMetaClient {
68    pub fn new(
69        hummock_manager: Arc<HummockManager>,
70        context_id: HummockContextId,
71    ) -> MockHummockMetaClient {
72        MockHummockMetaClient {
73            hummock_manager,
74            context_id,
75            compact_context_id: AtomicU32::new(context_id),
76            sst_offset: 0,
77        }
78    }
79
80    pub fn with_sst_offset(
81        hummock_manager: Arc<HummockManager>,
82        context_id: HummockContextId,
83        sst_offset: u64,
84    ) -> Self {
85        Self {
86            hummock_manager,
87            context_id,
88            compact_context_id: AtomicU32::new(context_id),
89            sst_offset,
90        }
91    }
92
93    pub async fn get_compact_task(&self) -> Option<CompactTask> {
94        self.hummock_manager
95            .get_compact_task(
96                StaticCompactionGroupId::StateDefault.into(),
97                &mut default_compaction_selector(),
98            )
99            .await
100            .unwrap_or(None)
101    }
102
103    pub fn context_id(&self) -> HummockContextId {
104        self.context_id
105    }
106}
107
108fn mock_err(error: super::error::Error) -> RpcError {
109    anyhow!(error).context("mock error").into()
110}
111
112#[async_trait]
113impl HummockMetaClient for MockHummockMetaClient {
114    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
115        self.hummock_manager
116            .unpin_version_before(self.context_id, unpin_version_before)
117            .await
118            .map_err(mock_err)
119    }
120
121    async fn get_current_version(&self) -> Result<HummockVersion> {
122        Ok(self.hummock_manager.get_current_version().await)
123    }
124
125    async fn get_new_sst_ids(&self, number: u32) -> Result<SstObjectIdRange> {
126        fail_point!("get_new_sst_ids_err", |_| Err(anyhow!(
127            "failpoint get_new_sst_ids_err"
128        )
129        .into()));
130        self.hummock_manager
131            .get_new_sst_ids(number)
132            .await
133            .map_err(mock_err)
134            .map(|range| SstObjectIdRange {
135                start_id: range.start_id + self.sst_offset,
136                end_id: range.end_id + self.sst_offset,
137            })
138    }
139
140    async fn commit_epoch_with_change_log(
141        &self,
142        epoch: HummockEpoch,
143        sync_result: SyncResult,
144        change_log_info: Option<HummockMetaClientChangeLogInfo>,
145    ) -> Result<()> {
146        let version: HummockVersion = self.hummock_manager.get_current_version().await;
147        let table_ids = version
148            .state_table_info
149            .info()
150            .keys()
151            .map(|table_id| table_id.table_id)
152            .collect::<BTreeSet<_>>();
153
154        let commit_table_ids = sync_result
155            .uncommitted_ssts
156            .iter()
157            .flat_map(|sstable| sstable.sst_info.table_ids.clone())
158            .chain({
159                sync_result
160                    .old_value_ssts
161                    .iter()
162                    .flat_map(|sstable| sstable.sst_info.table_ids.clone())
163            })
164            .chain(
165                sync_result
166                    .table_watermarks
167                    .keys()
168                    .map(|table_id| table_id.table_id),
169            )
170            .chain(table_ids.iter().cloned())
171            .collect::<BTreeSet<_>>();
172
173        let new_table_fragment_infos = if commit_table_ids
174            .iter()
175            .all(|table_id| table_ids.contains(table_id))
176        {
177            vec![]
178        } else {
179            vec![NewTableFragmentInfo {
180                table_ids: commit_table_ids
181                    .iter()
182                    .cloned()
183                    .map(TableId::from)
184                    .collect(),
185            }]
186        };
187
188        let sst_to_context = sync_result
189            .uncommitted_ssts
190            .iter()
191            .map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, self.context_id))
192            .collect();
193        let new_table_watermark = sync_result.table_watermarks;
194        let table_change_log = match change_log_info {
195            Some(epochs) => {
196                assert_eq!(*epochs.last().expect("non-empty"), epoch);
197                build_table_change_log_delta(
198                    sync_result
199                        .old_value_ssts
200                        .into_iter()
201                        .map(|sst| sst.sst_info),
202                    sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info),
203                    &epochs,
204                    commit_table_ids.iter().map(|&table_id| (table_id, 0)),
205                )
206            }
207            None => Default::default(),
208        };
209
210        self.hummock_manager
211            .commit_epoch(CommitEpochInfo {
212                sstables: sync_result.uncommitted_ssts,
213                new_table_watermarks: new_table_watermark,
214                sst_to_context,
215                new_table_fragment_infos,
216                change_log_delta: table_change_log,
217                tables_to_commit: commit_table_ids
218                    .iter()
219                    .cloned()
220                    .map(|table_id| (TableId::new(table_id), epoch))
221                    .collect(),
222            })
223            .await
224            .map_err(mock_err)?;
225        Ok(())
226    }
227
228    async fn trigger_manual_compaction(
229        &self,
230        _compaction_group_id: u64,
231        _table_id: u32,
232        _level: u32,
233        _sst_ids: Vec<u64>,
234    ) -> Result<()> {
235        todo!()
236    }
237
238    async fn trigger_full_gc(
239        &self,
240        _sst_retention_time_sec: u64,
241        _prefix: Option<String>,
242    ) -> Result<()> {
243        unimplemented!()
244    }
245
246    async fn subscribe_compaction_event(
247        &self,
248    ) -> Result<(
249        UnboundedSender<SubscribeCompactionEventRequest>,
250        BoxStream<'static, CompactionEventItem>,
251    )> {
252        let context_id = self
253            .hummock_manager
254            .metadata_manager()
255            .add_worker_node(
256                WorkerType::Compactor,
257                HostAddress {
258                    host: "compactor".to_owned(),
259                    port: 0,
260                },
261                Default::default(),
262                Default::default(),
263            )
264            .await
265            .unwrap();
266        let _compactor_rx = self
267            .hummock_manager
268            .compactor_manager_ref_for_test()
269            .add_compactor(context_id as _);
270
271        let (request_sender, mut request_receiver) =
272            unbounded_channel::<SubscribeCompactionEventRequest>();
273
274        self.compact_context_id
275            .store(context_id as _, Ordering::Release);
276
277        let (task_tx, task_rx) = tokio::sync::mpsc::unbounded_channel();
278
279        let hummock_manager_compact = self.hummock_manager.clone();
280        let mut join_handle_vec = vec![];
281
282        let handle = tokio::spawn(async move {
283            loop {
284                let group_and_type = hummock_manager_compact
285                    .auto_pick_compaction_group_and_type()
286                    .await;
287
288                if group_and_type.is_none() {
289                    break;
290                }
291
292                let (group, task_type) = group_and_type.unwrap();
293
294                if let TaskType::Ttl = task_type {
295                    match hummock_manager_compact
296                        .metadata_manager_ref()
297                        .get_all_table_options()
298                        .await
299                        .map_err(|err| Error::MetaStore(err.into()))
300                    {
301                        Ok(table_options) => {
302                            hummock_manager_compact.update_table_id_to_table_option(table_options);
303                        }
304                        Err(e) => {
305                            tracing::error!(error = %e.as_report(), "get_all_table_options fail");
306                        }
307                    }
308                }
309
310                let mut selector: Box<dyn CompactionSelector> = match task_type {
311                    compact_task::TaskType::Dynamic => default_compaction_selector(),
312                    compact_task::TaskType::SpaceReclaim => {
313                        Box::<SpaceReclaimCompactionSelector>::default()
314                    }
315
316                    _ => panic!("Error type when mock_hummock_meta_client subscribe_compact_tasks"),
317                };
318                if let Some(task) = hummock_manager_compact
319                    .get_compact_task(group, &mut selector)
320                    .await
321                    .unwrap()
322                {
323                    let resp = SubscribeCompactionEventResponse {
324                        event: Some(ResponseEvent::CompactTask(task.into())),
325                        create_at: SystemTime::now()
326                            .duration_since(std::time::UNIX_EPOCH)
327                            .expect("Clock may have gone backwards")
328                            .as_millis() as u64,
329                    };
330
331                    let _ = task_tx.send(Ok(resp));
332                }
333            }
334        });
335
336        join_handle_vec.push(handle);
337
338        let hummock_manager_compact = self.hummock_manager.clone();
339        let report_handle = tokio::spawn(async move {
340            tracing::info!("report_handle start");
341            loop {
342                if let Some(item) = request_receiver.recv().await {
343                    if let Event::ReportTask(ReportTask {
344                        task_id,
345                        task_status,
346                        sorted_output_ssts,
347                        table_stats_change,
348                        object_timestamps,
349                    }) = item.event.unwrap()
350                    {
351                        if let Err(e) = hummock_manager_compact
352                            .report_compact_task(
353                                task_id,
354                                TaskStatus::try_from(task_status).unwrap(),
355                                sorted_output_ssts
356                                    .into_iter()
357                                    .map(SstableInfo::from)
358                                    .collect_vec(),
359                                Some(table_stats_change),
360                                object_timestamps,
361                            )
362                            .await
363                        {
364                            tracing::error!(error = %e.as_report(), "report compact_tack fail");
365                        }
366                    }
367                }
368            }
369        });
370
371        join_handle_vec.push(report_handle);
372
373        Ok((
374            request_sender,
375            Box::pin(CompactionEventItemStream {
376                inner: UnboundedReceiverStream::new(task_rx),
377                _handle: join_handle_vec,
378            }),
379        ))
380    }
381
382    async fn get_version_by_epoch(
383        &self,
384        _epoch: HummockEpoch,
385        _table_id: u32,
386    ) -> Result<PbHummockVersion> {
387        unimplemented!()
388    }
389}
390
391impl MockHummockMetaClient {
392    pub fn hummock_manager_ref(&self) -> Arc<HummockManager> {
393        self.hummock_manager.clone()
394    }
395}
396
397pub struct CompactionEventItemStream {
398    inner: UnboundedReceiverStream<CompactionEventItem>,
399    _handle: Vec<JoinHandle<()>>,
400}
401
402impl Drop for CompactionEventItemStream {
403    fn drop(&mut self) {
404        self.inner.close();
405    }
406}
407
408impl Stream for CompactionEventItemStream {
409    type Item = CompactionEventItem;
410
411    fn poll_next(
412        mut self: std::pin::Pin<&mut Self>,
413        cx: &mut std::task::Context<'_>,
414    ) -> std::task::Poll<Option<Self::Item>> {
415        self.inner.poll_next_unpin(cx)
416    }
417}