// risingwave_meta/hummock/mock_hummock_meta_client.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::BTreeSet;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU32, Ordering};
18use std::time::SystemTime;
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use fail::fail_point;
23use futures::stream::BoxStream;
24use futures::{Stream, StreamExt};
25use itertools::Itertools;
26use risingwave_common::catalog::TableId;
27use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
28use risingwave_hummock_sdk::compact_task::CompactTask;
29use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
30use risingwave_hummock_sdk::sstable_info::SstableInfo;
31use risingwave_hummock_sdk::version::HummockVersion;
32use risingwave_hummock_sdk::{
33    HummockContextId, HummockEpoch, HummockVersionId, LocalSstableInfo, ObjectIdRange, SyncResult,
34};
35use risingwave_pb::common::{HostAddress, WorkerType};
36use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
37use risingwave_pb::hummock::subscribe_compaction_event_request::{Event, ReportTask};
38use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
39use risingwave_pb::hummock::{
40    PbHummockVersion, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse,
41    compact_task,
42};
43use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
44use risingwave_rpc_client::error::{Result, RpcError};
45use risingwave_rpc_client::{
46    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
47    IcebergCompactionEventItem,
48};
49use thiserror_ext::AsReport;
50use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
51use tokio::task::JoinHandle;
52use tokio_stream::wrappers::UnboundedReceiverStream;
53
54use crate::hummock::compaction::selector::{
55    CompactionSelector, SpaceReclaimCompactionSelector, default_compaction_selector,
56};
57use crate::hummock::error::Error;
58use crate::hummock::{CommitEpochInfo, HummockManager, NewTableFragmentInfo};
59
60pub struct MockHummockMetaClient {
61    hummock_manager: Arc<HummockManager>,
62    context_id: HummockContextId,
63    compact_context_id: AtomicU32,
64    // used for hummock replay to avoid collision with existing sst files
65    sst_offset: u64,
66}
67
68impl MockHummockMetaClient {
69    pub fn new(
70        hummock_manager: Arc<HummockManager>,
71        context_id: HummockContextId,
72    ) -> MockHummockMetaClient {
73        MockHummockMetaClient {
74            hummock_manager,
75            context_id,
76            compact_context_id: AtomicU32::new(context_id),
77            sst_offset: 0,
78        }
79    }
80
81    pub fn with_sst_offset(
82        hummock_manager: Arc<HummockManager>,
83        context_id: HummockContextId,
84        sst_offset: u64,
85    ) -> Self {
86        Self {
87            hummock_manager,
88            context_id,
89            compact_context_id: AtomicU32::new(context_id),
90            sst_offset,
91        }
92    }
93
94    pub async fn get_compact_task(&self) -> Option<CompactTask> {
95        self.hummock_manager
96            .get_compact_task(
97                StaticCompactionGroupId::StateDefault.into(),
98                &mut default_compaction_selector(),
99            )
100            .await
101            .unwrap_or(None)
102    }
103
104    pub fn context_id(&self) -> HummockContextId {
105        self.context_id
106    }
107}
108
109fn mock_err(error: super::error::Error) -> RpcError {
110    anyhow!(error).context("mock error").into()
111}
112
113#[async_trait]
114impl HummockMetaClient for MockHummockMetaClient {
115    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
116        self.hummock_manager
117            .unpin_version_before(self.context_id, unpin_version_before)
118            .await
119            .map_err(mock_err)
120    }
121
122    async fn get_current_version(&self) -> Result<HummockVersion> {
123        Ok(self.hummock_manager.get_current_version().await)
124    }
125
126    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
127        fail_point!("get_new_sst_ids_err", |_| Err(anyhow!(
128            "failpoint get_new_sst_ids_err"
129        )
130        .into()));
131        self.hummock_manager
132            .get_new_object_ids(number)
133            .await
134            .map_err(mock_err)
135            .map(|range| ObjectIdRange {
136                start_id: range.start_id + self.sst_offset,
137                end_id: range.end_id + self.sst_offset,
138            })
139    }
140
141    async fn commit_epoch_with_change_log(
142        &self,
143        epoch: HummockEpoch,
144        sync_result: SyncResult,
145        change_log_info: Option<HummockMetaClientChangeLogInfo>,
146    ) -> Result<()> {
147        let version: HummockVersion = self.hummock_manager.get_current_version().await;
148        let table_ids = version
149            .state_table_info
150            .info()
151            .keys()
152            .map(|table_id| table_id.table_id)
153            .collect::<BTreeSet<_>>();
154
155        let commit_table_ids = sync_result
156            .uncommitted_ssts
157            .iter()
158            .flat_map(|sstable| sstable.sst_info.table_ids.clone())
159            .chain({
160                sync_result
161                    .old_value_ssts
162                    .iter()
163                    .flat_map(|sstable| sstable.sst_info.table_ids.clone())
164            })
165            .chain(
166                sync_result
167                    .table_watermarks
168                    .keys()
169                    .map(|table_id| table_id.table_id),
170            )
171            .chain(table_ids.iter().cloned())
172            .collect::<BTreeSet<_>>();
173
174        let new_table_fragment_infos = if commit_table_ids
175            .iter()
176            .all(|table_id| table_ids.contains(table_id))
177        {
178            vec![]
179        } else {
180            vec![NewTableFragmentInfo {
181                table_ids: commit_table_ids
182                    .iter()
183                    .cloned()
184                    .map(TableId::from)
185                    .collect(),
186            }]
187        };
188
189        let sst_to_context = sync_result
190            .uncommitted_ssts
191            .iter()
192            .map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, self.context_id))
193            .collect();
194        let new_table_watermark = sync_result.table_watermarks;
195        let table_change_log = match change_log_info {
196            Some(epochs) => {
197                assert_eq!(*epochs.last().expect("non-empty"), epoch);
198                build_table_change_log_delta(
199                    sync_result
200                        .old_value_ssts
201                        .into_iter()
202                        .map(|sst| sst.sst_info),
203                    sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info),
204                    &epochs,
205                    commit_table_ids.iter().map(|&table_id| (table_id, 0)),
206                )
207            }
208            None => Default::default(),
209        };
210
211        self.hummock_manager
212            .commit_epoch(CommitEpochInfo {
213                sstables: sync_result.uncommitted_ssts,
214                new_table_watermarks: new_table_watermark,
215                sst_to_context,
216                new_table_fragment_infos,
217                change_log_delta: table_change_log,
218                vector_index_delta: Default::default(),
219                tables_to_commit: commit_table_ids
220                    .iter()
221                    .cloned()
222                    .map(|table_id| (TableId::new(table_id), epoch))
223                    .collect(),
224            })
225            .await
226            .map_err(mock_err)?;
227        Ok(())
228    }
229
230    async fn trigger_manual_compaction(
231        &self,
232        _compaction_group_id: u64,
233        _table_id: u32,
234        _level: u32,
235        _sst_ids: Vec<u64>,
236    ) -> Result<()> {
237        todo!()
238    }
239
240    async fn trigger_full_gc(
241        &self,
242        _sst_retention_time_sec: u64,
243        _prefix: Option<String>,
244    ) -> Result<()> {
245        unimplemented!()
246    }
247
248    async fn subscribe_compaction_event(
249        &self,
250    ) -> Result<(
251        UnboundedSender<SubscribeCompactionEventRequest>,
252        BoxStream<'static, CompactionEventItem>,
253    )> {
254        let context_id = self
255            .hummock_manager
256            .metadata_manager()
257            .add_worker_node(
258                WorkerType::Compactor,
259                HostAddress {
260                    host: "compactor".to_owned(),
261                    port: 0,
262                },
263                Default::default(),
264                Default::default(),
265            )
266            .await
267            .unwrap();
268        let _compactor_rx = self
269            .hummock_manager
270            .compactor_manager
271            .clone()
272            .add_compactor(context_id as _);
273
274        let (request_sender, mut request_receiver) =
275            unbounded_channel::<SubscribeCompactionEventRequest>();
276
277        self.compact_context_id
278            .store(context_id as _, Ordering::Release);
279
280        let (task_tx, task_rx) = tokio::sync::mpsc::unbounded_channel();
281
282        let hummock_manager_compact = self.hummock_manager.clone();
283        let mut join_handle_vec = vec![];
284
285        let handle = tokio::spawn(async move {
286            loop {
287                let group_and_type = hummock_manager_compact
288                    .auto_pick_compaction_group_and_type()
289                    .await;
290
291                if group_and_type.is_none() {
292                    break;
293                }
294
295                let (group, task_type) = group_and_type.unwrap();
296
297                if let TaskType::Ttl = task_type {
298                    match hummock_manager_compact
299                        .metadata_manager_ref()
300                        .get_all_table_options()
301                        .await
302                        .map_err(|err| Error::MetaStore(err.into()))
303                    {
304                        Ok(table_options) => {
305                            hummock_manager_compact.update_table_id_to_table_option(table_options);
306                        }
307                        Err(e) => {
308                            tracing::error!(error = %e.as_report(), "get_all_table_options fail");
309                        }
310                    }
311                }
312
313                let mut selector: Box<dyn CompactionSelector> = match task_type {
314                    compact_task::TaskType::Dynamic => default_compaction_selector(),
315                    compact_task::TaskType::SpaceReclaim => {
316                        Box::<SpaceReclaimCompactionSelector>::default()
317                    }
318
319                    _ => panic!("Error type when mock_hummock_meta_client subscribe_compact_tasks"),
320                };
321                if let Some(task) = hummock_manager_compact
322                    .get_compact_task(group, &mut selector)
323                    .await
324                    .unwrap()
325                {
326                    let resp = SubscribeCompactionEventResponse {
327                        event: Some(ResponseEvent::CompactTask(task.into())),
328                        create_at: SystemTime::now()
329                            .duration_since(std::time::UNIX_EPOCH)
330                            .expect("Clock may have gone backwards")
331                            .as_millis() as u64,
332                    };
333
334                    let _ = task_tx.send(Ok(resp));
335                }
336            }
337        });
338
339        join_handle_vec.push(handle);
340
341        let hummock_manager_compact = self.hummock_manager.clone();
342        let report_handle = tokio::spawn(async move {
343            tracing::info!("report_handle start");
344            loop {
345                if let Some(item) = request_receiver.recv().await {
346                    if let Event::ReportTask(ReportTask {
347                        task_id,
348                        task_status,
349                        sorted_output_ssts,
350                        table_stats_change,
351                        object_timestamps,
352                    }) = item.event.unwrap()
353                    {
354                        if let Err(e) = hummock_manager_compact
355                            .report_compact_task(
356                                task_id,
357                                TaskStatus::try_from(task_status).unwrap(),
358                                sorted_output_ssts
359                                    .into_iter()
360                                    .map(SstableInfo::from)
361                                    .collect_vec(),
362                                Some(table_stats_change),
363                                object_timestamps
364                                    .into_iter()
365                                    .map(|(id, ts)| (id.into(), ts))
366                                    .collect(),
367                            )
368                            .await
369                        {
370                            tracing::error!(error = %e.as_report(), "report compact_tack fail");
371                        }
372                    }
373                }
374            }
375        });
376
377        join_handle_vec.push(report_handle);
378
379        Ok((
380            request_sender,
381            Box::pin(CompactionEventItemStream {
382                inner: UnboundedReceiverStream::new(task_rx),
383                _handle: join_handle_vec,
384            }),
385        ))
386    }
387
388    async fn get_version_by_epoch(
389        &self,
390        _epoch: HummockEpoch,
391        _table_id: u32,
392    ) -> Result<PbHummockVersion> {
393        unimplemented!()
394    }
395
396    async fn subscribe_iceberg_compaction_event(
397        &self,
398    ) -> Result<(
399        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
400        BoxStream<'static, IcebergCompactionEventItem>,
401    )> {
402        unimplemented!()
403    }
404}
405
406impl MockHummockMetaClient {
407    pub fn hummock_manager_ref(&self) -> Arc<HummockManager> {
408        self.hummock_manager.clone()
409    }
410}
411
412pub struct CompactionEventItemStream {
413    inner: UnboundedReceiverStream<CompactionEventItem>,
414    _handle: Vec<JoinHandle<()>>,
415}
416
417impl Drop for CompactionEventItemStream {
418    fn drop(&mut self) {
419        self.inner.close();
420    }
421}
422
423impl Stream for CompactionEventItemStream {
424    type Item = CompactionEventItem;
425
426    fn poll_next(
427        mut self: std::pin::Pin<&mut Self>,
428        cx: &mut std::task::Context<'_>,
429    ) -> std::task::Poll<Option<Self::Item>> {
430        self.inner.poll_next_unpin(cx)
431    }
432}