risingwave_meta/hummock/
mock_hummock_meta_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashSet};
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU32, Ordering};
18use std::time::SystemTime;
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use fail::fail_point;
23use futures::stream::BoxStream;
24use futures::{Stream, StreamExt};
25use itertools::Itertools;
26use risingwave_common::catalog::TableId;
27use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
28use risingwave_hummock_sdk::compact_task::CompactTask;
29use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
30use risingwave_hummock_sdk::sstable_info::SstableInfo;
31use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
32use risingwave_hummock_sdk::version::HummockVersion;
33use risingwave_hummock_sdk::{
34    HummockContextId, HummockEpoch, HummockVersionId, LocalSstableInfo, ObjectIdRange, SyncResult,
35};
36use risingwave_pb::common::{HostAddress, WorkerType};
37use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
38use risingwave_pb::hummock::subscribe_compaction_event_request::{Event, ReportTask};
39use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
40use risingwave_pb::hummock::{
41    PbHummockVersion, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse,
42    compact_task,
43};
44use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
45use risingwave_rpc_client::error::{Result, RpcError};
46use risingwave_rpc_client::{
47    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
48    IcebergCompactionEventItem,
49};
50use thiserror_ext::AsReport;
51use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
52use tokio::task::JoinHandle;
53use tokio_stream::wrappers::UnboundedReceiverStream;
54
55use crate::hummock::compaction::selector::{
56    CompactionSelector, SpaceReclaimCompactionSelector, default_compaction_selector,
57};
58use crate::hummock::error::Error;
59use crate::hummock::{CommitEpochInfo, HummockManager, NewTableFragmentInfo};
60
61pub struct MockHummockMetaClient {
62    hummock_manager: Arc<HummockManager>,
63    context_id: HummockContextId,
64    compact_context_id: AtomicU32,
65    // used for hummock replay to avoid collision with existing sst files
66    sst_offset: u64,
67}
68
69impl MockHummockMetaClient {
70    pub fn new(
71        hummock_manager: Arc<HummockManager>,
72        context_id: HummockContextId,
73    ) -> MockHummockMetaClient {
74        MockHummockMetaClient {
75            hummock_manager,
76            context_id,
77            compact_context_id: AtomicU32::new(context_id),
78            sst_offset: 0,
79        }
80    }
81
82    pub fn with_sst_offset(
83        hummock_manager: Arc<HummockManager>,
84        context_id: HummockContextId,
85        sst_offset: u64,
86    ) -> Self {
87        Self {
88            hummock_manager,
89            context_id,
90            compact_context_id: AtomicU32::new(context_id),
91            sst_offset,
92        }
93    }
94
95    pub async fn get_compact_task(&self) -> Option<CompactTask> {
96        self.hummock_manager
97            .get_compact_task(
98                StaticCompactionGroupId::StateDefault.into(),
99                &mut default_compaction_selector(),
100            )
101            .await
102            .unwrap_or(None)
103    }
104
105    pub fn context_id(&self) -> HummockContextId {
106        self.context_id
107    }
108}
109
110fn mock_err(error: super::error::Error) -> RpcError {
111    anyhow!(error).context("mock error").into()
112}
113
114#[async_trait]
115impl HummockMetaClient for MockHummockMetaClient {
116    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
117        self.hummock_manager
118            .unpin_version_before(self.context_id, unpin_version_before)
119            .await
120            .map_err(mock_err)
121    }
122
123    async fn get_current_version(&self) -> Result<HummockVersion> {
124        Ok(self.hummock_manager.get_current_version().await)
125    }
126
127    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
128        fail_point!("get_new_sst_ids_err", |_| Err(anyhow!(
129            "failpoint get_new_sst_ids_err"
130        )
131        .into()));
132        self.hummock_manager
133            .get_new_object_ids(number)
134            .await
135            .map_err(mock_err)
136            .map(|range| ObjectIdRange {
137                start_id: range.start_id + self.sst_offset,
138                end_id: range.end_id + self.sst_offset,
139            })
140    }
141
142    async fn commit_epoch_with_change_log(
143        &self,
144        epoch: HummockEpoch,
145        sync_result: SyncResult,
146        change_log_info: Option<HummockMetaClientChangeLogInfo>,
147    ) -> Result<()> {
148        let version: HummockVersion = self.hummock_manager.get_current_version().await;
149        let table_ids = version
150            .state_table_info
151            .info()
152            .keys()
153            .map(|table_id| table_id.table_id)
154            .collect::<BTreeSet<_>>();
155
156        let commit_table_ids = sync_result
157            .uncommitted_ssts
158            .iter()
159            .flat_map(|sstable| sstable.sst_info.table_ids.clone())
160            .chain({
161                sync_result
162                    .old_value_ssts
163                    .iter()
164                    .flat_map(|sstable| sstable.sst_info.table_ids.clone())
165            })
166            .chain(
167                sync_result
168                    .table_watermarks
169                    .keys()
170                    .map(|table_id| table_id.table_id),
171            )
172            .chain(table_ids.iter().cloned())
173            .collect::<BTreeSet<_>>();
174
175        let new_table_fragment_infos = if commit_table_ids
176            .iter()
177            .all(|table_id| table_ids.contains(table_id))
178        {
179            vec![]
180        } else {
181            vec![NewTableFragmentInfo {
182                table_ids: commit_table_ids
183                    .iter()
184                    .cloned()
185                    .map(TableId::from)
186                    .collect(),
187            }]
188        };
189
190        let sst_to_context = sync_result
191            .uncommitted_ssts
192            .iter()
193            .map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, self.context_id))
194            .collect();
195        let new_table_watermark = sync_result.table_watermarks;
196        let table_change_log = match change_log_info {
197            Some(epochs) => {
198                assert_eq!(*epochs.last().expect("non-empty"), epoch);
199                build_table_change_log_delta(
200                    sync_result
201                        .old_value_ssts
202                        .into_iter()
203                        .map(|sst| sst.sst_info),
204                    sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info),
205                    &epochs,
206                    commit_table_ids.iter().map(|&table_id| (table_id, 0)),
207                )
208            }
209            None => Default::default(),
210        };
211
212        self.hummock_manager
213            .commit_epoch(CommitEpochInfo {
214                sstables: sync_result.uncommitted_ssts,
215                new_table_watermarks: new_table_watermark,
216                sst_to_context,
217                new_table_fragment_infos,
218                change_log_delta: table_change_log,
219                vector_index_delta: sync_result
220                    .vector_index_adds
221                    .into_iter()
222                    .map(|(table_id, adds)| (table_id, VectorIndexDelta::Adds(adds)))
223                    .collect(),
224                tables_to_commit: commit_table_ids
225                    .iter()
226                    .cloned()
227                    .map(|table_id| (TableId::new(table_id), epoch))
228                    .collect(),
229                truncate_tables: HashSet::new(),
230            })
231            .await
232            .map_err(mock_err)?;
233        Ok(())
234    }
235
236    async fn trigger_manual_compaction(
237        &self,
238        _compaction_group_id: u64,
239        _table_id: u32,
240        _level: u32,
241        _sst_ids: Vec<u64>,
242    ) -> Result<()> {
243        todo!()
244    }
245
246    async fn trigger_full_gc(
247        &self,
248        _sst_retention_time_sec: u64,
249        _prefix: Option<String>,
250    ) -> Result<()> {
251        unimplemented!()
252    }
253
254    async fn subscribe_compaction_event(
255        &self,
256    ) -> Result<(
257        UnboundedSender<SubscribeCompactionEventRequest>,
258        BoxStream<'static, CompactionEventItem>,
259    )> {
260        let context_id = self
261            .hummock_manager
262            .metadata_manager()
263            .add_worker_node(
264                WorkerType::Compactor,
265                HostAddress {
266                    host: "compactor".to_owned(),
267                    port: 0,
268                },
269                Default::default(),
270                Default::default(),
271            )
272            .await
273            .unwrap();
274        let _compactor_rx = self
275            .hummock_manager
276            .compactor_manager
277            .clone()
278            .add_compactor(context_id as _);
279
280        let (request_sender, mut request_receiver) =
281            unbounded_channel::<SubscribeCompactionEventRequest>();
282
283        self.compact_context_id
284            .store(context_id as _, Ordering::Release);
285
286        let (task_tx, task_rx) = tokio::sync::mpsc::unbounded_channel();
287
288        let hummock_manager_compact = self.hummock_manager.clone();
289        let mut join_handle_vec = vec![];
290
291        let handle = tokio::spawn(async move {
292            loop {
293                let group_and_type = hummock_manager_compact
294                    .auto_pick_compaction_group_and_type()
295                    .await;
296
297                if group_and_type.is_none() {
298                    break;
299                }
300
301                let (group, task_type) = group_and_type.unwrap();
302
303                if let TaskType::Ttl = task_type {
304                    match hummock_manager_compact
305                        .metadata_manager_ref()
306                        .get_all_table_options()
307                        .await
308                        .map_err(|err| Error::MetaStore(err.into()))
309                    {
310                        Ok(table_options) => {
311                            hummock_manager_compact.update_table_id_to_table_option(table_options);
312                        }
313                        Err(e) => {
314                            tracing::error!(error = %e.as_report(), "get_all_table_options fail");
315                        }
316                    }
317                }
318
319                let mut selector: Box<dyn CompactionSelector> = match task_type {
320                    compact_task::TaskType::Dynamic => default_compaction_selector(),
321                    compact_task::TaskType::SpaceReclaim => {
322                        Box::<SpaceReclaimCompactionSelector>::default()
323                    }
324
325                    _ => panic!("Error type when mock_hummock_meta_client subscribe_compact_tasks"),
326                };
327                if let Some(task) = hummock_manager_compact
328                    .get_compact_task(group, &mut selector)
329                    .await
330                    .unwrap()
331                {
332                    let resp = SubscribeCompactionEventResponse {
333                        event: Some(ResponseEvent::CompactTask(task.into())),
334                        create_at: SystemTime::now()
335                            .duration_since(std::time::UNIX_EPOCH)
336                            .expect("Clock may have gone backwards")
337                            .as_millis() as u64,
338                    };
339
340                    let _ = task_tx.send(Ok(resp));
341                }
342            }
343        });
344
345        join_handle_vec.push(handle);
346
347        let hummock_manager_compact = self.hummock_manager.clone();
348        let report_handle = tokio::spawn(async move {
349            tracing::info!("report_handle start");
350            loop {
351                if let Some(item) = request_receiver.recv().await
352                    && let Event::ReportTask(ReportTask {
353                        task_id,
354                        task_status,
355                        sorted_output_ssts,
356                        table_stats_change,
357                        object_timestamps,
358                    }) = item.event.unwrap()
359                    && let Err(e) = hummock_manager_compact
360                        .report_compact_task(
361                            task_id,
362                            TaskStatus::try_from(task_status).unwrap(),
363                            sorted_output_ssts
364                                .into_iter()
365                                .map(SstableInfo::from)
366                                .collect_vec(),
367                            Some(table_stats_change),
368                            object_timestamps
369                                .into_iter()
370                                .map(|(id, ts)| (id.into(), ts))
371                                .collect(),
372                        )
373                        .await
374                {
375                    tracing::error!(error = %e.as_report(), "report compact_tack fail");
376                }
377            }
378        });
379
380        join_handle_vec.push(report_handle);
381
382        Ok((
383            request_sender,
384            Box::pin(CompactionEventItemStream {
385                inner: UnboundedReceiverStream::new(task_rx),
386                _handle: join_handle_vec,
387            }),
388        ))
389    }
390
391    async fn get_version_by_epoch(
392        &self,
393        _epoch: HummockEpoch,
394        _table_id: u32,
395    ) -> Result<PbHummockVersion> {
396        unimplemented!()
397    }
398
399    async fn subscribe_iceberg_compaction_event(
400        &self,
401    ) -> Result<(
402        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
403        BoxStream<'static, IcebergCompactionEventItem>,
404    )> {
405        unimplemented!()
406    }
407}
408
409impl MockHummockMetaClient {
410    pub fn hummock_manager_ref(&self) -> Arc<HummockManager> {
411        self.hummock_manager.clone()
412    }
413}
414
415pub struct CompactionEventItemStream {
416    inner: UnboundedReceiverStream<CompactionEventItem>,
417    _handle: Vec<JoinHandle<()>>,
418}
419
420impl Drop for CompactionEventItemStream {
421    fn drop(&mut self) {
422        self.inner.close();
423    }
424}
425
426impl Stream for CompactionEventItemStream {
427    type Item = CompactionEventItem;
428
429    fn poll_next(
430        mut self: std::pin::Pin<&mut Self>,
431        cx: &mut std::task::Context<'_>,
432    ) -> std::task::Poll<Option<Self::Item>> {
433        self.inner.poll_next_unpin(cx)
434    }
435}