1use std::collections::BTreeSet;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU32, Ordering};
18use std::time::SystemTime;
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use fail::fail_point;
23use futures::stream::BoxStream;
24use futures::{Stream, StreamExt};
25use itertools::Itertools;
26use risingwave_common::catalog::TableId;
27use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
28use risingwave_hummock_sdk::compact_task::CompactTask;
29use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
30use risingwave_hummock_sdk::sstable_info::SstableInfo;
31use risingwave_hummock_sdk::version::HummockVersion;
32use risingwave_hummock_sdk::{
33 HummockContextId, HummockEpoch, HummockVersionId, LocalSstableInfo, SstObjectIdRange,
34 SyncResult,
35};
36use risingwave_pb::common::{HostAddress, WorkerType};
37use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
38use risingwave_pb::hummock::subscribe_compaction_event_request::{Event, ReportTask};
39use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
40use risingwave_pb::hummock::{
41 PbHummockVersion, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse,
42 compact_task,
43};
44use risingwave_rpc_client::error::{Result, RpcError};
45use risingwave_rpc_client::{
46 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
47};
48use thiserror_ext::AsReport;
49use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
50use tokio::task::JoinHandle;
51use tokio_stream::wrappers::UnboundedReceiverStream;
52
53use crate::hummock::compaction::selector::{
54 CompactionSelector, SpaceReclaimCompactionSelector, default_compaction_selector,
55};
56use crate::hummock::error::Error;
57use crate::hummock::{CommitEpochInfo, HummockManager, NewTableFragmentInfo};
58
59pub struct MockHummockMetaClient {
60 hummock_manager: Arc<HummockManager>,
61 context_id: HummockContextId,
62 compact_context_id: AtomicU32,
63 sst_offset: u64,
65}
66
67impl MockHummockMetaClient {
68 pub fn new(
69 hummock_manager: Arc<HummockManager>,
70 context_id: HummockContextId,
71 ) -> MockHummockMetaClient {
72 MockHummockMetaClient {
73 hummock_manager,
74 context_id,
75 compact_context_id: AtomicU32::new(context_id),
76 sst_offset: 0,
77 }
78 }
79
80 pub fn with_sst_offset(
81 hummock_manager: Arc<HummockManager>,
82 context_id: HummockContextId,
83 sst_offset: u64,
84 ) -> Self {
85 Self {
86 hummock_manager,
87 context_id,
88 compact_context_id: AtomicU32::new(context_id),
89 sst_offset,
90 }
91 }
92
93 pub async fn get_compact_task(&self) -> Option<CompactTask> {
94 self.hummock_manager
95 .get_compact_task(
96 StaticCompactionGroupId::StateDefault.into(),
97 &mut default_compaction_selector(),
98 )
99 .await
100 .unwrap_or(None)
101 }
102
103 pub fn context_id(&self) -> HummockContextId {
104 self.context_id
105 }
106}
107
108fn mock_err(error: super::error::Error) -> RpcError {
109 anyhow!(error).context("mock error").into()
110}
111
112#[async_trait]
113impl HummockMetaClient for MockHummockMetaClient {
114 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
115 self.hummock_manager
116 .unpin_version_before(self.context_id, unpin_version_before)
117 .await
118 .map_err(mock_err)
119 }
120
121 async fn get_current_version(&self) -> Result<HummockVersion> {
122 Ok(self.hummock_manager.get_current_version().await)
123 }
124
125 async fn get_new_sst_ids(&self, number: u32) -> Result<SstObjectIdRange> {
126 fail_point!("get_new_sst_ids_err", |_| Err(anyhow!(
127 "failpoint get_new_sst_ids_err"
128 )
129 .into()));
130 self.hummock_manager
131 .get_new_sst_ids(number)
132 .await
133 .map_err(mock_err)
134 .map(|range| SstObjectIdRange {
135 start_id: range.start_id + self.sst_offset,
136 end_id: range.end_id + self.sst_offset,
137 })
138 }
139
140 async fn commit_epoch_with_change_log(
141 &self,
142 epoch: HummockEpoch,
143 sync_result: SyncResult,
144 change_log_info: Option<HummockMetaClientChangeLogInfo>,
145 ) -> Result<()> {
146 let version: HummockVersion = self.hummock_manager.get_current_version().await;
147 let table_ids = version
148 .state_table_info
149 .info()
150 .keys()
151 .map(|table_id| table_id.table_id)
152 .collect::<BTreeSet<_>>();
153
154 let commit_table_ids = sync_result
155 .uncommitted_ssts
156 .iter()
157 .flat_map(|sstable| sstable.sst_info.table_ids.clone())
158 .chain({
159 sync_result
160 .old_value_ssts
161 .iter()
162 .flat_map(|sstable| sstable.sst_info.table_ids.clone())
163 })
164 .chain(
165 sync_result
166 .table_watermarks
167 .keys()
168 .map(|table_id| table_id.table_id),
169 )
170 .chain(table_ids.iter().cloned())
171 .collect::<BTreeSet<_>>();
172
173 let new_table_fragment_infos = if commit_table_ids
174 .iter()
175 .all(|table_id| table_ids.contains(table_id))
176 {
177 vec![]
178 } else {
179 vec![NewTableFragmentInfo {
180 table_ids: commit_table_ids
181 .iter()
182 .cloned()
183 .map(TableId::from)
184 .collect(),
185 }]
186 };
187
188 let sst_to_context = sync_result
189 .uncommitted_ssts
190 .iter()
191 .map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, self.context_id))
192 .collect();
193 let new_table_watermark = sync_result.table_watermarks;
194 let table_change_log = match change_log_info {
195 Some(epochs) => {
196 assert_eq!(*epochs.last().expect("non-empty"), epoch);
197 build_table_change_log_delta(
198 sync_result
199 .old_value_ssts
200 .into_iter()
201 .map(|sst| sst.sst_info),
202 sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info),
203 &epochs,
204 commit_table_ids.iter().map(|&table_id| (table_id, 0)),
205 )
206 }
207 None => Default::default(),
208 };
209
210 self.hummock_manager
211 .commit_epoch(CommitEpochInfo {
212 sstables: sync_result.uncommitted_ssts,
213 new_table_watermarks: new_table_watermark,
214 sst_to_context,
215 new_table_fragment_infos,
216 change_log_delta: table_change_log,
217 tables_to_commit: commit_table_ids
218 .iter()
219 .cloned()
220 .map(|table_id| (TableId::new(table_id), epoch))
221 .collect(),
222 })
223 .await
224 .map_err(mock_err)?;
225 Ok(())
226 }
227
228 async fn trigger_manual_compaction(
229 &self,
230 _compaction_group_id: u64,
231 _table_id: u32,
232 _level: u32,
233 _sst_ids: Vec<u64>,
234 ) -> Result<()> {
235 todo!()
236 }
237
238 async fn trigger_full_gc(
239 &self,
240 _sst_retention_time_sec: u64,
241 _prefix: Option<String>,
242 ) -> Result<()> {
243 unimplemented!()
244 }
245
246 async fn subscribe_compaction_event(
247 &self,
248 ) -> Result<(
249 UnboundedSender<SubscribeCompactionEventRequest>,
250 BoxStream<'static, CompactionEventItem>,
251 )> {
252 let context_id = self
253 .hummock_manager
254 .metadata_manager()
255 .add_worker_node(
256 WorkerType::Compactor,
257 HostAddress {
258 host: "compactor".to_owned(),
259 port: 0,
260 },
261 Default::default(),
262 Default::default(),
263 )
264 .await
265 .unwrap();
266 let _compactor_rx = self
267 .hummock_manager
268 .compactor_manager_ref_for_test()
269 .add_compactor(context_id as _);
270
271 let (request_sender, mut request_receiver) =
272 unbounded_channel::<SubscribeCompactionEventRequest>();
273
274 self.compact_context_id
275 .store(context_id as _, Ordering::Release);
276
277 let (task_tx, task_rx) = tokio::sync::mpsc::unbounded_channel();
278
279 let hummock_manager_compact = self.hummock_manager.clone();
280 let mut join_handle_vec = vec![];
281
282 let handle = tokio::spawn(async move {
283 loop {
284 let group_and_type = hummock_manager_compact
285 .auto_pick_compaction_group_and_type()
286 .await;
287
288 if group_and_type.is_none() {
289 break;
290 }
291
292 let (group, task_type) = group_and_type.unwrap();
293
294 if let TaskType::Ttl = task_type {
295 match hummock_manager_compact
296 .metadata_manager_ref()
297 .get_all_table_options()
298 .await
299 .map_err(|err| Error::MetaStore(err.into()))
300 {
301 Ok(table_options) => {
302 hummock_manager_compact.update_table_id_to_table_option(table_options);
303 }
304 Err(e) => {
305 tracing::error!(error = %e.as_report(), "get_all_table_options fail");
306 }
307 }
308 }
309
310 let mut selector: Box<dyn CompactionSelector> = match task_type {
311 compact_task::TaskType::Dynamic => default_compaction_selector(),
312 compact_task::TaskType::SpaceReclaim => {
313 Box::<SpaceReclaimCompactionSelector>::default()
314 }
315
316 _ => panic!("Error type when mock_hummock_meta_client subscribe_compact_tasks"),
317 };
318 if let Some(task) = hummock_manager_compact
319 .get_compact_task(group, &mut selector)
320 .await
321 .unwrap()
322 {
323 let resp = SubscribeCompactionEventResponse {
324 event: Some(ResponseEvent::CompactTask(task.into())),
325 create_at: SystemTime::now()
326 .duration_since(std::time::UNIX_EPOCH)
327 .expect("Clock may have gone backwards")
328 .as_millis() as u64,
329 };
330
331 let _ = task_tx.send(Ok(resp));
332 }
333 }
334 });
335
336 join_handle_vec.push(handle);
337
338 let hummock_manager_compact = self.hummock_manager.clone();
339 let report_handle = tokio::spawn(async move {
340 tracing::info!("report_handle start");
341 loop {
342 if let Some(item) = request_receiver.recv().await {
343 if let Event::ReportTask(ReportTask {
344 task_id,
345 task_status,
346 sorted_output_ssts,
347 table_stats_change,
348 object_timestamps,
349 }) = item.event.unwrap()
350 {
351 if let Err(e) = hummock_manager_compact
352 .report_compact_task(
353 task_id,
354 TaskStatus::try_from(task_status).unwrap(),
355 sorted_output_ssts
356 .into_iter()
357 .map(SstableInfo::from)
358 .collect_vec(),
359 Some(table_stats_change),
360 object_timestamps,
361 )
362 .await
363 {
364 tracing::error!(error = %e.as_report(), "report compact_tack fail");
365 }
366 }
367 }
368 }
369 });
370
371 join_handle_vec.push(report_handle);
372
373 Ok((
374 request_sender,
375 Box::pin(CompactionEventItemStream {
376 inner: UnboundedReceiverStream::new(task_rx),
377 _handle: join_handle_vec,
378 }),
379 ))
380 }
381
382 async fn get_version_by_epoch(
383 &self,
384 _epoch: HummockEpoch,
385 _table_id: u32,
386 ) -> Result<PbHummockVersion> {
387 unimplemented!()
388 }
389}
390
391impl MockHummockMetaClient {
392 pub fn hummock_manager_ref(&self) -> Arc<HummockManager> {
393 self.hummock_manager.clone()
394 }
395}
396
397pub struct CompactionEventItemStream {
398 inner: UnboundedReceiverStream<CompactionEventItem>,
399 _handle: Vec<JoinHandle<()>>,
400}
401
402impl Drop for CompactionEventItemStream {
403 fn drop(&mut self) {
404 self.inner.close();
405 }
406}
407
408impl Stream for CompactionEventItemStream {
409 type Item = CompactionEventItem;
410
411 fn poll_next(
412 mut self: std::pin::Pin<&mut Self>,
413 cx: &mut std::task::Context<'_>,
414 ) -> std::task::Poll<Option<Self::Item>> {
415 self.inner.poll_next_unpin(cx)
416 }
417}