1use std::collections::BTreeSet;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU32, Ordering};
18use std::time::SystemTime;
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use fail::fail_point;
23use futures::stream::BoxStream;
24use futures::{Stream, StreamExt};
25use itertools::Itertools;
26use risingwave_common::catalog::TableId;
27use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
28use risingwave_hummock_sdk::compact_task::CompactTask;
29use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
30use risingwave_hummock_sdk::sstable_info::SstableInfo;
31use risingwave_hummock_sdk::version::HummockVersion;
32use risingwave_hummock_sdk::{
33 HummockContextId, HummockEpoch, HummockVersionId, LocalSstableInfo, ObjectIdRange, SyncResult,
34};
35use risingwave_pb::common::{HostAddress, WorkerType};
36use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
37use risingwave_pb::hummock::subscribe_compaction_event_request::{Event, ReportTask};
38use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
39use risingwave_pb::hummock::{
40 PbHummockVersion, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse,
41 compact_task,
42};
43use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
44use risingwave_rpc_client::error::{Result, RpcError};
45use risingwave_rpc_client::{
46 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
47 IcebergCompactionEventItem,
48};
49use thiserror_ext::AsReport;
50use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
51use tokio::task::JoinHandle;
52use tokio_stream::wrappers::UnboundedReceiverStream;
53
54use crate::hummock::compaction::selector::{
55 CompactionSelector, SpaceReclaimCompactionSelector, default_compaction_selector,
56};
57use crate::hummock::error::Error;
58use crate::hummock::{CommitEpochInfo, HummockManager, NewTableFragmentInfo};
59
60pub struct MockHummockMetaClient {
61 hummock_manager: Arc<HummockManager>,
62 context_id: HummockContextId,
63 compact_context_id: AtomicU32,
64 sst_offset: u64,
66}
67
68impl MockHummockMetaClient {
69 pub fn new(
70 hummock_manager: Arc<HummockManager>,
71 context_id: HummockContextId,
72 ) -> MockHummockMetaClient {
73 MockHummockMetaClient {
74 hummock_manager,
75 context_id,
76 compact_context_id: AtomicU32::new(context_id),
77 sst_offset: 0,
78 }
79 }
80
81 pub fn with_sst_offset(
82 hummock_manager: Arc<HummockManager>,
83 context_id: HummockContextId,
84 sst_offset: u64,
85 ) -> Self {
86 Self {
87 hummock_manager,
88 context_id,
89 compact_context_id: AtomicU32::new(context_id),
90 sst_offset,
91 }
92 }
93
94 pub async fn get_compact_task(&self) -> Option<CompactTask> {
95 self.hummock_manager
96 .get_compact_task(
97 StaticCompactionGroupId::StateDefault.into(),
98 &mut default_compaction_selector(),
99 )
100 .await
101 .unwrap_or(None)
102 }
103
104 pub fn context_id(&self) -> HummockContextId {
105 self.context_id
106 }
107}
108
109fn mock_err(error: super::error::Error) -> RpcError {
110 anyhow!(error).context("mock error").into()
111}
112
113#[async_trait]
114impl HummockMetaClient for MockHummockMetaClient {
115 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
116 self.hummock_manager
117 .unpin_version_before(self.context_id, unpin_version_before)
118 .await
119 .map_err(mock_err)
120 }
121
122 async fn get_current_version(&self) -> Result<HummockVersion> {
123 Ok(self.hummock_manager.get_current_version().await)
124 }
125
126 async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
127 fail_point!("get_new_sst_ids_err", |_| Err(anyhow!(
128 "failpoint get_new_sst_ids_err"
129 )
130 .into()));
131 self.hummock_manager
132 .get_new_object_ids(number)
133 .await
134 .map_err(mock_err)
135 .map(|range| ObjectIdRange {
136 start_id: range.start_id + self.sst_offset,
137 end_id: range.end_id + self.sst_offset,
138 })
139 }
140
141 async fn commit_epoch_with_change_log(
142 &self,
143 epoch: HummockEpoch,
144 sync_result: SyncResult,
145 change_log_info: Option<HummockMetaClientChangeLogInfo>,
146 ) -> Result<()> {
147 let version: HummockVersion = self.hummock_manager.get_current_version().await;
148 let table_ids = version
149 .state_table_info
150 .info()
151 .keys()
152 .map(|table_id| table_id.table_id)
153 .collect::<BTreeSet<_>>();
154
155 let commit_table_ids = sync_result
156 .uncommitted_ssts
157 .iter()
158 .flat_map(|sstable| sstable.sst_info.table_ids.clone())
159 .chain({
160 sync_result
161 .old_value_ssts
162 .iter()
163 .flat_map(|sstable| sstable.sst_info.table_ids.clone())
164 })
165 .chain(
166 sync_result
167 .table_watermarks
168 .keys()
169 .map(|table_id| table_id.table_id),
170 )
171 .chain(table_ids.iter().cloned())
172 .collect::<BTreeSet<_>>();
173
174 let new_table_fragment_infos = if commit_table_ids
175 .iter()
176 .all(|table_id| table_ids.contains(table_id))
177 {
178 vec![]
179 } else {
180 vec![NewTableFragmentInfo {
181 table_ids: commit_table_ids
182 .iter()
183 .cloned()
184 .map(TableId::from)
185 .collect(),
186 }]
187 };
188
189 let sst_to_context = sync_result
190 .uncommitted_ssts
191 .iter()
192 .map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, self.context_id))
193 .collect();
194 let new_table_watermark = sync_result.table_watermarks;
195 let table_change_log = match change_log_info {
196 Some(epochs) => {
197 assert_eq!(*epochs.last().expect("non-empty"), epoch);
198 build_table_change_log_delta(
199 sync_result
200 .old_value_ssts
201 .into_iter()
202 .map(|sst| sst.sst_info),
203 sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info),
204 &epochs,
205 commit_table_ids.iter().map(|&table_id| (table_id, 0)),
206 )
207 }
208 None => Default::default(),
209 };
210
211 self.hummock_manager
212 .commit_epoch(CommitEpochInfo {
213 sstables: sync_result.uncommitted_ssts,
214 new_table_watermarks: new_table_watermark,
215 sst_to_context,
216 new_table_fragment_infos,
217 change_log_delta: table_change_log,
218 vector_index_delta: Default::default(),
219 tables_to_commit: commit_table_ids
220 .iter()
221 .cloned()
222 .map(|table_id| (TableId::new(table_id), epoch))
223 .collect(),
224 })
225 .await
226 .map_err(mock_err)?;
227 Ok(())
228 }
229
230 async fn trigger_manual_compaction(
231 &self,
232 _compaction_group_id: u64,
233 _table_id: u32,
234 _level: u32,
235 _sst_ids: Vec<u64>,
236 ) -> Result<()> {
237 todo!()
238 }
239
240 async fn trigger_full_gc(
241 &self,
242 _sst_retention_time_sec: u64,
243 _prefix: Option<String>,
244 ) -> Result<()> {
245 unimplemented!()
246 }
247
248 async fn subscribe_compaction_event(
249 &self,
250 ) -> Result<(
251 UnboundedSender<SubscribeCompactionEventRequest>,
252 BoxStream<'static, CompactionEventItem>,
253 )> {
254 let context_id = self
255 .hummock_manager
256 .metadata_manager()
257 .add_worker_node(
258 WorkerType::Compactor,
259 HostAddress {
260 host: "compactor".to_owned(),
261 port: 0,
262 },
263 Default::default(),
264 Default::default(),
265 )
266 .await
267 .unwrap();
268 let _compactor_rx = self
269 .hummock_manager
270 .compactor_manager
271 .clone()
272 .add_compactor(context_id as _);
273
274 let (request_sender, mut request_receiver) =
275 unbounded_channel::<SubscribeCompactionEventRequest>();
276
277 self.compact_context_id
278 .store(context_id as _, Ordering::Release);
279
280 let (task_tx, task_rx) = tokio::sync::mpsc::unbounded_channel();
281
282 let hummock_manager_compact = self.hummock_manager.clone();
283 let mut join_handle_vec = vec![];
284
285 let handle = tokio::spawn(async move {
286 loop {
287 let group_and_type = hummock_manager_compact
288 .auto_pick_compaction_group_and_type()
289 .await;
290
291 if group_and_type.is_none() {
292 break;
293 }
294
295 let (group, task_type) = group_and_type.unwrap();
296
297 if let TaskType::Ttl = task_type {
298 match hummock_manager_compact
299 .metadata_manager_ref()
300 .get_all_table_options()
301 .await
302 .map_err(|err| Error::MetaStore(err.into()))
303 {
304 Ok(table_options) => {
305 hummock_manager_compact.update_table_id_to_table_option(table_options);
306 }
307 Err(e) => {
308 tracing::error!(error = %e.as_report(), "get_all_table_options fail");
309 }
310 }
311 }
312
313 let mut selector: Box<dyn CompactionSelector> = match task_type {
314 compact_task::TaskType::Dynamic => default_compaction_selector(),
315 compact_task::TaskType::SpaceReclaim => {
316 Box::<SpaceReclaimCompactionSelector>::default()
317 }
318
319 _ => panic!("Error type when mock_hummock_meta_client subscribe_compact_tasks"),
320 };
321 if let Some(task) = hummock_manager_compact
322 .get_compact_task(group, &mut selector)
323 .await
324 .unwrap()
325 {
326 let resp = SubscribeCompactionEventResponse {
327 event: Some(ResponseEvent::CompactTask(task.into())),
328 create_at: SystemTime::now()
329 .duration_since(std::time::UNIX_EPOCH)
330 .expect("Clock may have gone backwards")
331 .as_millis() as u64,
332 };
333
334 let _ = task_tx.send(Ok(resp));
335 }
336 }
337 });
338
339 join_handle_vec.push(handle);
340
341 let hummock_manager_compact = self.hummock_manager.clone();
342 let report_handle = tokio::spawn(async move {
343 tracing::info!("report_handle start");
344 loop {
345 if let Some(item) = request_receiver.recv().await {
346 if let Event::ReportTask(ReportTask {
347 task_id,
348 task_status,
349 sorted_output_ssts,
350 table_stats_change,
351 object_timestamps,
352 }) = item.event.unwrap()
353 {
354 if let Err(e) = hummock_manager_compact
355 .report_compact_task(
356 task_id,
357 TaskStatus::try_from(task_status).unwrap(),
358 sorted_output_ssts
359 .into_iter()
360 .map(SstableInfo::from)
361 .collect_vec(),
362 Some(table_stats_change),
363 object_timestamps
364 .into_iter()
365 .map(|(id, ts)| (id.into(), ts))
366 .collect(),
367 )
368 .await
369 {
370 tracing::error!(error = %e.as_report(), "report compact_tack fail");
371 }
372 }
373 }
374 }
375 });
376
377 join_handle_vec.push(report_handle);
378
379 Ok((
380 request_sender,
381 Box::pin(CompactionEventItemStream {
382 inner: UnboundedReceiverStream::new(task_rx),
383 _handle: join_handle_vec,
384 }),
385 ))
386 }
387
388 async fn get_version_by_epoch(
389 &self,
390 _epoch: HummockEpoch,
391 _table_id: u32,
392 ) -> Result<PbHummockVersion> {
393 unimplemented!()
394 }
395
396 async fn subscribe_iceberg_compaction_event(
397 &self,
398 ) -> Result<(
399 UnboundedSender<SubscribeIcebergCompactionEventRequest>,
400 BoxStream<'static, IcebergCompactionEventItem>,
401 )> {
402 unimplemented!()
403 }
404}
405
406impl MockHummockMetaClient {
407 pub fn hummock_manager_ref(&self) -> Arc<HummockManager> {
408 self.hummock_manager.clone()
409 }
410}
411
412pub struct CompactionEventItemStream {
413 inner: UnboundedReceiverStream<CompactionEventItem>,
414 _handle: Vec<JoinHandle<()>>,
415}
416
417impl Drop for CompactionEventItemStream {
418 fn drop(&mut self) {
419 self.inner.close();
420 }
421}
422
423impl Stream for CompactionEventItemStream {
424 type Item = CompactionEventItem;
425
426 fn poll_next(
427 mut self: std::pin::Pin<&mut Self>,
428 cx: &mut std::task::Context<'_>,
429 ) -> std::task::Poll<Option<Self::Item>> {
430 self.inner.poll_next_unpin(cx)
431 }
432}