1use std::collections::{BTreeSet, HashSet};
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU32, Ordering};
18use std::time::SystemTime;
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use fail::fail_point;
23use futures::stream::BoxStream;
24use futures::{Stream, StreamExt};
25use itertools::Itertools;
26use risingwave_common::catalog::TableId;
27use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
28use risingwave_hummock_sdk::compact_task::CompactTask;
29use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
30use risingwave_hummock_sdk::sstable_info::SstableInfo;
31use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
32use risingwave_hummock_sdk::version::HummockVersion;
33use risingwave_hummock_sdk::{
34 HummockContextId, HummockEpoch, HummockVersionId, LocalSstableInfo, ObjectIdRange, SyncResult,
35};
36use risingwave_pb::common::{HostAddress, WorkerType};
37use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
38use risingwave_pb::hummock::subscribe_compaction_event_request::{Event, ReportTask};
39use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
40use risingwave_pb::hummock::{
41 PbHummockVersion, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse,
42 compact_task,
43};
44use risingwave_pb::iceberg_compaction::SubscribeIcebergCompactionEventRequest;
45use risingwave_rpc_client::error::{Result, RpcError};
46use risingwave_rpc_client::{
47 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
48 IcebergCompactionEventItem,
49};
50use thiserror_ext::AsReport;
51use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
52use tokio::task::JoinHandle;
53use tokio_stream::wrappers::UnboundedReceiverStream;
54
55use crate::hummock::compaction::selector::{
56 CompactionSelector, SpaceReclaimCompactionSelector, default_compaction_selector,
57};
58use crate::hummock::error::Error;
59use crate::hummock::{CommitEpochInfo, HummockManager, NewTableFragmentInfo};
60
/// Mock meta client that serves Hummock requests by calling a [`HummockManager`] directly.
pub struct MockHummockMetaClient {
    /// Manager that every client call in this file is forwarded to.
    hummock_manager: Arc<HummockManager>,
    /// Context id passed to the manager when unpinning versions, and paired with the object id
    /// of each uncommitted SST when an epoch is committed.
    context_id: HummockContextId,
    /// Initialised to `context_id`; `subscribe_compaction_event` overwrites it with the id of
    /// the compactor worker it registers.
    compact_context_id: AtomicU32,
    /// Added to both `start_id` and `end_id` of every range returned by `get_new_object_ids`.
    sst_offset: u64,
}
68
69impl MockHummockMetaClient {
70 pub fn new(
71 hummock_manager: Arc<HummockManager>,
72 context_id: HummockContextId,
73 ) -> MockHummockMetaClient {
74 MockHummockMetaClient {
75 hummock_manager,
76 context_id,
77 compact_context_id: AtomicU32::new(context_id),
78 sst_offset: 0,
79 }
80 }
81
82 pub fn with_sst_offset(
83 hummock_manager: Arc<HummockManager>,
84 context_id: HummockContextId,
85 sst_offset: u64,
86 ) -> Self {
87 Self {
88 hummock_manager,
89 context_id,
90 compact_context_id: AtomicU32::new(context_id),
91 sst_offset,
92 }
93 }
94
95 pub async fn get_compact_task(&self) -> Option<CompactTask> {
96 self.hummock_manager
97 .get_compact_task(
98 StaticCompactionGroupId::StateDefault.into(),
99 &mut default_compaction_selector(),
100 )
101 .await
102 .unwrap_or(None)
103 }
104
105 pub fn context_id(&self) -> HummockContextId {
106 self.context_id
107 }
108}
109
110fn mock_err(error: super::error::Error) -> RpcError {
111 anyhow!(error).context("mock error").into()
112}
113
114#[async_trait]
115impl HummockMetaClient for MockHummockMetaClient {
116 async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()> {
117 self.hummock_manager
118 .unpin_version_before(self.context_id, unpin_version_before)
119 .await
120 .map_err(mock_err)
121 }
122
123 async fn get_current_version(&self) -> Result<HummockVersion> {
124 Ok(self.hummock_manager.get_current_version().await)
125 }
126
127 async fn get_new_object_ids(&self, number: u32) -> Result<ObjectIdRange> {
128 fail_point!("get_new_sst_ids_err", |_| Err(anyhow!(
129 "failpoint get_new_sst_ids_err"
130 )
131 .into()));
132 self.hummock_manager
133 .get_new_object_ids(number)
134 .await
135 .map_err(mock_err)
136 .map(|range| ObjectIdRange {
137 start_id: range.start_id + self.sst_offset,
138 end_id: range.end_id + self.sst_offset,
139 })
140 }
141
142 async fn commit_epoch_with_change_log(
143 &self,
144 epoch: HummockEpoch,
145 sync_result: SyncResult,
146 change_log_info: Option<HummockMetaClientChangeLogInfo>,
147 ) -> Result<()> {
148 let version: HummockVersion = self.hummock_manager.get_current_version().await;
149 let table_ids = version
150 .state_table_info
151 .info()
152 .keys()
153 .map(|table_id| table_id.table_id)
154 .collect::<BTreeSet<_>>();
155
156 let commit_table_ids = sync_result
157 .uncommitted_ssts
158 .iter()
159 .flat_map(|sstable| sstable.sst_info.table_ids.clone())
160 .chain({
161 sync_result
162 .old_value_ssts
163 .iter()
164 .flat_map(|sstable| sstable.sst_info.table_ids.clone())
165 })
166 .chain(
167 sync_result
168 .table_watermarks
169 .keys()
170 .map(|table_id| table_id.table_id),
171 )
172 .chain(table_ids.iter().cloned())
173 .collect::<BTreeSet<_>>();
174
175 let new_table_fragment_infos = if commit_table_ids
176 .iter()
177 .all(|table_id| table_ids.contains(table_id))
178 {
179 vec![]
180 } else {
181 vec![NewTableFragmentInfo {
182 table_ids: commit_table_ids
183 .iter()
184 .cloned()
185 .map(TableId::from)
186 .collect(),
187 }]
188 };
189
190 let sst_to_context = sync_result
191 .uncommitted_ssts
192 .iter()
193 .map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, self.context_id))
194 .collect();
195 let new_table_watermark = sync_result.table_watermarks;
196 let table_change_log = match change_log_info {
197 Some(epochs) => {
198 assert_eq!(*epochs.last().expect("non-empty"), epoch);
199 build_table_change_log_delta(
200 sync_result
201 .old_value_ssts
202 .into_iter()
203 .map(|sst| sst.sst_info),
204 sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info),
205 &epochs,
206 commit_table_ids.iter().map(|&table_id| (table_id, 0)),
207 )
208 }
209 None => Default::default(),
210 };
211
212 self.hummock_manager
213 .commit_epoch(CommitEpochInfo {
214 sstables: sync_result.uncommitted_ssts,
215 new_table_watermarks: new_table_watermark,
216 sst_to_context,
217 new_table_fragment_infos,
218 change_log_delta: table_change_log,
219 vector_index_delta: sync_result
220 .vector_index_adds
221 .into_iter()
222 .map(|(table_id, adds)| (table_id, VectorIndexDelta::Adds(adds)))
223 .collect(),
224 tables_to_commit: commit_table_ids
225 .iter()
226 .cloned()
227 .map(|table_id| (TableId::new(table_id), epoch))
228 .collect(),
229 truncate_tables: HashSet::new(),
230 })
231 .await
232 .map_err(mock_err)?;
233 Ok(())
234 }
235
236 async fn trigger_manual_compaction(
237 &self,
238 _compaction_group_id: u64,
239 _table_id: u32,
240 _level: u32,
241 _sst_ids: Vec<u64>,
242 ) -> Result<()> {
243 todo!()
244 }
245
246 async fn trigger_full_gc(
247 &self,
248 _sst_retention_time_sec: u64,
249 _prefix: Option<String>,
250 ) -> Result<()> {
251 unimplemented!()
252 }
253
254 async fn subscribe_compaction_event(
255 &self,
256 ) -> Result<(
257 UnboundedSender<SubscribeCompactionEventRequest>,
258 BoxStream<'static, CompactionEventItem>,
259 )> {
260 let context_id = self
261 .hummock_manager
262 .metadata_manager()
263 .add_worker_node(
264 WorkerType::Compactor,
265 HostAddress {
266 host: "compactor".to_owned(),
267 port: 0,
268 },
269 Default::default(),
270 Default::default(),
271 )
272 .await
273 .unwrap();
274 let _compactor_rx = self
275 .hummock_manager
276 .compactor_manager
277 .clone()
278 .add_compactor(context_id as _);
279
280 let (request_sender, mut request_receiver) =
281 unbounded_channel::<SubscribeCompactionEventRequest>();
282
283 self.compact_context_id
284 .store(context_id as _, Ordering::Release);
285
286 let (task_tx, task_rx) = tokio::sync::mpsc::unbounded_channel();
287
288 let hummock_manager_compact = self.hummock_manager.clone();
289 let mut join_handle_vec = vec![];
290
291 let handle = tokio::spawn(async move {
292 loop {
293 let group_and_type = hummock_manager_compact
294 .auto_pick_compaction_group_and_type()
295 .await;
296
297 if group_and_type.is_none() {
298 break;
299 }
300
301 let (group, task_type) = group_and_type.unwrap();
302
303 if let TaskType::Ttl = task_type {
304 match hummock_manager_compact
305 .metadata_manager_ref()
306 .get_all_table_options()
307 .await
308 .map_err(|err| Error::MetaStore(err.into()))
309 {
310 Ok(table_options) => {
311 hummock_manager_compact.update_table_id_to_table_option(table_options);
312 }
313 Err(e) => {
314 tracing::error!(error = %e.as_report(), "get_all_table_options fail");
315 }
316 }
317 }
318
319 let mut selector: Box<dyn CompactionSelector> = match task_type {
320 compact_task::TaskType::Dynamic => default_compaction_selector(),
321 compact_task::TaskType::SpaceReclaim => {
322 Box::<SpaceReclaimCompactionSelector>::default()
323 }
324
325 _ => panic!("Error type when mock_hummock_meta_client subscribe_compact_tasks"),
326 };
327 if let Some(task) = hummock_manager_compact
328 .get_compact_task(group, &mut selector)
329 .await
330 .unwrap()
331 {
332 let resp = SubscribeCompactionEventResponse {
333 event: Some(ResponseEvent::CompactTask(task.into())),
334 create_at: SystemTime::now()
335 .duration_since(std::time::UNIX_EPOCH)
336 .expect("Clock may have gone backwards")
337 .as_millis() as u64,
338 };
339
340 let _ = task_tx.send(Ok(resp));
341 }
342 }
343 });
344
345 join_handle_vec.push(handle);
346
347 let hummock_manager_compact = self.hummock_manager.clone();
348 let report_handle = tokio::spawn(async move {
349 tracing::info!("report_handle start");
350 loop {
351 if let Some(item) = request_receiver.recv().await
352 && let Event::ReportTask(ReportTask {
353 task_id,
354 task_status,
355 sorted_output_ssts,
356 table_stats_change,
357 object_timestamps,
358 }) = item.event.unwrap()
359 && let Err(e) = hummock_manager_compact
360 .report_compact_task(
361 task_id,
362 TaskStatus::try_from(task_status).unwrap(),
363 sorted_output_ssts
364 .into_iter()
365 .map(SstableInfo::from)
366 .collect_vec(),
367 Some(table_stats_change),
368 object_timestamps
369 .into_iter()
370 .map(|(id, ts)| (id.into(), ts))
371 .collect(),
372 )
373 .await
374 {
375 tracing::error!(error = %e.as_report(), "report compact_tack fail");
376 }
377 }
378 });
379
380 join_handle_vec.push(report_handle);
381
382 Ok((
383 request_sender,
384 Box::pin(CompactionEventItemStream {
385 inner: UnboundedReceiverStream::new(task_rx),
386 _handle: join_handle_vec,
387 }),
388 ))
389 }
390
391 async fn get_version_by_epoch(
392 &self,
393 _epoch: HummockEpoch,
394 _table_id: u32,
395 ) -> Result<PbHummockVersion> {
396 unimplemented!()
397 }
398
399 async fn subscribe_iceberg_compaction_event(
400 &self,
401 ) -> Result<(
402 UnboundedSender<SubscribeIcebergCompactionEventRequest>,
403 BoxStream<'static, IcebergCompactionEventItem>,
404 )> {
405 unimplemented!()
406 }
407}
408
409impl MockHummockMetaClient {
410 pub fn hummock_manager_ref(&self) -> Arc<HummockManager> {
411 self.hummock_manager.clone()
412 }
413}
414
/// Stream of compaction events handed back by
/// `MockHummockMetaClient::subscribe_compaction_event`.
pub struct CompactionEventItemStream {
    /// Receiving half of the channel that the spawned task-producing loop sends into.
    inner: UnboundedReceiverStream<CompactionEventItem>,
    /// Join handles of the background tasks spawned for the subscription; they are only stored
    /// here and are never awaited or aborted by this type.
    // NOTE(review): dropping a tokio `JoinHandle` detaches the task instead of cancelling it —
    // confirm that letting the tasks outlive the stream is intended.
    _handle: Vec<JoinHandle<()>>,
}
419
impl Drop for CompactionEventItemStream {
    /// Closes the wrapped receiver when the stream is dropped.
    fn drop(&mut self) {
        // Only the receiving half is closed here; the stored join handles are left untouched.
        self.inner.close();
    }
}
425
426impl Stream for CompactionEventItemStream {
427 type Item = CompactionEventItem;
428
429 fn poll_next(
430 mut self: std::pin::Pin<&mut Self>,
431 cx: &mut std::task::Context<'_>,
432 ) -> std::task::Poll<Option<Self::Item>> {
433 self.inner.poll_next_unpin(cx)
434 }
435}