1use std::collections::{BTreeSet, HashSet};
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU32, Ordering};
18use std::time::SystemTime;
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use fail::fail_point;
23use futures::stream::BoxStream;
24use futures::{Stream, StreamExt};
25use itertools::Itertools;
26use risingwave_hummock_sdk::change_log::{TableChangeLogs, build_table_change_log_delta};
27use risingwave_hummock_sdk::compact_task::CompactTask;
28use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
29use risingwave_hummock_sdk::sstable_info::SstableInfo;
30use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
31use risingwave_hummock_sdk::version::HummockVersion;
32use risingwave_hummock_sdk::{
33 CompactionGroupId, HummockContextId, HummockEpoch, HummockVersionId, LocalSstableInfo,
34 ObjectIdRange, SyncResult,
35};
36use risingwave_pb::common::{HostAddress, WorkerType};
37use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
38use risingwave_pb::hummock::subscribe_compaction_event_request::{Event, ReportTask};
39use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
40use risingwave_pb::hummock::{
41 PbHummockVersion, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse,
42 compact_task,
43};
44use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
45use risingwave_pb::id::{HummockSstableId, JobId, TableId};
46use risingwave_rpc_client::error::{Result, RpcError};
47use risingwave_rpc_client::{
48 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
49 IcebergCompactionEventItem,
50};
51use thiserror_ext::AsReport;
52use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
53use tokio::task::JoinHandle;
54use tokio_stream::wrappers::UnboundedReceiverStream;
55
56use crate::hummock::compaction::selector::{
57 CompactionSelector, SpaceReclaimCompactionSelector, default_compaction_selector,
58};
59use crate::hummock::error::Error;
60use crate::hummock::{CommitEpochInfo, HummockManager, NewTableFragmentInfo};
61
/// Test [`HummockMetaClient`] that serves every request by calling a [`HummockManager`]
/// in-process.
pub struct MockHummockMetaClient {
    /// Manager that all client calls are forwarded to.
    hummock_manager: Arc<HummockManager>,
    /// Context id used when unpinning versions and when attributing committed SSTs.
    context_id: HummockContextId,
    /// Raw context id of the compactor most recently registered by
    /// `subscribe_compaction_event`; initialised to `context_id`.
    compact_context_id: AtomicU32,
    /// Added to both ends of every object-id range returned by `get_new_object_ids`.
    sst_offset: u64,
}
69
70impl MockHummockMetaClient {
71 pub fn new(
72 hummock_manager: Arc<HummockManager>,
73 context_id: HummockContextId,
74 ) -> MockHummockMetaClient {
75 MockHummockMetaClient {
76 hummock_manager,
77 context_id,
78 compact_context_id: AtomicU32::new(context_id.as_raw_id()),
79 sst_offset: 0,
80 }
81 }
82
83 pub fn with_sst_offset(
84 hummock_manager: Arc<HummockManager>,
85 context_id: HummockContextId,
86 sst_offset: u64,
87 ) -> Self {
88 Self {
89 hummock_manager,
90 context_id,
91 compact_context_id: AtomicU32::new(context_id.as_raw_id()),
92 sst_offset,
93 }
94 }
95
96 pub async fn get_compact_task(&self) -> Option<CompactTask> {
97 self.hummock_manager
98 .get_compact_task(
99 StaticCompactionGroupId::StateDefault,
100 &mut *default_compaction_selector(),
101 )
102 .await
103 .unwrap_or(None)
104 }
105
106 pub fn context_id(&self) -> HummockContextId {
107 self.context_id
108 }
109}
110
111fn mock_err(error: super::error::Error) -> RpcError {
112 anyhow!(error).context("mock error").into()
113}
114
115#[async_trait]
116impl HummockMetaClient for MockHummockMetaClient {
117 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
118 self.hummock_manager
119 .unpin_version_before(self.context_id, unpin_version_before)
120 .await
121 .map_err(mock_err)
122 }
123
124 async fn get_current_version(&self) -> Result<HummockVersion> {
125 Ok(self.hummock_manager.get_current_version().await)
126 }
127
128 async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
129 fail_point!("get_new_sst_ids_err", |_| Err(anyhow!(
130 "failpoint get_new_sst_ids_err"
131 )
132 .into()));
133 self.hummock_manager
134 .get_new_object_ids(number)
135 .await
136 .map_err(mock_err)
137 .map(|range| ObjectIdRange {
138 start_id: range.start_id + self.sst_offset,
139 end_id: range.end_id + self.sst_offset,
140 })
141 }
142
143 async fn commit_epoch_with_change_log(
144 &self,
145 epoch: HummockEpoch,
146 sync_result: SyncResult,
147 change_log_info: Option<HummockMetaClientChangeLogInfo>,
148 ) -> Result<()> {
149 let version: HummockVersion = self.hummock_manager.get_current_version().await;
150 let table_ids = version
151 .state_table_info
152 .info()
153 .keys()
154 .copied()
155 .collect::<BTreeSet<_>>();
156
157 let commit_table_ids = sync_result
158 .uncommitted_ssts
159 .iter()
160 .flat_map(|sstable| sstable.sst_info.table_ids.clone())
161 .chain({
162 sync_result
163 .old_value_ssts
164 .iter()
165 .flat_map(|sstable| sstable.sst_info.table_ids.clone())
166 })
167 .chain(sync_result.table_watermarks.keys().copied())
168 .chain(table_ids.iter().cloned())
169 .collect::<BTreeSet<_>>();
170
171 let new_table_fragment_infos = if commit_table_ids
172 .iter()
173 .all(|table_id| table_ids.contains(table_id))
174 {
175 vec![]
176 } else {
177 vec![NewTableFragmentInfo {
178 table_ids: commit_table_ids.iter().cloned().collect(),
179 }]
180 };
181
182 let sst_to_context = sync_result
183 .uncommitted_ssts
184 .iter()
185 .map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, self.context_id))
186 .collect();
187 let new_table_watermark = sync_result.table_watermarks;
188 let table_change_log = match change_log_info {
189 Some(epochs) => {
190 assert_eq!(*epochs.last().expect("non-empty"), epoch);
191 build_table_change_log_delta(
192 sync_result
193 .old_value_ssts
194 .into_iter()
195 .map(|sst| sst.sst_info),
196 sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info),
197 &epochs,
198 commit_table_ids.iter().map(|&table_id| (table_id, 0)),
199 )
200 }
201 None => Default::default(),
202 };
203
204 self.hummock_manager
205 .commit_epoch(CommitEpochInfo {
206 sstables: sync_result.uncommitted_ssts,
207 new_table_watermarks: new_table_watermark,
208 sst_to_context,
209 new_table_fragment_infos,
210 change_log_delta: table_change_log,
211 vector_index_delta: sync_result
212 .vector_index_adds
213 .into_iter()
214 .map(|(table_id, adds)| (table_id, VectorIndexDelta::Adds(adds)))
215 .collect(),
216 tables_to_commit: commit_table_ids
217 .iter()
218 .cloned()
219 .map(|table_id| (table_id, epoch))
220 .collect(),
221 truncate_tables: HashSet::new(),
222 })
223 .await
224 .map_err(mock_err)?;
225 Ok(())
226 }
227
228 async fn trigger_manual_compaction(
229 &self,
230 _compaction_group_id: CompactionGroupId,
231 _table_id: JobId,
232 _level: u32,
233 _target_level: Option<u32>,
234 _sst_ids: Vec<HummockSstableId>,
235 _exclusive: bool,
236 ) -> Result<bool> {
237 todo!()
238 }
239
240 async fn trigger_full_gc(
241 &self,
242 _sst_retention_time_sec: u64,
243 _prefix: Option<String>,
244 ) -> Result<()> {
245 unimplemented!()
246 }
247
248 async fn subscribe_compaction_event(
249 &self,
250 ) -> Result<(
251 UnboundedSender<SubscribeCompactionEventRequest>,
252 BoxStream<'static, CompactionEventItem>,
253 )> {
254 let context_id = self
255 .hummock_manager
256 .metadata_manager()
257 .add_worker_node(
258 WorkerType::Compactor,
259 HostAddress {
260 host: "compactor".to_owned(),
261 port: 0,
262 },
263 Default::default(),
264 Default::default(),
265 )
266 .await
267 .unwrap();
268 let _compactor_rx = self
269 .hummock_manager
270 .compactor_manager
271 .clone()
272 .add_compactor(context_id);
273
274 let (request_sender, mut request_receiver) =
275 unbounded_channel::<SubscribeCompactionEventRequest>();
276
277 self.compact_context_id
278 .store(context_id.as_raw_id(), Ordering::Release);
279
280 let (task_tx, task_rx) = tokio::sync::mpsc::unbounded_channel();
281
282 let hummock_manager_compact = self.hummock_manager.clone();
283 let mut join_handle_vec = vec![];
284
285 let handle = tokio::spawn(async move {
286 loop {
287 let snapshot = hummock_manager_compact.compaction_state.snapshot();
288 let Some((groups, task_type)) = snapshot.pick_compaction_groups_and_type() else {
289 break;
290 };
291
292 let group = groups[0];
293
294 if let TaskType::Ttl = task_type {
295 match hummock_manager_compact
296 .metadata_manager_ref()
297 .get_all_table_options()
298 .await
299 .map_err(|err| Error::MetaStore(err.into()))
300 {
301 Ok(table_options) => {
302 hummock_manager_compact.update_table_id_to_table_option(table_options);
303 }
304 Err(e) => {
305 tracing::error!(error = %e.as_report(), "get_all_table_options fail");
306 }
307 }
308 }
309
310 let mut selector: Box<dyn CompactionSelector> = match task_type {
311 compact_task::TaskType::Dynamic => default_compaction_selector(),
312 compact_task::TaskType::SpaceReclaim => {
313 Box::<SpaceReclaimCompactionSelector>::default()
314 }
315
316 _ => panic!("Error type when mock_hummock_meta_client subscribe_compact_tasks"),
317 };
318 if let Some(task) = hummock_manager_compact
319 .get_compact_task(group, &mut *selector)
320 .await
321 .unwrap()
322 {
323 let resp = SubscribeCompactionEventResponse {
324 event: Some(ResponseEvent::CompactTask(task.into())),
325 create_at: SystemTime::now()
326 .duration_since(std::time::UNIX_EPOCH)
327 .expect("Clock may have gone backwards")
328 .as_millis() as u64,
329 };
330
331 let _ = task_tx.send(Ok(resp));
332 }
333 }
334 });
335
336 join_handle_vec.push(handle);
337
338 let hummock_manager_compact = self.hummock_manager.clone();
339 let report_handle = tokio::spawn(async move {
340 tracing::info!("report_handle start");
341 loop {
342 if let Some(item) = request_receiver.recv().await
343 && let Event::ReportTask(ReportTask {
344 task_id,
345 task_status,
346 sorted_output_ssts,
347 table_stats_change,
348 object_timestamps,
349 }) = item.event.unwrap()
350 && let Err(e) = hummock_manager_compact
351 .report_compact_task(
352 task_id,
353 TaskStatus::try_from(task_status).unwrap(),
354 sorted_output_ssts
355 .into_iter()
356 .map(SstableInfo::from)
357 .collect_vec(),
358 Some(table_stats_change),
359 object_timestamps,
360 )
361 .await
362 {
363 tracing::error!(error = %e.as_report(), "report compact_tack fail");
364 }
365 }
366 });
367
368 join_handle_vec.push(report_handle);
369
370 Ok((
371 request_sender,
372 Box::pin(CompactionEventItemStream {
373 inner: UnboundedReceiverStream::new(task_rx),
374 _handle: join_handle_vec,
375 }),
376 ))
377 }
378
379 async fn get_version_by_epoch(
380 &self,
381 _epoch: HummockEpoch,
382 _table_id: TableId,
383 ) -> Result<PbHummockVersion> {
384 unimplemented!()
385 }
386
387 async fn subscribe_iceberg_compaction_event(
388 &self,
389 ) -> Result<(
390 UnboundedSender<SubscribeIcebergCompactionEventRequest>,
391 BoxStream<'static, IcebergCompactionEventItem>,
392 )> {
393 unimplemented!()
394 }
395
396 async fn get_table_change_logs(
397 &self,
398 epoch_only: bool,
399 start_epoch_inclusive: Option<u64>,
400 end_epoch_inclusive: Option<u64>,
401 table_ids: Option<HashSet<TableId>>,
402 exclude_empty: bool,
403 limit: Option<u32>,
404 ) -> Result<TableChangeLogs> {
405 Ok(self
406 .hummock_manager
407 .get_table_change_logs(
408 epoch_only,
409 start_epoch_inclusive,
410 end_epoch_inclusive,
411 table_ids,
412 exclude_empty,
413 limit,
414 )
415 .await)
416 }
417}
418
419impl MockHummockMetaClient {
420 pub fn hummock_manager_ref(&self) -> Arc<HummockManager> {
421 self.hummock_manager.clone()
422 }
423}
424
425pub struct CompactionEventItemStream {
426 inner: UnboundedReceiverStream<CompactionEventItem>,
427 _handle: Vec<JoinHandle<()>>,
428}
429
430impl Drop for CompactionEventItemStream {
431 fn drop(&mut self) {
432 self.inner.close();
433 }
434}
435
436impl Stream for CompactionEventItemStream {
437 type Item = CompactionEventItem;
438
439 fn poll_next(
440 mut self: std::pin::Pin<&mut Self>,
441 cx: &mut std::task::Context<'_>,
442 ) -> std::task::Poll<Option<Self::Item>> {
443 self.inner.poll_next_unpin(cx)
444 }
445}