1use std::cmp;
16use std::collections::HashSet;
17use std::ops::DerefMut;
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::time::{Duration, SystemTime};
20
21use chrono::DateTime;
22use futures::future::try_join_all;
23use futures::{StreamExt, TryStreamExt, future};
24use itertools::Itertools;
25use risingwave_common::system_param::reader::SystemParamsRead;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_hummock_sdk::{
28 HummockSstableObjectId, OBJECT_SUFFIX, get_object_id_from_path, get_sst_data_path,
29};
30use risingwave_meta_model::hummock_sequence::HUMMOCK_NOW;
31use risingwave_meta_model::{hummock_gc_history, hummock_sequence, hummock_version_delta};
32use risingwave_meta_model_migration::OnConflict;
33use risingwave_object_store::object::{ObjectMetadataIter, ObjectStoreRef};
34use risingwave_pb::stream_service::GetMinUncommittedSstIdRequest;
35use risingwave_rpc_client::StreamClientPool;
36use sea_orm::{ActiveValue, ColumnTrait, EntityTrait, QueryFilter, Set};
37
38use crate::MetaResult;
39use crate::backup_restore::BackupManagerRef;
40use crate::hummock::HummockManager;
41use crate::hummock::error::{Error, Result};
42use crate::manager::MetadataManager;
43
/// Performs garbage collection of SST objects in the object store.
pub(crate) struct GcManager {
    /// Handle to the underlying object store.
    store: ObjectStoreRef,
    /// Root path prefix under which SST data objects live.
    path_prefix: String,
    /// Forwarded to `get_object_prefix` when building an object's full path.
    use_new_object_prefix_strategy: bool,
    /// Object ids that MAY be eligible for deletion later. Accumulated via
    /// `add_may_delete_object_ids` and drained in bulk by
    /// `try_take_may_delete_object_ids` once enough have piled up.
    may_delete_object_ids: parking_lot::Mutex<HashSet<HummockSstableObjectId>>,
}
51
impl GcManager {
    /// Creates a `GcManager` over `store`, rooted at `path_prefix`.
    pub fn new(
        store: ObjectStoreRef,
        path_prefix: &str,
        use_new_object_prefix_strategy: bool,
    ) -> Self {
        Self {
            store,
            path_prefix: path_prefix.to_owned(),
            use_new_object_prefix_strategy,
            may_delete_object_ids: Default::default(),
        }
    }

    /// Deletes the given objects from the object store in a single batched call.
    ///
    /// Each object id is first translated into its full SST data path via the
    /// store's object prefix and this manager's `path_prefix`.
    pub async fn delete_objects(
        &self,
        object_id_list: impl Iterator<Item = HummockSstableObjectId>,
    ) -> Result<()> {
        // Preallocate for the typical batch size used by callers.
        let mut paths = Vec::with_capacity(1000);
        for object_id in object_id_list {
            let obj_prefix = self
                .store
                .get_object_prefix(object_id, self.use_new_object_prefix_strategy);
            paths.push(get_sst_data_path(&obj_prefix, &self.path_prefix, object_id));
        }
        self.store.delete_objects(&paths).await?;
        Ok(())
    }

    /// Lists object metadata under `{path_prefix}/{prefix}`, starting after
    /// `start_after` and yielding at most `limit` raw entries, keeping only keys
    /// with the SST object suffix. List errors are kept in the stream so the
    /// caller can surface them.
    async fn list_object_metadata_from_object_store(
        &self,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> Result<ObjectMetadataIter> {
        let list_path = format!("{}/{}", self.path_prefix, prefix.unwrap_or("".into()));
        let raw_iter = self.store.list(&list_path, start_after, limit).await?;
        let iter = raw_iter.filter(|r| match r {
            // Keep only real SST data objects (e.g. skip unrelated files).
            Ok(i) => future::ready(i.key.ends_with(&format!(".{}", OBJECT_SUFFIX))),
            // Never filter out errors; they must propagate to the caller.
            Err(_) => future::ready(true),
        });
        Ok(Box::pin(iter))
    }

    /// Lists one batch of objects and returns:
    /// - the ids of objects last modified strictly before `sst_retention_watermark`
    ///   (i.e. old enough to be GC candidates),
    /// - the total number of objects seen in this batch,
    /// - their total size in bytes,
    /// - the key to pass as `start_after` for the next batch, or `None` when the
    ///   listing is exhausted (fewer than `limit` objects were seen).
    pub async fn list_objects(
        &self,
        sst_retention_watermark: u64,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<u64>,
    ) -> Result<(HashSet<HummockSstableObjectId>, u64, u64, Option<String>)> {
        tracing::debug!(
            sst_retention_watermark,
            prefix,
            start_after,
            limit,
            "Try to list objects."
        );
        let mut total_object_count = 0;
        let mut total_object_size = 0;
        let mut next_start_after: Option<String> = None;
        let metadata_iter = self
            .list_object_metadata_from_object_store(prefix, start_after, limit.map(|i| i as usize))
            .await?;
        // NOTE: the closure mutates the captured counters above as the stream is
        // driven; the stream is consumed exactly once by `try_collect` below.
        let filtered = metadata_iter
            .filter_map(|r| {
                let result = match r {
                    Ok(o) => {
                        total_object_count += 1;
                        total_object_size += o.total_size;
                        // The batch is full once `limit` objects have been seen;
                        // remember this key so the next batch resumes after it.
                        if let Some(limit) = limit
                            && limit == total_object_count
                        {
                            next_start_after = Some(o.key.clone());
                            tracing::debug!(next_start_after, "set next start after");
                        }
                        if o.last_modified < sst_retention_watermark as f64 {
                            Some(Ok(get_object_id_from_path(&o.key)))
                        } else {
                            None
                        }
                    }
                    Err(e) => Some(Err(Error::ObjectStore(e))),
                };
                async move { result }
            })
            .try_collect::<HashSet<HummockSstableObjectId>>()
            .await?;
        Ok((
            filtered,
            total_object_count,
            total_object_size as u64,
            next_start_after,
        ))
    }

    /// Queues object ids that may be deletable later (e.g. for minor GC).
    pub fn add_may_delete_object_ids(
        &self,
        may_delete_object_ids: impl Iterator<Item = HummockSstableObjectId>,
    ) {
        self.may_delete_object_ids
            .lock()
            .extend(may_delete_object_ids);
    }

    /// Takes all accumulated candidate object ids, but only once at least
    /// `least_count` of them have accumulated; otherwise returns `None` and
    /// leaves the set untouched.
    pub fn try_take_may_delete_object_ids(
        &self,
        least_count: usize,
    ) -> Option<HashSet<HummockSstableObjectId>> {
        let mut guard = self.may_delete_object_ids.lock();
        if guard.len() < least_count {
            None
        } else {
            Some(std::mem::take(&mut *guard))
        }
    }
}
174
impl HummockManager {
    /// Deletes version deltas at or below the current checkpoint version, from
    /// both the meta store and the in-memory `hummock_version_deltas` map.
    ///
    /// Returns the number of rows deleted from the meta store. Deletion is
    /// skipped (returning 0) while any version safe point is held.
    pub async fn delete_version_deltas(&self) -> Result<usize> {
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        let context_info = self.context_info.read().await;
        if !context_info.version_safe_points.is_empty() {
            return Ok(0);
        }
        let version_id = versioning.checkpoint.version.id;
        let res = hummock_version_delta::Entity::delete_many()
            .filter(hummock_version_delta::Column::Id.lte(version_id.to_u64()))
            .exec(&self.env.meta_store_ref().conn)
            .await?;
        tracing::debug!(rows_affected = res.rows_affected, "Deleted version deltas");
        versioning
            .hummock_version_deltas
            .retain(|id, _| *id > version_id);
        #[cfg(test)]
        {
            // Guards must be released before the consistency check re-acquires
            // the same locks.
            drop(context_info);
            drop(versioning_guard);
            self.check_state_consistency().await;
        }
        Ok(res.rows_affected as usize)
    }

    /// Filters `object_ids` down to those NOT tracked by any version at or above
    /// the minimum pinned version, records the survivors in GC history, and
    /// returns them as the final list of objects to delete.
    pub async fn finalize_objects_to_delete(
        &self,
        object_ids: impl Iterator<Item = HummockSstableObjectId> + Clone,
    ) -> Result<Vec<HummockSstableObjectId>> {
        // Hold the versioning read lock across filtering and history writing so
        // the tracked-object snapshot stays consistent.
        let versioning = self.versioning.read().await;
        let tracked_object_ids: HashSet<HummockSstableObjectId> = versioning
            .get_tracked_object_ids(self.context_info.read().await.min_pinned_version_id());
        let to_delete = object_ids.filter(|object_id| !tracked_object_ids.contains(object_id));
        // Record in GC history BEFORE the caller performs the actual deletion.
        self.write_gc_history(to_delete.clone()).await?;
        Ok(to_delete.collect())
    }

    /// Starts a full GC: lists all objects in the store in batches of
    /// `full_gc_object_limit` and feeds each batch through `complete_gc_batch`.
    ///
    /// Only one full GC can run at a time; returns an error if one is already in
    /// progress. `sst_retention_time` is clamped up to the configured minimum.
    pub async fn start_full_gc(
        &self,
        sst_retention_time: Duration,
        prefix: Option<String>,
        backup_manager: Option<BackupManagerRef>,
    ) -> Result<()> {
        if !self.full_gc_state.try_start() {
            return Err(anyhow::anyhow!("failed to start GC due to an ongoing process").into());
        }
        // Ensure the "started" flag is cleared on every exit path, including errors.
        let _guard = scopeguard::guard(self.full_gc_state.clone(), |full_gc_state| {
            full_gc_state.stop()
        });
        self.metrics.full_gc_trigger_count.inc();
        let sst_retention_time = cmp::max(
            sst_retention_time,
            Duration::from_secs(self.env.opts.min_sst_retention_time_sec),
        );
        let limit = self.env.opts.full_gc_object_limit;
        let mut start_after = None;
        // Objects modified at or after this watermark are too young to collect.
        let sst_retention_watermark = self
            .now()
            .await?
            .saturating_sub(sst_retention_time.as_secs());
        let mut total_object_count = 0;
        let mut total_object_size = 0;
        tracing::info!(
            retention_sec = sst_retention_time.as_secs(),
            prefix,
            limit,
            "Start GC."
        );
        loop {
            tracing::debug!(
                retention_sec = sst_retention_time.as_secs(),
                prefix,
                start_after,
                limit,
                "Start a GC batch."
            );
            let (object_ids, batch_object_count, batch_object_size, next_start_after) = self
                .gc_manager
                .list_objects(
                    sst_retention_watermark,
                    prefix.clone(),
                    start_after.clone(),
                    Some(limit),
                )
                .await?;
            total_object_count += batch_object_count;
            total_object_size += batch_object_size;
            tracing::debug!(
                ?object_ids,
                batch_object_count,
                batch_object_size,
                "Finish listing a GC batch."
            );
            self.complete_gc_batch(object_ids, backup_manager.clone())
                .await?;
            // `None` means the listing is exhausted.
            if next_start_after.is_none() {
                break;
            }
            start_after = next_start_after;
        }
        tracing::info!(total_object_count, total_object_size, "Finish GC");
        self.metrics.total_object_size.set(total_object_size as _);
        self.metrics.total_object_count.set(total_object_count as _);
        // Best-effort metric update; a failure here must not fail the GC itself.
        match self.time_travel_pinned_object_count().await {
            Ok(count) => {
                self.metrics.time_travel_object_count.set(count as _);
            }
            Err(err) => {
                use thiserror_ext::AsReport;
                tracing::warn!(error = %err.as_report(), "Failed to count time travel objects.");
            }
        }
        Ok(())
    }

    /// Runs the GC filtering pipeline on one batch of candidate objects and
    /// deletes the survivors. Returns the number of objects actually deleted.
    ///
    /// Candidates are dropped from the batch, in order, when they are:
    /// 1. pinned by a metadata backup,
    /// 2. referenced by time travel metadata,
    /// 3. at or above the minimum uncommitted SST id reported by compute nodes
    ///    (i.e. possibly still being written),
    /// 4. tracked by a current or pinned version (see
    ///    `finalize_objects_to_delete`).
    pub(crate) async fn complete_gc_batch(
        &self,
        object_ids: HashSet<HummockSstableObjectId>,
        backup_manager: Option<BackupManagerRef>,
    ) -> Result<usize> {
        if object_ids.is_empty() {
            return Ok(0);
        }
        let pinned_by_metadata_backup = backup_manager
            .as_ref()
            .map(|b| b.list_pinned_ssts())
            .unwrap_or_default();
        let min_sst_id =
            collect_min_uncommitted_sst_id(&self.metadata_manager, self.env.stream_client_pool())
                .await?;
        let metrics = &self.metrics;
        let candidate_object_number = object_ids.len();
        metrics
            .full_gc_candidate_object_count
            .observe(candidate_object_number as _);
        // Stage 1: drop objects pinned by metadata backups.
        let object_ids = object_ids
            .into_iter()
            .filter(|s| !pinned_by_metadata_backup.contains(s))
            .collect_vec();
        let after_metadata_backup = object_ids.len();
        // Stage 2: drop objects referenced by time travel metadata.
        let object_ids = self
            .filter_out_objects_by_time_travel(
                object_ids.into_iter(),
                self.env
                    .opts
                    .hummock_time_travel_filter_out_objects_batch_size,
            )
            .await?;
        let after_time_travel = object_ids.len();
        // Stage 3: keep only objects strictly below the minimum uncommitted SST id.
        let object_ids = object_ids
            .into_iter()
            .filter(|id| *id < min_sst_id)
            .collect_vec();
        let after_min_sst_id = object_ids.len();
        // Stage 4: drop version-tracked objects and record GC history.
        let after_version = self
            .finalize_objects_to_delete(object_ids.into_iter())
            .await?;
        let after_version_count = after_version.len();
        metrics
            .full_gc_selected_object_count
            .observe(after_version_count as _);
        tracing::info!(
            candidate_object_number,
            after_metadata_backup,
            after_time_travel,
            after_min_sst_id,
            after_version_count,
            "complete gc batch"
        );
        self.delete_objects(after_version).await?;
        Ok(after_version_count)
    }

    /// Returns a monotonically non-decreasing "now" in seconds since the Unix
    /// epoch and persists it under `HUMMOCK_NOW` in the meta store (upsert).
    ///
    /// # Errors
    /// Returns an error if the wall clock reads earlier than the last value
    /// observed, i.e. the clock appears to have gone backwards.
    pub async fn now(&self) -> Result<u64> {
        let mut guard = self.now.lock().await;
        let new_now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        if new_now < *guard {
            return Err(anyhow::anyhow!(format!(
                "unexpected decreasing now, old={}, new={}",
                *guard, new_now
            ))
            .into());
        }
        *guard = new_now;
        // Release the lock before the (potentially slow) meta store write.
        drop(guard);
        let m = hummock_sequence::ActiveModel {
            name: ActiveValue::Set(HUMMOCK_NOW.into()),
            seq: ActiveValue::Set(new_now.try_into().unwrap()),
        };
        hummock_sequence::Entity::insert(m)
            .on_conflict(
                // Upsert: overwrite the previously persisted value.
                OnConflict::column(hummock_sequence::Column::Name)
                    .update_column(hummock_sequence::Column::Seq)
                    .to_owned(),
            )
            .exec(&self.env.meta_store_ref().conn)
            .await?;
        Ok(new_now)
    }

    /// Loads the last persisted `HUMMOCK_NOW` value, or `None` if it has never
    /// been written.
    pub(crate) async fn load_now(&self) -> Result<Option<u64>> {
        let now = hummock_sequence::Entity::find_by_id(HUMMOCK_NOW.to_owned())
            .one(&self.env.meta_store_ref().conn)
            .await?
            .map(|m| m.seq.try_into().unwrap());
        Ok(now)
    }

    /// Records `object_ids` in the GC history table (insert-or-ignore, in
    /// batches of `hummock_gc_history_insert_batch_size`) and prunes history
    /// entries older than the configured retention time.
    ///
    /// No-op when `gc_history_retention_time_sec` is 0.
    async fn write_gc_history(
        &self,
        object_ids: impl Iterator<Item = HummockSstableObjectId>,
    ) -> Result<()> {
        if self.env.opts.gc_history_retention_time_sec == 0 {
            return Ok(());
        }
        let now = self.now().await?;
        let dt = DateTime::from_timestamp(now.try_into().unwrap(), 0).unwrap();
        let mut models = object_ids.map(|o| hummock_gc_history::ActiveModel {
            object_id: Set(o.try_into().unwrap()),
            mark_delete_at: Set(dt.naive_utc()),
        });
        let db = &self.meta_store_ref().conn;
        // Prune history entries that have aged out of the retention window.
        let gc_history_low_watermark = DateTime::from_timestamp(
            now.saturating_sub(self.env.opts.gc_history_retention_time_sec)
                .try_into()
                .unwrap(),
            0,
        )
        .unwrap();
        hummock_gc_history::Entity::delete_many()
            .filter(hummock_gc_history::Column::MarkDeleteAt.lt(gc_history_low_watermark))
            .exec(db)
            .await?;
        // Insert the new entries batch by batch, draining `models` lazily.
        let mut is_finished = false;
        while !is_finished {
            let mut batch = vec![];
            let mut count: usize = self.env.opts.hummock_gc_history_insert_batch_size;
            while count > 0 {
                let Some(m) = models.next() else {
                    is_finished = true;
                    break;
                };
                count -= 1;
                batch.push(m);
            }
            if batch.is_empty() {
                break;
            }
            hummock_gc_history::Entity::insert_many(batch)
                .on_conflict_do_nothing()
                .exec(db)
                .await?;
        }
        Ok(())
    }

    /// Truncates time travel metadata whose epoch falls below the watermark
    /// derived from the system parameter `time_travel_retention_ms`.
    pub async fn delete_time_travel_metadata(&self) -> MetaResult<()> {
        let current_epoch_time = Epoch::now().physical_time();
        let epoch_watermark = Epoch::from_physical_time(
            current_epoch_time.saturating_sub(
                self.env
                    .system_params_reader()
                    .await
                    .time_travel_retention_ms(),
            ),
        )
        .0;
        self.truncate_time_travel_metadata(epoch_watermark).await?;
        Ok(())
    }

    /// Deletes `objects_to_delete` from the object store in batches of up to
    /// 1000, optionally sleeping `vacuum_spin_interval_ms` between batches to
    /// throttle. Returns the total number of objects submitted for deletion.
    pub async fn delete_objects(
        &self,
        mut objects_to_delete: Vec<HummockSstableObjectId>,
    ) -> Result<usize> {
        let total = objects_to_delete.len();
        let mut batch_size = 1000usize;
        while !objects_to_delete.is_empty() {
            if self.env.opts.vacuum_spin_interval_ms != 0 {
                tokio::time::sleep(Duration::from_millis(self.env.opts.vacuum_spin_interval_ms))
                    .await;
            }
            batch_size = cmp::min(objects_to_delete.len(), batch_size);
            if batch_size == 0 {
                break;
            }
            let delete_batch: HashSet<_> = objects_to_delete.drain(..batch_size).collect();
            tracing::info!(?delete_batch, "Attempt to delete objects.");
            let deleted_object_ids = delete_batch.clone();
            self.gc_manager
                .delete_objects(delete_batch.into_iter())
                .await?;
            tracing::debug!(?deleted_object_ids, "Finish deleting objects.");
        }
        Ok(total)
    }

    /// Attempts a minor GC over object ids accumulated in the `GcManager`.
    ///
    /// Returns without doing any work unless at least
    /// `MIN_MINOR_GC_OBJECT_COUNT` candidates have accumulated. Candidates are
    /// filtered against backup-pinned objects, version-tracked objects, and
    /// time travel metadata before deletion.
    pub async fn try_start_minor_gc(&self, backup_manager: BackupManagerRef) -> Result<()> {
        const MIN_MINOR_GC_OBJECT_COUNT: usize = 1000;
        let Some(object_ids) = self
            .gc_manager
            .try_take_may_delete_object_ids(MIN_MINOR_GC_OBJECT_COUNT)
        else {
            return Ok(());
        };
        let backup_pinned: HashSet<_> = backup_manager.list_pinned_ssts();
        let version_pinned = {
            let versioning = self.versioning.read().await;
            versioning
                .get_tracked_object_ids(self.context_info.read().await.min_pinned_version_id())
        };
        let object_ids = object_ids
            .into_iter()
            .filter(|s| !version_pinned.contains(s) && !backup_pinned.contains(s));
        let object_ids = self
            .filter_out_objects_by_time_travel(
                object_ids,
                self.env
                    .opts
                    .hummock_time_travel_filter_out_objects_batch_size,
            )
            .await?;
        self.delete_objects(object_ids.into_iter().collect())
            .await?;
        Ok(())
    }
}
535
536async fn collect_min_uncommitted_sst_id(
537 metadata_manager: &MetadataManager,
538 client_pool: &StreamClientPool,
539) -> Result<HummockSstableObjectId> {
540 let futures = metadata_manager
541 .list_active_streaming_compute_nodes()
542 .await
543 .map_err(|err| Error::MetaStore(err.into()))?
544 .into_iter()
545 .map(|worker_node| async move {
546 let client = client_pool.get(&worker_node).await?;
547 let request = GetMinUncommittedSstIdRequest {};
548 client.get_min_uncommitted_sst_id(request).await
549 });
550 let min_watermark = try_join_all(futures)
551 .await
552 .map_err(|err| Error::Internal(err.into()))?
553 .into_iter()
554 .map(|resp| resp.min_uncommitted_sst_id)
555 .min()
556 .unwrap_or(HummockSstableObjectId::MAX);
557 Ok(min_watermark)
558}
559
/// Guards against concurrent full GC runs: at most one full GC may be in
/// progress at any time.
pub struct FullGcState {
    /// `true` while a full GC run is in progress.
    is_started: AtomicBool,
}

impl FullGcState {
    /// Creates a new state with no GC in progress.
    pub fn new() -> Self {
        Self {
            is_started: AtomicBool::new(false),
        }
    }

    /// Attempts to mark a full GC as started.
    ///
    /// Returns `true` on success, or `false` if another GC is already running.
    /// The check-and-set is a single atomic `compare_exchange`, so exactly one
    /// of several concurrent callers can win.
    pub fn try_start(&self) -> bool {
        self.is_started
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
    }

    /// Marks the current full GC as finished, allowing a new one to start.
    pub fn stop(&self) {
        self.is_started.store(false, Ordering::SeqCst);
    }
}

// `new()` takes no arguments, so provide the conventional `Default` impl
// (satisfies `clippy::new_without_default`; backward compatible for callers).
impl Default for FullGcState {
    fn default() -> Self {
        Self::new()
    }
}
581
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use itertools::Itertools;
    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
    use risingwave_rpc_client::HummockMetaClient;

    use crate::hummock::MockHummockMetaClient;
    use crate::hummock::test_utils::{add_test_tables, setup_compute_env};

    /// End-to-end exercise of full GC and `complete_gc_batch` against a mock
    /// compute environment.
    #[tokio::test]
    async fn test_full_gc() {
        let (_env, hummock_manager, _cluster_manager, worker_id) = setup_compute_env(80).await;
        let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(MockHummockMetaClient::new(
            hummock_manager.clone(),
            worker_id as _,
        ));
        let compaction_group_id = StaticCompactionGroupId::StateDefault.into();
        // A retention above the configured minimum should be accepted and the
        // full GC should complete without error (no objects exist yet).
        hummock_manager
            .start_full_gc(
                Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec + 1),
                None,
                None,
            )
            .await
            .unwrap();

        // An empty candidate batch short-circuits and deletes nothing.
        hummock_manager
            .complete_gc_batch(vec![].into_iter().collect(), None)
            .await
            .unwrap();

        // With no committed data, none of the three candidate ids is tracked by
        // any version, so all 3 pass the filter pipeline and are deleted.
        assert_eq!(
            3,
            hummock_manager
                .complete_gc_batch(
                    vec![i64::MAX as u64 - 2, i64::MAX as u64 - 1, i64::MAX as u64]
                        .into_iter()
                        .collect(),
                    None,
                )
                .await
                .unwrap()
        );

        // Commit some SSTs so their object ids become version-tracked.
        let sst_infos = add_test_tables(
            hummock_manager.as_ref(),
            hummock_meta_client.clone(),
            compaction_group_id,
        )
        .await;
        let committed_object_ids = sst_infos
            .into_iter()
            .flatten()
            .map(|s| s.object_id)
            .sorted()
            .collect_vec();
        assert!(!committed_object_ids.is_empty());
        let max_committed_object_id = *committed_object_ids.iter().max().unwrap();
        // All committed ids are filtered out as version-tracked; only the one
        // synthetic untracked id (`max_committed_object_id + 1`) gets deleted.
        assert_eq!(
            1,
            hummock_manager
                .complete_gc_batch(
                    [committed_object_ids, vec![max_committed_object_id + 1]]
                        .concat()
                        .into_iter()
                        .collect(),
                    None,
                )
                .await
                .unwrap()
        );
    }
}
661}