1use std::cmp;
16use std::collections::HashSet;
17use std::ops::DerefMut;
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::time::{Duration, Instant, SystemTime};
20
21use chrono::DateTime;
22use futures::future::try_join_all;
23use futures::{StreamExt, TryStreamExt, future};
24use itertools::Itertools;
25use risingwave_common::system_param::reader::SystemParamsRead;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_hummock_sdk::{
28 HummockObjectId, HummockRawObjectId, VALID_OBJECT_ID_SUFFIXES, get_object_data_path,
29 get_object_id_from_path,
30};
31use risingwave_meta_model::hummock_sequence::HUMMOCK_NOW;
32use risingwave_meta_model::{hummock_gc_history, hummock_sequence, hummock_version_delta};
33use risingwave_meta_model_migration::OnConflict;
34use risingwave_object_store::object::{ObjectMetadataIter, ObjectStoreRef};
35use risingwave_pb::stream_service::GetMinUncommittedObjectIdRequest;
36use risingwave_rpc_client::StreamClientPool;
37use sea_orm::{ActiveValue, ColumnTrait, EntityTrait, QueryFilter, Set};
38
39use crate::MetaResult;
40use crate::backup_restore::BackupManagerRef;
41use crate::hummock::HummockManager;
42use crate::hummock::error::{Error, Result};
43use crate::manager::MetadataManager;
44
/// Object-store access and candidate bookkeeping used by Hummock garbage collection.
pub(crate) struct GcManager {
    /// Object store that objects are listed from and deleted in.
    store: ObjectStoreRef,
    /// Prefix combined with object ids (or list prefixes) to build object store paths.
    path_prefix: String,
    /// Forwarded to `store.get_object_prefix` when an object's path is computed.
    use_new_object_prefix_strategy: bool,
    /// Ids collected via `add_may_delete_object_ids` and drained in bulk by
    /// `try_take_may_delete_object_ids`.
    may_delete_object_ids: parking_lot::Mutex<HashSet<HummockObjectId>>,
}
52
53impl GcManager {
54 pub fn new(
55 store: ObjectStoreRef,
56 path_prefix: &str,
57 use_new_object_prefix_strategy: bool,
58 ) -> Self {
59 Self {
60 store,
61 path_prefix: path_prefix.to_owned(),
62 use_new_object_prefix_strategy,
63 may_delete_object_ids: Default::default(),
64 }
65 }
66
67 pub async fn delete_objects(
69 &self,
70 object_id_list: impl Iterator<Item = HummockObjectId>,
71 ) -> Result<()> {
72 let mut paths = Vec::with_capacity(1000);
73 for object_id in object_id_list {
74 let obj_prefix = self.store.get_object_prefix(
75 object_id.as_raw().inner(),
76 self.use_new_object_prefix_strategy,
77 );
78 paths.push(get_object_data_path(
79 &obj_prefix,
80 &self.path_prefix,
81 object_id,
82 ));
83 }
84 self.store.delete_objects(&paths).await?;
85 Ok(())
86 }
87
88 async fn list_object_metadata_from_object_store(
89 &self,
90 prefix: Option<String>,
91 start_after: Option<String>,
92 limit: Option<usize>,
93 ) -> Result<ObjectMetadataIter> {
94 let list_path = format!("{}/{}", self.path_prefix, prefix.unwrap_or("".into()));
95 let raw_iter = self.store.list(&list_path, start_after, limit).await?;
96 let valid_suffixes = VALID_OBJECT_ID_SUFFIXES.map(|suffix| format!(".{}", suffix));
97 let iter = raw_iter.filter(move |r| match r {
98 Ok(i) => future::ready(valid_suffixes.iter().any(|suffix| i.key.ends_with(suffix))),
99 Err(_) => future::ready(true),
100 });
101 Ok(Box::pin(iter))
102 }
103
104 pub async fn list_objects(
106 &self,
107 object_retention_watermark: u64,
108 prefix: Option<String>,
109 start_after: Option<String>,
110 limit: Option<u64>,
111 ) -> Result<(HashSet<HummockObjectId>, u64, u64, Option<String>)> {
112 tracing::debug!(
113 object_retention_watermark,
114 prefix,
115 start_after,
116 limit,
117 "Try to list objects."
118 );
119 let mut total_object_count = 0;
120 let mut total_object_size = 0;
121 let mut next_start_after: Option<String> = None;
122 let metadata_iter = self
123 .list_object_metadata_from_object_store(prefix, start_after, limit.map(|i| i as usize))
124 .await?;
125 let filtered = metadata_iter
126 .filter_map(|r| {
127 let result = match r {
128 Ok(o) => {
129 total_object_count += 1;
130 total_object_size += o.total_size;
131 if let Some(limit) = limit
134 && limit == total_object_count
135 {
136 next_start_after = Some(o.key.clone());
137 tracing::debug!(next_start_after, "set next start after");
138 }
139 if o.last_modified < object_retention_watermark as f64 {
140 Some(Ok(get_object_id_from_path(&o.key)))
141 } else {
142 None
143 }
144 }
145 Err(e) => Some(Err(Error::ObjectStore(e))),
146 };
147 async move { result }
148 })
149 .try_collect::<HashSet<HummockObjectId>>()
150 .await?;
151 Ok((
152 filtered,
153 total_object_count,
154 total_object_size as u64,
155 next_start_after,
156 ))
157 }
158
159 pub fn add_may_delete_object_ids(
160 &self,
161 may_delete_object_ids: impl Iterator<Item = HummockObjectId>,
162 ) {
163 self.may_delete_object_ids
164 .lock()
165 .extend(may_delete_object_ids);
166 }
167
168 pub fn try_take_may_delete_object_ids(
170 &self,
171 least_count: usize,
172 ) -> Option<HashSet<HummockObjectId>> {
173 let mut guard = self.may_delete_object_ids.lock();
174 if guard.len() < least_count {
175 None
176 } else {
177 Some(std::mem::take(&mut *guard))
178 }
179 }
180}
181
182impl HummockManager {
183 pub async fn delete_version_deltas(&self) -> Result<usize> {
187 let mut versioning_guard = self.versioning.write().await;
188 let versioning = versioning_guard.deref_mut();
189 let context_info = self.context_info.read().await;
190 if !context_info.version_safe_points.is_empty() {
193 return Ok(0);
194 }
195 let version_id = versioning.checkpoint.version.id;
198 let res = hummock_version_delta::Entity::delete_many()
199 .filter(hummock_version_delta::Column::Id.lte(version_id.to_u64()))
200 .exec(&self.env.meta_store_ref().conn)
201 .await?;
202 tracing::debug!(rows_affected = res.rows_affected, "Deleted version deltas");
203 versioning
204 .hummock_version_deltas
205 .retain(|id, _| *id > version_id);
206 #[cfg(test)]
207 {
208 drop(context_info);
209 drop(versioning_guard);
210 self.check_state_consistency().await;
211 }
212 Ok(res.rows_affected as usize)
213 }
214
215 pub async fn finalize_objects_to_delete(
217 &self,
218 object_ids: impl Iterator<Item = HummockObjectId>,
219 ) -> Result<Vec<HummockObjectId>> {
220 let versioning = self.versioning.read().await;
222 let tracked_object_ids: HashSet<HummockObjectId> = versioning
223 .get_tracked_object_ids(self.context_info.read().await.min_pinned_version_id());
224 let to_delete = object_ids
225 .filter(|object_id| !tracked_object_ids.contains(object_id))
226 .collect_vec();
227 self.write_gc_history(to_delete.iter().copied()).await?;
228 Ok(to_delete)
229 }
230
231 pub async fn start_full_gc(
234 &self,
235 object_retention_time: Duration,
236 prefix: Option<String>,
237 backup_manager: Option<BackupManagerRef>,
238 ) -> Result<()> {
239 if !self.full_gc_state.try_start() {
240 return Err(anyhow::anyhow!("failed to start GC due to an ongoing process").into());
241 }
242 let _guard = scopeguard::guard(self.full_gc_state.clone(), |full_gc_state| {
243 full_gc_state.stop()
244 });
245 self.metrics.full_gc_trigger_count.inc();
246 let object_retention_time = cmp::max(
247 object_retention_time,
248 Duration::from_secs(self.env.opts.min_sst_retention_time_sec),
249 );
250 let limit = self.env.opts.full_gc_object_limit;
251 let mut start_after = None;
252 let object_retention_watermark = self
253 .now()
254 .await?
255 .saturating_sub(object_retention_time.as_secs());
256 let mut total_object_count = 0;
257 let mut total_object_size = 0;
258 tracing::info!(
259 retention_sec = object_retention_time.as_secs(),
260 prefix,
261 limit,
262 "Start GC."
263 );
264 loop {
265 tracing::debug!(
266 retention_sec = object_retention_time.as_secs(),
267 prefix,
268 start_after,
269 limit,
270 "Start a GC batch."
271 );
272 let (object_ids, batch_object_count, batch_object_size, next_start_after) = self
273 .gc_manager
274 .list_objects(
275 object_retention_watermark,
276 prefix.clone(),
277 start_after.clone(),
278 Some(limit),
279 )
280 .await?;
281 total_object_count += batch_object_count;
282 total_object_size += batch_object_size;
283 tracing::debug!(
284 ?object_ids,
285 batch_object_count,
286 batch_object_size,
287 "Finish listing a GC batch."
288 );
289 self.complete_gc_batch(object_ids, backup_manager.clone())
290 .await?;
291 if next_start_after.is_none() {
292 break;
293 }
294 start_after = next_start_after;
295 }
296 tracing::info!(total_object_count, total_object_size, "Finish GC");
297 self.metrics.total_object_size.set(total_object_size as _);
298 self.metrics.total_object_count.set(total_object_count as _);
299 match self.time_travel_pinned_object_count().await {
300 Ok(count) => {
301 self.metrics.time_travel_object_count.set(count as _);
302 }
303 Err(err) => {
304 use thiserror_ext::AsReport;
305 tracing::warn!(error = %err.as_report(), "Failed to count time travel objects.");
306 }
307 }
308 Ok(())
309 }
310
311 pub(crate) async fn complete_gc_batch(
314 &self,
315 object_ids: HashSet<HummockObjectId>,
316 backup_manager: Option<BackupManagerRef>,
317 ) -> Result<usize> {
318 if object_ids.is_empty() {
319 return Ok(0);
320 }
321 let pinned_by_metadata_backup = backup_manager
323 .as_ref()
324 .map(|b| b.list_pinned_object_ids())
325 .unwrap_or_default();
326 let min_object_id = collect_min_uncommitted_object_id(
330 &self.metadata_manager,
331 self.env.stream_client_pool(),
332 )
333 .await?;
334 let metrics = &self.metrics;
335 let candidate_object_number = object_ids.len();
336 metrics
337 .full_gc_candidate_object_count
338 .observe(candidate_object_number as _);
339 let object_ids = object_ids
341 .into_iter()
342 .filter(|s| !pinned_by_metadata_backup.contains(s))
343 .collect_vec();
344 let after_metadata_backup = object_ids.len();
345 let filter_by_time_travel_start_time = Instant::now();
347 let object_ids = self
348 .filter_out_objects_by_time_travel(object_ids.into_iter())
349 .await?;
350 tracing::info!(elapsed = ?filter_by_time_travel_start_time.elapsed(), "filter out objects by time travel in full GC");
351 let after_time_travel = object_ids.len();
352 let object_ids = object_ids
354 .into_iter()
355 .filter(|id| id.as_raw() < min_object_id)
356 .collect_vec();
357 let after_min_object_id = object_ids.len();
358 let after_version = self
360 .finalize_objects_to_delete(object_ids.into_iter())
361 .await?;
362 let after_version_count = after_version.len();
363 metrics
364 .full_gc_selected_object_count
365 .observe(after_version_count as _);
366 tracing::info!(
367 candidate_object_number,
368 after_metadata_backup,
369 after_time_travel,
370 after_min_object_id,
371 after_version_count,
372 "complete gc batch"
373 );
374 self.delete_objects(after_version).await?;
375 Ok(after_version_count)
376 }
377
378 pub async fn now(&self) -> Result<u64> {
379 let mut guard = self.now.lock().await;
380 let new_now = SystemTime::now()
381 .duration_since(SystemTime::UNIX_EPOCH)
382 .expect("Clock may have gone backwards")
383 .as_secs();
384 if new_now < *guard {
385 return Err(anyhow::anyhow!(format!(
386 "unexpected decreasing now, old={}, new={}",
387 *guard, new_now
388 ))
389 .into());
390 }
391 *guard = new_now;
392 drop(guard);
393 let m = hummock_sequence::ActiveModel {
395 name: ActiveValue::Set(HUMMOCK_NOW.into()),
396 seq: ActiveValue::Set(new_now.try_into().unwrap()),
397 };
398 hummock_sequence::Entity::insert(m)
399 .on_conflict(
400 OnConflict::column(hummock_sequence::Column::Name)
401 .update_column(hummock_sequence::Column::Seq)
402 .to_owned(),
403 )
404 .exec(&self.env.meta_store_ref().conn)
405 .await?;
406 Ok(new_now)
407 }
408
409 pub(crate) async fn load_now(&self) -> Result<Option<u64>> {
410 let now = hummock_sequence::Entity::find_by_id(HUMMOCK_NOW.to_owned())
411 .one(&self.env.meta_store_ref().conn)
412 .await?
413 .map(|m| m.seq.try_into().unwrap());
414 Ok(now)
415 }
416
417 async fn write_gc_history(
418 &self,
419 object_ids: impl Iterator<Item = HummockObjectId>,
420 ) -> Result<()> {
421 if self.env.opts.gc_history_retention_time_sec == 0 {
422 return Ok(());
423 }
424 let now = self.now().await?;
425 let dt = DateTime::from_timestamp(now.try_into().unwrap(), 0).unwrap();
426 let mut models = object_ids.map(|o| hummock_gc_history::ActiveModel {
427 object_id: Set(o.as_raw().inner().try_into().unwrap()),
428 mark_delete_at: Set(dt.naive_utc()),
429 });
430 let db = &self.meta_store_ref().conn;
431 let gc_history_low_watermark = DateTime::from_timestamp(
432 now.saturating_sub(self.env.opts.gc_history_retention_time_sec)
433 .try_into()
434 .unwrap(),
435 0,
436 )
437 .unwrap();
438 hummock_gc_history::Entity::delete_many()
439 .filter(hummock_gc_history::Column::MarkDeleteAt.lt(gc_history_low_watermark))
440 .exec(db)
441 .await?;
442 let mut is_finished = false;
443 while !is_finished {
444 let mut batch = vec![];
445 let mut count: usize = self.env.opts.hummock_gc_history_insert_batch_size;
446 while count > 0 {
447 let Some(m) = models.next() else {
448 is_finished = true;
449 break;
450 };
451 count -= 1;
452 batch.push(m);
453 }
454 if batch.is_empty() {
455 break;
456 }
457 hummock_gc_history::Entity::insert_many(batch)
458 .on_conflict_do_nothing()
459 .exec(db)
460 .await?;
461 }
462 Ok(())
463 }
464
465 pub async fn delete_time_travel_metadata(&self) -> MetaResult<()> {
466 let current_epoch_time = Epoch::now().physical_time();
467 let epoch_watermark = Epoch::from_physical_time(
468 current_epoch_time.saturating_sub(
469 self.env
470 .system_params_reader()
471 .await
472 .time_travel_retention_ms(),
473 ),
474 )
475 .0;
476 self.truncate_time_travel_metadata(epoch_watermark).await?;
477 Ok(())
478 }
479
480 pub async fn delete_objects(
484 &self,
485 mut objects_to_delete: Vec<HummockObjectId>,
486 ) -> Result<usize> {
487 let total = objects_to_delete.len();
488 let mut batch_size = 1000usize;
489 while !objects_to_delete.is_empty() {
490 if self.env.opts.vacuum_spin_interval_ms != 0 {
491 tokio::time::sleep(Duration::from_millis(self.env.opts.vacuum_spin_interval_ms))
492 .await;
493 }
494 batch_size = cmp::min(objects_to_delete.len(), batch_size);
495 if batch_size == 0 {
496 break;
497 }
498 let delete_batch: HashSet<_> = objects_to_delete.drain(..batch_size).collect();
499 tracing::info!(?delete_batch, "Attempt to delete objects.");
500 let deleted_object_ids = delete_batch.clone();
501 self.gc_manager
502 .delete_objects(delete_batch.into_iter())
503 .await?;
504 tracing::debug!(?deleted_object_ids, "Finish deleting objects.");
505 }
506 Ok(total)
507 }
508
509 pub async fn try_start_minor_gc(&self, backup_manager: BackupManagerRef) -> Result<()> {
511 const MIN_MINOR_GC_OBJECT_COUNT: usize = 1000;
512 let Some(object_ids) = self
513 .gc_manager
514 .try_take_may_delete_object_ids(MIN_MINOR_GC_OBJECT_COUNT)
515 else {
516 return Ok(());
517 };
518 let backup_pinned: HashSet<_> = backup_manager.list_pinned_object_ids();
520 let version_pinned = {
522 let versioning = self.versioning.read().await;
523 versioning
524 .get_tracked_object_ids(self.context_info.read().await.min_pinned_version_id())
525 };
526 let object_ids = object_ids
527 .into_iter()
528 .filter(|s| !version_pinned.contains(s) && !backup_pinned.contains(s));
529 let filter_by_time_travel_start_time = Instant::now();
530 let object_ids = self.filter_out_objects_by_time_travel(object_ids).await?;
531 tracing::info!(elapsed = ?filter_by_time_travel_start_time.elapsed(), "filter out objects by time travel in minor GC");
532 self.delete_objects(object_ids.into_iter().collect())
534 .await?;
535 Ok(())
536 }
537}
538
539async fn collect_min_uncommitted_object_id(
540 metadata_manager: &MetadataManager,
541 client_pool: &StreamClientPool,
542) -> Result<HummockRawObjectId> {
543 let futures = metadata_manager
544 .list_active_streaming_compute_nodes()
545 .await
546 .map_err(|err| Error::MetaStore(err.into()))?
547 .into_iter()
548 .map(|worker_node| async move {
549 let client = client_pool.get(&worker_node).await?;
550 let request = GetMinUncommittedObjectIdRequest {};
551 client.get_min_uncommitted_object_id(request).await
552 });
553 let min_watermark = try_join_all(futures)
554 .await
555 .map_err(|err| Error::Internal(err.into()))?
556 .into_iter()
557 .map(|resp| resp.min_uncommitted_object_id)
558 .min()
559 .unwrap_or(u64::MAX);
560 Ok(min_watermark.into())
561}
562
/// Flag recording whether a full GC is currently in progress.
///
/// `Default` yields the same state as [`FullGcState::new`]: no full GC in progress.
#[derive(Default)]
pub struct FullGcState {
    /// `true` between a successful `try_start` and the following `stop`.
    is_started: AtomicBool,
}

impl FullGcState {
    /// Creates a state with no full GC in progress.
    pub fn new() -> Self {
        Self::default()
    }

    /// Atomically marks a full GC as started.
    ///
    /// Returns `true` if the flag was flipped by this call, and `false` if a full GC was
    /// already marked as started (in which case the flag is left untouched).
    pub fn try_start(&self) -> bool {
        self.is_started
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
    }

    /// Marks the full GC as finished so that a later `try_start` can succeed again.
    pub fn stop(&self) {
        self.is_started.store(false, Ordering::SeqCst);
    }
}
584
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use itertools::Itertools;
    use risingwave_hummock_sdk::HummockObjectId;
    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
    use risingwave_rpc_client::HummockMetaClient;

    use crate::hummock::MockHummockMetaClient;
    use crate::hummock::test_utils::{add_test_tables, setup_compute_env};

    /// Exercises `start_full_gc` and `complete_gc_batch`: an empty batch, a batch of ids that
    /// is expected to be deleted entirely, and a batch mixing the object ids returned by
    /// `add_test_tables` with one additional id, of which only one is expected to be deleted.
    #[tokio::test]
    async fn test_full_gc() {
        let (_env, hummock_manager, _cluster_manager, worker_id) = setup_compute_env(80).await;
        let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(MockHummockMetaClient::new(
            hummock_manager.clone(),
            worker_id as _,
        ));
        let compaction_group_id = StaticCompactionGroupId::StateDefault.into();

        // A full GC with a retention just above the configured minimum must succeed.
        let retention =
            Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec + 1);
        hummock_manager
            .start_full_gc(retention, None, None)
            .await
            .unwrap();

        // An empty batch is accepted.
        hummock_manager
            .complete_gc_batch(Default::default(), None)
            .await
            .unwrap();

        // All three ids are expected to be selected for deletion.
        let large_ids = [i64::MAX as u64 - 2, i64::MAX as u64 - 1, i64::MAX as u64]
            .into_iter()
            .map(|id| HummockObjectId::Sstable(id.into()))
            .collect();
        let deleted = hummock_manager
            .complete_gc_batch(large_ids, None)
            .await
            .unwrap();
        assert_eq!(3, deleted);

        let sst_infos = add_test_tables(
            hummock_manager.as_ref(),
            hummock_meta_client.clone(),
            compaction_group_id,
        )
        .await;
        let committed_object_ids = sst_infos
            .into_iter()
            .flatten()
            .map(|s| s.object_id)
            .sorted()
            .collect_vec();
        assert!(!committed_object_ids.is_empty());
        let max_committed_object_id = *committed_object_ids.iter().max().unwrap();

        // Only one id of the mixed batch is expected to be selected for deletion.
        let mixed_ids = committed_object_ids
            .iter()
            .copied()
            .chain(std::iter::once(max_committed_object_id + 1))
            .map(HummockObjectId::Sstable)
            .collect();
        let deleted = hummock_manager
            .complete_gc_batch(mixed_ids, None)
            .await
            .unwrap();
        assert_eq!(1, deleted);
    }
}