risingwave_meta/hummock/
mock_hummock_meta_client.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{BTreeSet, HashSet};
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU32, Ordering};
18use std::time::SystemTime;
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use fail::fail_point;
23use futures::stream::BoxStream;
24use futures::{Stream, StreamExt};
25use itertools::Itertools;
26use risingwave_hummock_sdk::change_log::{TableChangeLogs, build_table_change_log_delta};
27use risingwave_hummock_sdk::compact_task::CompactTask;
28use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
29use risingwave_hummock_sdk::sstable_info::SstableInfo;
30use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
31use risingwave_hummock_sdk::version::HummockVersion;
32use risingwave_hummock_sdk::{
33    CompactionGroupId, HummockContextId, HummockEpoch, HummockVersionId, LocalSstableInfo,
34    ObjectIdRange, SyncResult,
35};
36use risingwave_pb::common::{HostAddress, WorkerType};
37use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
38use risingwave_pb::hummock::subscribe_compaction_event_request::{Event, ReportTask};
39use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
40use risingwave_pb::hummock::{
41    PbHummockVersion, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse,
42    compact_task,
43};
44use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
45use risingwave_pb::id::{HummockSstableId, JobId, TableId};
46use risingwave_rpc_client::error::{Result, RpcError};
47use risingwave_rpc_client::{
48    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
49    IcebergCompactionEventItem,
50};
51use thiserror_ext::AsReport;
52use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
53use tokio::task::JoinHandle;
54use tokio_stream::wrappers::UnboundedReceiverStream;
55
56use crate::hummock::compaction::selector::{
57    CompactionSelector, SpaceReclaimCompactionSelector, default_compaction_selector,
58};
59use crate::hummock::error::Error;
60use crate::hummock::{CommitEpochInfo, HummockManager, NewTableFragmentInfo};
61
62pub struct MockHummockMetaClient {
63    hummock_manager: Arc<HummockManager>,
64    context_id: HummockContextId,
65    compact_context_id: AtomicU32,
66    // used for hummock replay to avoid collision with existing sst files
67    sst_offset: u64,
68}
69
70impl MockHummockMetaClient {
71    pub fn new(
72        hummock_manager: Arc<HummockManager>,
73        context_id: HummockContextId,
74    ) -> MockHummockMetaClient {
75        MockHummockMetaClient {
76            hummock_manager,
77            context_id,
78            compact_context_id: AtomicU32::new(context_id.as_raw_id()),
79            sst_offset: 0,
80        }
81    }
82
83    pub fn with_sst_offset(
84        hummock_manager: Arc<HummockManager>,
85        context_id: HummockContextId,
86        sst_offset: u64,
87    ) -> Self {
88        Self {
89            hummock_manager,
90            context_id,
91            compact_context_id: AtomicU32::new(context_id.as_raw_id()),
92            sst_offset,
93        }
94    }
95
96    pub async fn get_compact_task(&self) -> Option<CompactTask> {
97        self.hummock_manager
98            .get_compact_task(
99                StaticCompactionGroupId::StateDefault,
100                &mut *default_compaction_selector(),
101            )
102            .await
103            .unwrap_or(None)
104    }
105
106    pub fn context_id(&self) -> HummockContextId {
107        self.context_id
108    }
109}
110
111fn mock_err(error: super::error::Error) -> RpcError {
112    anyhow!(error).context("mock error").into()
113}
114
115#[async_trait]
116impl HummockMetaClient for MockHummockMetaClient {
117    async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
118        self.hummock_manager
119            .unpin_version_before(self.context_id, unpin_version_before)
120            .await
121            .map_err(mock_err)
122    }
123
124    async fn get_current_version(&self) -> Result<HummockVersion> {
125        Ok(self.hummock_manager.get_current_version().await)
126    }
127
128    async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
129        fail_point!("get_new_sst_ids_err", |_| Err(anyhow!(
130            "failpoint get_new_sst_ids_err"
131        )
132        .into()));
133        self.hummock_manager
134            .get_new_object_ids(number)
135            .await
136            .map_err(mock_err)
137            .map(|range| ObjectIdRange {
138                start_id: range.start_id + self.sst_offset,
139                end_id: range.end_id + self.sst_offset,
140            })
141    }
142
143    async fn commit_epoch_with_change_log(
144        &self,
145        epoch: HummockEpoch,
146        sync_result: SyncResult,
147        change_log_info: Option<HummockMetaClientChangeLogInfo>,
148    ) -> Result<()> {
149        let version: HummockVersion = self.hummock_manager.get_current_version().await;
150        let table_ids = version
151            .state_table_info
152            .info()
153            .keys()
154            .copied()
155            .collect::<BTreeSet<_>>();
156
157        let commit_table_ids = sync_result
158            .uncommitted_ssts
159            .iter()
160            .flat_map(|sstable| sstable.sst_info.table_ids.clone())
161            .chain({
162                sync_result
163                    .old_value_ssts
164                    .iter()
165                    .flat_map(|sstable| sstable.sst_info.table_ids.clone())
166            })
167            .chain(sync_result.table_watermarks.keys().copied())
168            .chain(table_ids.iter().cloned())
169            .collect::<BTreeSet<_>>();
170
171        let new_table_fragment_infos = if commit_table_ids
172            .iter()
173            .all(|table_id| table_ids.contains(table_id))
174        {
175            vec![]
176        } else {
177            vec![NewTableFragmentInfo {
178                table_ids: commit_table_ids.iter().cloned().collect(),
179            }]
180        };
181
182        let sst_to_context = sync_result
183            .uncommitted_ssts
184            .iter()
185            .map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, self.context_id))
186            .collect();
187        let new_table_watermark = sync_result.table_watermarks;
188        let table_change_log = match change_log_info {
189            Some(epochs) => {
190                assert_eq!(*epochs.last().expect("non-empty"), epoch);
191                build_table_change_log_delta(
192                    sync_result
193                        .old_value_ssts
194                        .into_iter()
195                        .map(|sst| sst.sst_info),
196                    sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info),
197                    &epochs,
198                    commit_table_ids.iter().map(|&table_id| (table_id, 0)),
199                )
200            }
201            None => Default::default(),
202        };
203
204        self.hummock_manager
205            .commit_epoch(CommitEpochInfo {
206                sstables: sync_result.uncommitted_ssts,
207                new_table_watermarks: new_table_watermark,
208                sst_to_context,
209                new_table_fragment_infos,
210                change_log_delta: table_change_log,
211                vector_index_delta: sync_result
212                    .vector_index_adds
213                    .into_iter()
214                    .map(|(table_id, adds)| (table_id, VectorIndexDelta::Adds(adds)))
215                    .collect(),
216                tables_to_commit: commit_table_ids
217                    .iter()
218                    .cloned()
219                    .map(|table_id| (table_id, epoch))
220                    .collect(),
221                truncate_tables: HashSet::new(),
222            })
223            .await
224            .map_err(mock_err)?;
225        Ok(())
226    }
227
228    async fn trigger_manual_compaction(
229        &self,
230        _compaction_group_id: CompactionGroupId,
231        _table_id: JobId,
232        _level: u32,
233        _target_level: Option<u32>,
234        _sst_ids: Vec<HummockSstableId>,
235        _exclusive: bool,
236    ) -> Result<bool> {
237        todo!()
238    }
239
240    async fn trigger_full_gc(
241        &self,
242        _sst_retention_time_sec: u64,
243        _prefix: Option<String>,
244    ) -> Result<()> {
245        unimplemented!()
246    }
247
248    async fn subscribe_compaction_event(
249        &self,
250    ) -> Result<(
251        UnboundedSender<SubscribeCompactionEventRequest>,
252        BoxStream<'static, CompactionEventItem>,
253    )> {
254        let context_id = self
255            .hummock_manager
256            .metadata_manager()
257            .add_worker_node(
258                WorkerType::Compactor,
259                HostAddress {
260                    host: "compactor".to_owned(),
261                    port: 0,
262                },
263                Default::default(),
264                Default::default(),
265            )
266            .await
267            .unwrap();
268        let _compactor_rx = self
269            .hummock_manager
270            .compactor_manager
271            .clone()
272            .add_compactor(context_id);
273
274        let (request_sender, mut request_receiver) =
275            unbounded_channel::<SubscribeCompactionEventRequest>();
276
277        self.compact_context_id
278            .store(context_id.as_raw_id(), Ordering::Release);
279
280        let (task_tx, task_rx) = tokio::sync::mpsc::unbounded_channel();
281
282        let hummock_manager_compact = self.hummock_manager.clone();
283        let mut join_handle_vec = vec![];
284
285        let handle = tokio::spawn(async move {
286            loop {
287                let snapshot = hummock_manager_compact.compaction_state.snapshot();
288                let Some((groups, task_type)) = snapshot.pick_compaction_groups_and_type() else {
289                    break;
290                };
291
292                let group = groups[0];
293
294                if let TaskType::Ttl = task_type {
295                    match hummock_manager_compact
296                        .metadata_manager_ref()
297                        .get_all_table_options()
298                        .await
299                        .map_err(|err| Error::MetaStore(err.into()))
300                    {
301                        Ok(table_options) => {
302                            hummock_manager_compact.update_table_id_to_table_option(table_options);
303                        }
304                        Err(e) => {
305                            tracing::error!(error = %e.as_report(), "get_all_table_options fail");
306                        }
307                    }
308                }
309
310                let mut selector: Box<dyn CompactionSelector> = match task_type {
311                    compact_task::TaskType::Dynamic => default_compaction_selector(),
312                    compact_task::TaskType::SpaceReclaim => {
313                        Box::<SpaceReclaimCompactionSelector>::default()
314                    }
315
316                    _ => panic!("Error type when mock_hummock_meta_client subscribe_compact_tasks"),
317                };
318                if let Some(task) = hummock_manager_compact
319                    .get_compact_task(group, &mut *selector)
320                    .await
321                    .unwrap()
322                {
323                    let resp = SubscribeCompactionEventResponse {
324                        event: Some(ResponseEvent::CompactTask(task.into())),
325                        create_at: SystemTime::now()
326                            .duration_since(std::time::UNIX_EPOCH)
327                            .expect("Clock may have gone backwards")
328                            .as_millis() as u64,
329                    };
330
331                    let _ = task_tx.send(Ok(resp));
332                }
333            }
334        });
335
336        join_handle_vec.push(handle);
337
338        let hummock_manager_compact = self.hummock_manager.clone();
339        let report_handle = tokio::spawn(async move {
340            tracing::info!("report_handle start");
341            loop {
342                if let Some(item) = request_receiver.recv().await
343                    && let Event::ReportTask(ReportTask {
344                        task_id,
345                        task_status,
346                        sorted_output_ssts,
347                        table_stats_change,
348                        object_timestamps,
349                    }) = item.event.unwrap()
350                    && let Err(e) = hummock_manager_compact
351                        .report_compact_task(
352                            task_id,
353                            TaskStatus::try_from(task_status).unwrap(),
354                            sorted_output_ssts
355                                .into_iter()
356                                .map(SstableInfo::from)
357                                .collect_vec(),
358                            Some(table_stats_change),
359                            object_timestamps,
360                        )
361                        .await
362                {
363                    tracing::error!(error = %e.as_report(), "report compact_tack fail");
364                }
365            }
366        });
367
368        join_handle_vec.push(report_handle);
369
370        Ok((
371            request_sender,
372            Box::pin(CompactionEventItemStream {
373                inner: UnboundedReceiverStream::new(task_rx),
374                _handle: join_handle_vec,
375            }),
376        ))
377    }
378
379    async fn get_version_by_epoch(
380        &self,
381        _epoch: HummockEpoch,
382        _table_id: TableId,
383    ) -> Result<PbHummockVersion> {
384        unimplemented!()
385    }
386
387    async fn subscribe_iceberg_compaction_event(
388        &self,
389    ) -> Result<(
390        UnboundedSender<SubscribeIcebergCompactionEventRequest>,
391        BoxStream<'static, IcebergCompactionEventItem>,
392    )> {
393        unimplemented!()
394    }
395
396    async fn get_table_change_logs(
397        &self,
398        epoch_only: bool,
399        start_epoch_inclusive: Option<u64>,
400        end_epoch_inclusive: Option<u64>,
401        table_ids: Option<HashSet<TableId>>,
402        exclude_empty: bool,
403        limit: Option<u32>,
404    ) -> Result<TableChangeLogs> {
405        Ok(self
406            .hummock_manager
407            .get_table_change_logs(
408                epoch_only,
409                start_epoch_inclusive,
410                end_epoch_inclusive,
411                table_ids,
412                exclude_empty,
413                limit,
414            )
415            .await)
416    }
417}
418
419impl MockHummockMetaClient {
420    pub fn hummock_manager_ref(&self) -> Arc<HummockManager> {
421        self.hummock_manager.clone()
422    }
423}
424
425pub struct CompactionEventItemStream {
426    inner: UnboundedReceiverStream<CompactionEventItem>,
427    _handle: Vec<JoinHandle<()>>,
428}
429
430impl Drop for CompactionEventItemStream {
431    fn drop(&mut self) {
432        self.inner.close();
433    }
434}
435
436impl Stream for CompactionEventItemStream {
437    type Item = CompactionEventItem;
438
439    fn poll_next(
440        mut self: std::pin::Pin<&mut Self>,
441        cx: &mut std::task::Context<'_>,
442    ) -> std::task::Poll<Option<Self::Item>> {
443        self.inner.poll_next_unpin(cx)
444    }
445}