use std::cmp;
use std::collections::HashSet;
use std::ops::DerefMut;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant, SystemTime};

use chrono::DateTime;
use futures::future::try_join_all;
use futures::{StreamExt, TryStreamExt, future};
use itertools::Itertools;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::{
    HummockObjectId, HummockRawObjectId, VALID_OBJECT_ID_SUFFIXES, get_object_data_path,
    get_object_id_from_path,
};
use risingwave_meta_model::hummock_sequence::HUMMOCK_NOW;
use risingwave_meta_model::{hummock_gc_history, hummock_sequence, hummock_version_delta};
use risingwave_meta_model_migration::OnConflict;
use risingwave_object_store::object::{ObjectMetadataIter, ObjectStoreRef};
use risingwave_pb::stream_service::GetMinUncommittedObjectIdRequest;
use risingwave_rpc_client::StreamClientPool;
use sea_orm::{ActiveValue, ColumnTrait, EntityTrait, QueryFilter, Set};

use crate::MetaResult;
use crate::backup_restore::BackupManagerRef;
use crate::hummock::HummockManager;
use crate::hummock::error::{Error, Result};
use crate::manager::MetadataManager;

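/// Garbage collection helper over the object store: it deletes objects, lists object metadata
/// under the Hummock path prefix, and buffers object ids that may later become eligible for
/// deletion.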
pub(crate) struct GcManager {
    store: ObjectStoreRef,
    path_prefix: String,
    use_new_object_prefix_strategy: bool,
    may_delete_object_ids: parking_lot::Mutex<HashSet<HummockObjectId>>,
}

impl GcManager {
    pub fn new(
        store: ObjectStoreRef,
        path_prefix: &str,
        use_new_object_prefix_strategy: bool,
    ) -> Self {
        Self {
            store,
            path_prefix: path_prefix.to_owned(),
            use_new_object_prefix_strategy,
            may_delete_object_ids: Default::default(),
        }
    }

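    /// Deletes the given objects from the object store, resolving each id to its full data path.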
    pub async fn delete_objects(
        &self,
        object_id_list: impl Iterator<Item = HummockObjectId>,
    ) -> Result<()> {
        let mut paths = Vec::with_capacity(1000);
        for object_id in object_id_list {
            let obj_prefix = self.store.get_object_prefix(
                object_id.as_raw().inner(),
                self.use_new_object_prefix_strategy,
            );
            paths.push(get_object_data_path(
                &obj_prefix,
                &self.path_prefix,
                object_id,
            ));
        }
        self.store.delete_objects(&paths).await?;
        Ok(())
    }

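    /// Lists object metadata under the configured path prefix, keeping only keys that end with a
    /// valid Hummock object id suffix. Listing errors are kept so the caller can surface them.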
    async fn list_object_metadata_from_object_store(
        &self,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> Result<ObjectMetadataIter> {
        let list_path = format!("{}/{}", self.path_prefix, prefix.unwrap_or("".into()));
        let raw_iter = self.store.list(&list_path, start_after, limit).await?;
        let valid_suffixes = VALID_OBJECT_ID_SUFFIXES.map(|suffix| format!(".{}", suffix));
        let iter = raw_iter.filter(move |r| match r {
            Ok(i) => future::ready(valid_suffixes.iter().any(|suffix| i.key.ends_with(suffix))),
            Err(_) => future::ready(true),
        });
        Ok(Box::pin(iter))
    }

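    /// Lists objects and returns the ids of those last modified before
    /// `object_retention_watermark`, along with the total count and size of all listed objects
    /// and the key to resume from in the next batch, if `limit` was reached.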
    pub async fn list_objects(
        &self,
        object_retention_watermark: u64,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<u64>,
    ) -> Result<(HashSet<HummockObjectId>, u64, u64, Option<String>)> {
        tracing::debug!(
            object_retention_watermark,
            prefix,
            start_after,
            limit,
            "Try to list objects."
        );
        let mut total_object_count = 0;
        let mut total_object_size = 0;
        let mut next_start_after: Option<String> = None;
        let metadata_iter = self
            .list_object_metadata_from_object_store(prefix, start_after, limit.map(|i| i as usize))
            .await?;
        let filtered = metadata_iter
            .filter_map(|r| {
                let result = match r {
                    Ok(o) => {
                        total_object_count += 1;
                        total_object_size += o.total_size;
                        if let Some(limit) = limit
                            && limit == total_object_count
                        {
                            next_start_after = Some(o.key.clone());
                            tracing::debug!(next_start_after, "set next start after");
                        }
                        if o.last_modified < object_retention_watermark as f64 {
                            Some(Ok(get_object_id_from_path(&o.key)))
                        } else {
                            None
                        }
                    }
                    Err(e) => Some(Err(Error::ObjectStore(e))),
                };
                async move { result }
            })
            .try_collect::<HashSet<HummockObjectId>>()
            .await?;
        Ok((
            filtered,
            total_object_count,
            total_object_size as u64,
            next_start_after,
        ))
    }

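    /// Buffers object ids that may later be eligible for deletion; they are consumed in batches
    /// via `try_take_may_delete_object_ids`.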
    pub fn add_may_delete_object_ids(
        &self,
        may_delete_object_ids: impl Iterator<Item = HummockObjectId>,
    ) {
        self.may_delete_object_ids
            .lock()
            .extend(may_delete_object_ids);
    }

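    /// Takes all buffered candidate object ids, but only once at least `least_count` of them have
    /// accumulated; returns `None` otherwise.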
    pub fn try_take_may_delete_object_ids(
        &self,
        least_count: usize,
    ) -> Option<HashSet<HummockObjectId>> {
        let mut guard = self.may_delete_object_ids.lock();
        if guard.len() < least_count {
            None
        } else {
            Some(std::mem::take(&mut *guard))
        }
    }
}

impl HummockManager {
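    /// Deletes version deltas that are already covered by the checkpoint version. Skips deletion
    /// while any version safe point is held. Returns the number of deleted rows.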
    pub async fn delete_version_deltas(&self) -> Result<usize> {
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        let context_info = self.context_info.read().await;
        if !context_info.version_safe_points.is_empty() {
            return Ok(0);
        }
        let version_id = versioning.checkpoint.version.id;
        let res = hummock_version_delta::Entity::delete_many()
            .filter(hummock_version_delta::Column::Id.lte(version_id.to_u64()))
            .exec(&self.env.meta_store_ref().conn)
            .await?;
        tracing::debug!(rows_affected = res.rows_affected, "Deleted version deltas");
        versioning
            .hummock_version_deltas
            .retain(|id, _| *id > version_id);
        #[cfg(test)]
        {
            drop(context_info);
            drop(versioning_guard);
            self.check_state_consistency().await;
        }
        Ok(res.rows_affected as usize)
    }

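    /// Drops candidates that are still tracked by pinned versions, records the remaining ids in
    /// the GC history, and returns them as safe to delete.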
    pub async fn finalize_objects_to_delete(
        &self,
        object_ids: impl Iterator<Item = HummockObjectId>,
    ) -> Result<Vec<HummockObjectId>> {
        let versioning = self.versioning.read().await;
        let tracked_object_ids: HashSet<HummockObjectId> = versioning
            .get_tracked_object_ids(self.context_info.read().await.min_pinned_version_id());
        let to_delete = object_ids
            .filter(|object_id| !tracked_object_ids.contains(object_id))
            .collect_vec();
        self.write_gc_history(to_delete.iter().copied()).await?;
        Ok(to_delete)
    }

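    /// Runs a full GC pass: lists objects from the object store in batches, filters each batch in
    /// `complete_gc_batch`, and deletes whatever remains. Only one full GC may run at a time.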
    pub async fn start_full_gc(
        &self,
        object_retention_time: Duration,
        prefix: Option<String>,
        backup_manager: Option<BackupManagerRef>,
    ) -> Result<()> {
        if !self.full_gc_state.try_start() {
            return Err(anyhow::anyhow!("failed to start GC due to an ongoing process").into());
        }
        let _guard = scopeguard::guard(self.full_gc_state.clone(), |full_gc_state| {
            full_gc_state.stop()
        });
        self.metrics.full_gc_trigger_count.inc();
        let object_retention_time = cmp::max(
            object_retention_time,
            Duration::from_secs(self.env.opts.min_sst_retention_time_sec),
        );
        let limit = self.env.opts.full_gc_object_limit;
        let mut start_after = None;
        let object_retention_watermark = self
            .now()
            .await?
            .saturating_sub(object_retention_time.as_secs());
        let mut total_object_count = 0;
        let mut total_object_size = 0;
        tracing::info!(
            retention_sec = object_retention_time.as_secs(),
            prefix,
            limit,
            "Start GC."
        );
        loop {
            tracing::debug!(
                retention_sec = object_retention_time.as_secs(),
                prefix,
                start_after,
                limit,
                "Start a GC batch."
            );
            let (object_ids, batch_object_count, batch_object_size, next_start_after) = self
                .gc_manager
                .list_objects(
                    object_retention_watermark,
                    prefix.clone(),
                    start_after.clone(),
                    Some(limit),
                )
                .await?;
            total_object_count += batch_object_count;
            total_object_size += batch_object_size;
            tracing::debug!(
                ?object_ids,
                batch_object_count,
                batch_object_size,
                "Finish listing a GC batch."
            );
            self.complete_gc_batch(object_ids, backup_manager.clone())
                .await?;
            if next_start_after.is_none() {
                break;
            }
            start_after = next_start_after;
        }
        tracing::info!(total_object_count, total_object_size, "Finish GC");
        self.metrics.total_object_size.set(total_object_size as _);
        self.metrics.total_object_count.set(total_object_count as _);
        match self.time_travel_pinned_object_count().await {
            Ok(count) => {
                self.metrics.time_travel_object_count.set(count as _);
            }
            Err(err) => {
                use thiserror_ext::AsReport;
                tracing::warn!(error = %err.as_report(), "Failed to count time travel objects.");
            }
        }
        Ok(())
    }

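    /// Filters one batch of candidate object ids against metadata backup pins, time travel pins,
    /// the minimum uncommitted object id, and tracked versions, then deletes the remainder.
    /// Returns the number of objects selected for deletion.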
    pub(crate) async fn complete_gc_batch(
        &self,
        object_ids: HashSet<HummockObjectId>,
        backup_manager: Option<BackupManagerRef>,
    ) -> Result<usize> {
        if object_ids.is_empty() {
            return Ok(0);
        }
        let pinned_by_metadata_backup = match backup_manager.as_ref() {
            Some(b) => b.list_pinned_object_ids().await,
            None => HashSet::default(),
        };
        let min_object_id = collect_min_uncommitted_object_id(
            &self.metadata_manager,
            self.env.stream_client_pool(),
        )
        .await?;
        let metrics = &self.metrics;
        let candidate_object_number = object_ids.len();
        metrics
            .full_gc_candidate_object_count
            .observe(candidate_object_number as _);
        let object_ids = object_ids
            .into_iter()
            .filter(|s| !pinned_by_metadata_backup.contains(&s.as_raw()))
            .collect_vec();
        let after_metadata_backup = object_ids.len();
        let filter_by_time_travel_start_time = Instant::now();
        let object_ids = self
            .filter_out_objects_by_time_travel(object_ids.into_iter())
            .await?;
        tracing::info!(elapsed = ?filter_by_time_travel_start_time.elapsed(), "filter out objects by time travel in full GC");
        let after_time_travel = object_ids.len();
        let object_ids = object_ids
            .into_iter()
            .filter(|id| id.as_raw() < min_object_id)
            .collect_vec();
        let after_min_object_id = object_ids.len();
        let after_version = self
            .finalize_objects_to_delete(object_ids.into_iter())
            .await?;
        let after_version_count = after_version.len();
        metrics
            .full_gc_selected_object_count
            .observe(after_version_count as _);
        tracing::info!(
            candidate_object_number,
            after_metadata_backup,
            after_time_travel,
            after_min_object_id,
            after_version_count,
            "complete gc batch"
        );
        self.delete_objects(after_version).await?;
        Ok(after_version_count)
    }

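    /// Returns a monotonically non-decreasing "now" in seconds since the Unix epoch and persists
    /// it to the meta store, so it can be reloaded later via `load_now`.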
    pub async fn now(&self) -> Result<u64> {
        let mut guard = self.now.lock().await;
        let new_now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        if new_now < *guard {
            return Err(anyhow::anyhow!(format!(
                "unexpected decreasing now, old={}, new={}",
                *guard, new_now
            ))
            .into());
        }
        *guard = new_now;
        drop(guard);
        let m = hummock_sequence::ActiveModel {
            name: ActiveValue::Set(HUMMOCK_NOW.into()),
            seq: ActiveValue::Set(new_now.try_into().unwrap()),
        };
        hummock_sequence::Entity::insert(m)
            .on_conflict(
                OnConflict::column(hummock_sequence::Column::Name)
                    .update_column(hummock_sequence::Column::Seq)
                    .to_owned(),
            )
            .exec(&self.env.meta_store_ref().conn)
            .await?;
        Ok(new_now)
    }

    pub(crate) async fn load_now(&self) -> Result<Option<u64>> {
        let now = hummock_sequence::Entity::find_by_id(HUMMOCK_NOW.to_owned())
            .one(&self.env.meta_store_ref().conn)
            .await?
            .map(|m| m.seq.try_into().unwrap());
        Ok(now)
    }

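    /// Records the given object ids in the GC history table in batches, after pruning entries
    /// older than the configured retention window. A retention of 0 disables GC history.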
    async fn write_gc_history(
        &self,
        object_ids: impl Iterator<Item = HummockObjectId>,
    ) -> Result<()> {
        if self.env.opts.gc_history_retention_time_sec == 0 {
            return Ok(());
        }
        let now = self.now().await?;
        let dt = DateTime::from_timestamp(now.try_into().unwrap(), 0).unwrap();
        let mut models = object_ids.map(|o| hummock_gc_history::ActiveModel {
            object_id: Set(o.as_raw().inner().try_into().unwrap()),
            mark_delete_at: Set(dt.naive_utc()),
        });
        let db = &self.meta_store_ref().conn;
        let gc_history_low_watermark = DateTime::from_timestamp(
            now.saturating_sub(self.env.opts.gc_history_retention_time_sec)
                .try_into()
                .unwrap(),
            0,
        )
        .unwrap();
        hummock_gc_history::Entity::delete_many()
            .filter(hummock_gc_history::Column::MarkDeleteAt.lt(gc_history_low_watermark))
            .exec(db)
            .await?;
        let mut is_finished = false;
        while !is_finished {
            let mut batch = vec![];
            let mut count: usize = self.env.opts.hummock_gc_history_insert_batch_size;
            while count > 0 {
                let Some(m) = models.next() else {
                    is_finished = true;
                    break;
                };
                count -= 1;
                batch.push(m);
            }
            if batch.is_empty() {
                break;
            }
            hummock_gc_history::Entity::insert_many(batch)
                .on_conflict_do_nothing()
                .exec(db)
                .await?;
        }
        Ok(())
    }

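    /// Truncates time travel metadata older than the configured time travel retention period.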
    pub async fn delete_time_travel_metadata(&self) -> MetaResult<()> {
        let current_epoch_time = Epoch::now().physical_time();
        let epoch_watermark = Epoch::from_physical_time(
            current_epoch_time.saturating_sub(
                self.env
                    .system_params_reader()
                    .await
                    .time_travel_retention_ms(),
            ),
        )
        .0;
        self.truncate_time_travel_metadata(epoch_watermark).await?;
        Ok(())
    }

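    /// Deletes the given objects from the object store in batches, optionally sleeping between
    /// batches to throttle deletion. Returns the total number of objects submitted for deletion.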
    pub async fn delete_objects(
        &self,
        mut objects_to_delete: Vec<HummockObjectId>,
    ) -> Result<usize> {
        let total = objects_to_delete.len();
        let mut batch_size = 1000usize;
        while !objects_to_delete.is_empty() {
            if self.env.opts.vacuum_spin_interval_ms != 0 {
                tokio::time::sleep(Duration::from_millis(self.env.opts.vacuum_spin_interval_ms))
                    .await;
            }
            batch_size = cmp::min(objects_to_delete.len(), batch_size);
            if batch_size == 0 {
                break;
            }
            let delete_batch: HashSet<_> = objects_to_delete.drain(..batch_size).collect();
            tracing::info!(?delete_batch, "Attempt to delete objects.");
            let deleted_object_ids = delete_batch.clone();
            self.gc_manager
                .delete_objects(delete_batch.into_iter())
                .await?;
            tracing::debug!(?deleted_object_ids, "Finish deleting objects.");
        }
        Ok(total)
    }

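    /// Runs a minor GC pass over buffered candidate object ids, if enough have accumulated.
    /// Candidates still pinned by versions, metadata backups, or time travel are skipped.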
    pub async fn try_start_minor_gc(&self, backup_manager: BackupManagerRef) -> Result<()> {
        const MIN_MINOR_GC_OBJECT_COUNT: usize = 1000;
        let Some(object_ids) = self
            .gc_manager
            .try_take_may_delete_object_ids(MIN_MINOR_GC_OBJECT_COUNT)
        else {
            return Ok(());
        };
        let backup_pinned: HashSet<_> = backup_manager.list_pinned_object_ids().await;
        let version_pinned = {
            let versioning = self.versioning.read().await;
            versioning
                .get_tracked_object_ids(self.context_info.read().await.min_pinned_version_id())
        };
        let object_ids = object_ids
            .into_iter()
            .filter(|s| !version_pinned.contains(s) && !backup_pinned.contains(&s.as_raw()));
        let filter_by_time_travel_start_time = Instant::now();
        let object_ids = self.filter_out_objects_by_time_travel(object_ids).await?;
        tracing::info!(elapsed = ?filter_by_time_travel_start_time.elapsed(), "filter out objects by time travel in minor GC");
        self.delete_objects(object_ids.into_iter().collect())
            .await?;
        Ok(())
    }
}

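/// Queries all active streaming compute nodes for their minimum uncommitted object id and returns
/// the smallest one, or `u64::MAX` if there are no such nodes. Objects at or above this watermark
/// are excluded from GC (see `complete_gc_batch`).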
async fn collect_min_uncommitted_object_id(
    metadata_manager: &MetadataManager,
    client_pool: &StreamClientPool,
) -> Result<HummockRawObjectId> {
    let futures = metadata_manager
        .list_active_streaming_compute_nodes()
        .await
        .map_err(|err| Error::MetaStore(err.into()))?
        .into_iter()
        .map(|worker_node| async move {
            let client = client_pool.get(&worker_node).await?;
            let request = GetMinUncommittedObjectIdRequest {};
            client.get_min_uncommitted_object_id(request).await
        });
    let min_watermark = try_join_all(futures)
        .await
        .map_err(|err| Error::Internal(err.into()))?
        .into_iter()
        .map(|resp| resp.min_uncommitted_object_id)
        .min()
        .unwrap_or(u64::MAX);
    Ok(min_watermark.into())
}

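/// Tracks whether a full GC is currently in progress, so that only one can run at a time.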
pub struct FullGcState {
    is_started: AtomicBool,
}

impl FullGcState {
    pub fn new() -> Self {
        Self {
            is_started: AtomicBool::new(false),
        }
    }

    pub fn try_start(&self) -> bool {
        self.is_started
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
    }

    pub fn stop(&self) {
        self.is_started.store(false, Ordering::SeqCst);
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use itertools::Itertools;
    use risingwave_hummock_sdk::HummockObjectId;
    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
    use risingwave_rpc_client::HummockMetaClient;

    use crate::hummock::MockHummockMetaClient;
    use crate::hummock::test_utils::{add_test_tables, setup_compute_env};

    #[tokio::test]
    async fn test_full_gc() {
        let (_env, hummock_manager, _cluster_manager, worker_id) = setup_compute_env(80).await;
        let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(MockHummockMetaClient::new(
            hummock_manager.clone(),
            worker_id as _,
        ));
        let compaction_group_id = StaticCompactionGroupId::StateDefault.into();
        hummock_manager
            .start_full_gc(
                Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec + 1),
                None,
                None,
            )
            .await
            .unwrap();

        hummock_manager
            .complete_gc_batch(vec![].into_iter().collect(), None)
            .await
            .unwrap();

        assert_eq!(
            3,
            hummock_manager
                .complete_gc_batch(
                    [i64::MAX as u64 - 2, i64::MAX as u64 - 1, i64::MAX as u64]
                        .into_iter()
                        .map(|id| HummockObjectId::Sstable(id.into()))
                        .collect(),
                    None,
                )
                .await
                .unwrap()
        );

        let sst_infos = add_test_tables(
            hummock_manager.as_ref(),
            hummock_meta_client.clone(),
            compaction_group_id,
        )
        .await;
        let committed_object_ids = sst_infos
            .into_iter()
            .flatten()
            .map(|s| s.object_id)
            .sorted()
            .collect_vec();
        assert!(!committed_object_ids.is_empty());
        let max_committed_object_id = *committed_object_ids.iter().max().unwrap();
        assert_eq!(
            1,
            hummock_manager
                .complete_gc_batch(
                    [committed_object_ids, vec![max_committed_object_id + 1]]
                        .concat()
                        .into_iter()
                        .map(HummockObjectId::Sstable)
                        .collect(),
                    None,
                )
                .await
                .unwrap()
        );
    }
}