risingwave_meta/hummock/manager/gc.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp;
use std::collections::HashSet;
use std::ops::DerefMut;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant, SystemTime};

use chrono::DateTime;
use futures::future::try_join_all;
use futures::{StreamExt, TryStreamExt, future};
use itertools::Itertools;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::{
    HummockObjectId, HummockRawObjectId, VALID_OBJECT_ID_SUFFIXES, get_object_data_path,
    get_object_id_from_path,
};
use risingwave_meta_model::hummock_sequence::HUMMOCK_NOW;
use risingwave_meta_model::{hummock_gc_history, hummock_sequence, hummock_version_delta};
use risingwave_meta_model_migration::OnConflict;
use risingwave_object_store::object::{ObjectMetadataIter, ObjectStoreRef};
use risingwave_pb::stream_service::GetMinUncommittedObjectIdRequest;
use risingwave_rpc_client::StreamClientPool;
use sea_orm::{ActiveValue, ColumnTrait, EntityTrait, QueryFilter, Set};

use crate::MetaResult;
use crate::backup_restore::BackupManagerRef;
use crate::hummock::HummockManager;
use crate::hummock::error::{Error, Result};
use crate::manager::MetadataManager;

pub(crate) struct GcManager {
    store: ObjectStoreRef,
    path_prefix: String,
    use_new_object_prefix_strategy: bool,
    /// These objects may still be used by backup or time travel.
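    /// Candidates are accumulated via `add_may_delete_object_ids` and drained in batches by minor GC.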
    may_delete_object_ids: parking_lot::Mutex<HashSet<HummockObjectId>>,
}

impl GcManager {
    pub fn new(
        store: ObjectStoreRef,
        path_prefix: &str,
        use_new_object_prefix_strategy: bool,
    ) -> Self {
        Self {
            store,
            path_prefix: path_prefix.to_owned(),
            use_new_object_prefix_strategy,
            may_delete_object_ids: Default::default(),
        }
    }

    /// Deletes all objects specified in the given list of IDs from storage.
    pub async fn delete_objects(
        &self,
        object_id_list: impl Iterator<Item = HummockObjectId>,
    ) -> Result<()> {
        let mut paths = Vec::with_capacity(1000);
        for object_id in object_id_list {
            let obj_prefix = self.store.get_object_prefix(
                object_id.as_raw().inner(),
                self.use_new_object_prefix_strategy,
            );
            paths.push(get_object_data_path(
                &obj_prefix,
                &self.path_prefix,
                object_id,
            ));
        }
        self.store.delete_objects(&paths).await?;
        Ok(())
    }

    async fn list_object_metadata_from_object_store(
        &self,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> Result<ObjectMetadataIter> {
        let list_path = format!("{}/{}", self.path_prefix, prefix.unwrap_or("".into()));
        let raw_iter = self.store.list(&list_path, start_after, limit).await?;
        let valid_suffixes = VALID_OBJECT_ID_SUFFIXES.map(|suffix| format!(".{}", suffix));
        let iter = raw_iter.filter(move |r| match r {
            Ok(i) => future::ready(valid_suffixes.iter().any(|suffix| i.key.ends_with(suffix))),
            Err(_) => future::ready(true),
        });
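        // Errors from the underlying LIST are kept by the filter above so that they surface
        // when the caller collects the stream.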
        Ok(Box::pin(iter))
    }

    /// Returns **filtered** object ids, and **unfiltered** total object count and size.
    pub async fn list_objects(
        &self,
        object_retention_watermark: u64,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<u64>,
    ) -> Result<(HashSet<HummockObjectId>, u64, u64, Option<String>)> {
        tracing::debug!(
            object_retention_watermark,
            prefix,
            start_after,
            limit,
            "Try to list objects."
        );
        let mut total_object_count = 0;
        let mut total_object_size = 0;
        let mut next_start_after: Option<String> = None;
        let metadata_iter = self
            .list_object_metadata_from_object_store(prefix, start_after, limit.map(|i| i as usize))
            .await?;
        let filtered = metadata_iter
            .filter_map(|r| {
                let result = match r {
                    Ok(o) => {
                        total_object_count += 1;
                        total_object_size += o.total_size;
                        // Determine if the LIST has been truncated.
                        // A false positive would at most cost one additional LIST later.
                        if let Some(limit) = limit
                            && limit == total_object_count
                        {
                            next_start_after = Some(o.key.clone());
                            tracing::debug!(next_start_after, "set next start after");
                        }
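                        // Only objects last modified before the retention watermark are candidates;
                        // newer ones may belong to writes not yet reflected in the version.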
                        if o.last_modified < object_retention_watermark as f64 {
                            Some(Ok(get_object_id_from_path(&o.key)))
                        } else {
                            None
                        }
                    }
                    Err(e) => Some(Err(Error::ObjectStore(e))),
                };
                async move { result }
            })
            .try_collect::<HashSet<HummockObjectId>>()
            .await?;
        Ok((
            filtered,
            total_object_count,
            total_object_size as u64,
            next_start_after,
        ))
    }

    pub fn add_may_delete_object_ids(
        &self,
        may_delete_object_ids: impl Iterator<Item = HummockObjectId>,
    ) {
        self.may_delete_object_ids
            .lock()
            .extend(may_delete_object_ids);
    }

    /// Takes and returns the accumulated set if at least `least_count` elements are available.
    pub fn try_take_may_delete_object_ids(
        &self,
        least_count: usize,
    ) -> Option<HashSet<HummockObjectId>> {
        let mut guard = self.may_delete_object_ids.lock();
        if guard.len() < least_count {
            None
        } else {
            Some(std::mem::take(&mut *guard))
        }
    }
}

impl HummockManager {
    /// Deletes version deltas.
    ///
    /// Returns the number of deleted deltas.
    pub async fn delete_version_deltas(&self) -> Result<usize> {
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        let context_info = self.context_info.read().await;
        // If there is any safe point, skip this to ensure the meta backup has the delta logs
        // required to replay the version.
        if !context_info.version_safe_points.is_empty() {
            return Ok(0);
        }
        // The context_info lock must be held to prevent any potential metadata backup.
        // The lock order requires the version lock to be held as well.
        let version_id = versioning.checkpoint.version.id;
        let res = hummock_version_delta::Entity::delete_many()
            .filter(hummock_version_delta::Column::Id.lte(version_id.to_u64()))
            .exec(&self.env.meta_store_ref().conn)
            .await?;
        tracing::debug!(rows_affected = res.rows_affected, "Deleted version deltas");
        versioning
            .hummock_version_deltas
            .retain(|id, _| *id > version_id);
        #[cfg(test)]
        {
            drop(context_info);
            drop(versioning_guard);
            self.check_state_consistency().await;
        }
        Ok(res.rows_affected as usize)
    }

    /// Filters by Hummock version and writes GC history.
    pub async fn finalize_objects_to_delete(
        &self,
        object_ids: impl Iterator<Item = HummockObjectId>,
    ) -> Result<Vec<HummockObjectId>> {
        // This lock ensures `commit_epoch` and `report_compact_task` can see the latest GC history during sanity check.
        let versioning = self.versioning.read().await;
        let tracked_object_ids: HashSet<HummockObjectId> = versioning
            .get_tracked_object_ids(self.context_info.read().await.min_pinned_version_id());
        let to_delete = object_ids
            .filter(|object_id| !tracked_object_ids.contains(object_id))
            .collect_vec();
        self.write_gc_history(to_delete.iter().copied()).await?;
        Ok(to_delete)
    }

    /// LISTs the object store and DELETEs stale objects, in batches.
    /// GC can be very slow, so spawn a dedicated tokio task for it.
    pub async fn start_full_gc(
        &self,
        object_retention_time: Duration,
        prefix: Option<String>,
        backup_manager: Option<BackupManagerRef>,
    ) -> Result<()> {
        if !self.full_gc_state.try_start() {
            return Err(anyhow::anyhow!("failed to start GC due to an ongoing process").into());
        }
        let _guard = scopeguard::guard(self.full_gc_state.clone(), |full_gc_state| {
            full_gc_state.stop()
        });
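        // The scopeguard resets `full_gc_state` when this function returns, including on early
        // error returns, so a failed run never blocks the next full GC.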
        self.metrics.full_gc_trigger_count.inc();
        let object_retention_time = cmp::max(
            object_retention_time,
            Duration::from_secs(self.env.opts.min_sst_retention_time_sec),
        );
        let limit = self.env.opts.full_gc_object_limit;
        let mut start_after = None;
        let object_retention_watermark = self
            .now()
            .await?
            .saturating_sub(object_retention_time.as_secs());
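        // Objects last modified at or after this watermark are always retained by this GC run.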
        let mut total_object_count = 0;
        let mut total_object_size = 0;
        tracing::info!(
            retention_sec = object_retention_time.as_secs(),
            prefix,
            limit,
            "Start GC."
        );
        loop {
            tracing::debug!(
                retention_sec = object_retention_time.as_secs(),
                prefix,
                start_after,
                limit,
                "Start a GC batch."
            );
            let (object_ids, batch_object_count, batch_object_size, next_start_after) = self
                .gc_manager
                .list_objects(
                    object_retention_watermark,
                    prefix.clone(),
                    start_after.clone(),
                    Some(limit),
                )
                .await?;
            total_object_count += batch_object_count;
            total_object_size += batch_object_size;
            tracing::debug!(
                ?object_ids,
                batch_object_count,
                batch_object_size,
                "Finish listing a GC batch."
            );
            self.complete_gc_batch(object_ids, backup_manager.clone())
                .await?;
            if next_start_after.is_none() {
                break;
            }
            start_after = next_start_after;
        }
        tracing::info!(total_object_count, total_object_size, "Finish GC");
        self.metrics.total_object_size.set(total_object_size as _);
        self.metrics.total_object_count.set(total_object_count as _);
        match self.time_travel_pinned_object_count().await {
            Ok(count) => {
                self.metrics.time_travel_object_count.set(count as _);
            }
            Err(err) => {
                use thiserror_ext::AsReport;
                tracing::warn!(error = %err.as_report(), "Failed to count time travel objects.");
            }
        }
        Ok(())
    }

    /// Given candidate objects to delete, filters out false positives.
    /// Returns the number of objects to delete.
    pub(crate) async fn complete_gc_batch(
        &self,
        object_ids: HashSet<HummockObjectId>,
        backup_manager: Option<BackupManagerRef>,
    ) -> Result<usize> {
        if object_ids.is_empty() {
            return Ok(0);
        }
        // It's crucial to get pinned_by_metadata_backup only after object_ids.
        let pinned_by_metadata_backup = backup_manager
            .as_ref()
            .map(|b| b.list_pinned_object_ids())
            .unwrap_or_default();
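        // When no backup manager is provided (e.g. in tests), no objects are pinned by metadata backup.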
        // It's crucial to collect_min_uncommitted_object_id (i.e. `min_object_id`) only after LIST object store (i.e. `object_ids`).
        // Otherwise, after getting `min_object_id`, new compute nodes may join and generate new uncommitted objects that are not covered by `min_object_id`.
        // By getting `min_object_id` after `object_ids`, it's ensured that `object_ids` won't include any objects from those new compute nodes.
        let min_object_id = collect_min_uncommitted_object_id(
            &self.metadata_manager,
            self.env.stream_client_pool(),
        )
        .await?;
        let metrics = &self.metrics;
        let candidate_object_number = object_ids.len();
        metrics
            .full_gc_candidate_object_count
            .observe(candidate_object_number as _);
        // filter by metadata backup
        let object_ids = object_ids
            .into_iter()
            .filter(|s| !pinned_by_metadata_backup.contains(s))
            .collect_vec();
        let after_metadata_backup = object_ids.len();
        // filter by time travel archive
        let filter_by_time_travel_start_time = Instant::now();
        let object_ids = self
            .filter_out_objects_by_time_travel(object_ids.into_iter())
            .await?;
        tracing::info!(elapsed = ?filter_by_time_travel_start_time.elapsed(), "filter out objects by time travel in full GC");
        let after_time_travel = object_ids.len();
        // filter by object id watermark, i.e. minimum id of uncommitted objects reported by compute nodes.
        let object_ids = object_ids
            .into_iter()
            .filter(|id| id.as_raw() < min_object_id)
            .collect_vec();
        let after_min_object_id = object_ids.len();
        // filter by version
        let after_version = self
            .finalize_objects_to_delete(object_ids.into_iter())
            .await?;
        let after_version_count = after_version.len();
        metrics
            .full_gc_selected_object_count
            .observe(after_version_count as _);
        tracing::info!(
            candidate_object_number,
            after_metadata_backup,
            after_time_travel,
            after_min_object_id,
            after_version_count,
            "complete gc batch"
        );
        self.delete_objects(after_version).await?;
        Ok(after_version_count)
    }

    pub async fn now(&self) -> Result<u64> {
        let mut guard = self.now.lock().await;
        let new_now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        if new_now < *guard {
            return Err(anyhow::anyhow!(format!(
                "unexpected decreasing now, old={}, new={}",
                *guard, new_now
            ))
            .into());
        }
        *guard = new_now;
        drop(guard);
        // Persist `now` so it remains non-decreasing even across meta node reboots.
        let m = hummock_sequence::ActiveModel {
            name: ActiveValue::Set(HUMMOCK_NOW.into()),
            seq: ActiveValue::Set(new_now.try_into().unwrap()),
        };
        hummock_sequence::Entity::insert(m)
            .on_conflict(
                OnConflict::column(hummock_sequence::Column::Name)
                    .update_column(hummock_sequence::Column::Seq)
                    .to_owned(),
            )
            .exec(&self.env.meta_store_ref().conn)
            .await?;
        Ok(new_now)
    }

    pub(crate) async fn load_now(&self) -> Result<Option<u64>> {
        let now = hummock_sequence::Entity::find_by_id(HUMMOCK_NOW.to_owned())
            .one(&self.env.meta_store_ref().conn)
            .await?
            .map(|m| m.seq.try_into().unwrap());
        Ok(now)
    }

    async fn write_gc_history(
        &self,
        object_ids: impl Iterator<Item = HummockObjectId>,
    ) -> Result<()> {
        if self.env.opts.gc_history_retention_time_sec == 0 {
            return Ok(());
        }
        let now = self.now().await?;
        let dt = DateTime::from_timestamp(now.try_into().unwrap(), 0).unwrap();
        let mut models = object_ids.map(|o| hummock_gc_history::ActiveModel {
            object_id: Set(o.as_raw().inner().try_into().unwrap()),
            mark_delete_at: Set(dt.naive_utc()),
        });
        let db = &self.meta_store_ref().conn;
        let gc_history_low_watermark = DateTime::from_timestamp(
            now.saturating_sub(self.env.opts.gc_history_retention_time_sec)
                .try_into()
                .unwrap(),
            0,
        )
        .unwrap();
        hummock_gc_history::Entity::delete_many()
            .filter(hummock_gc_history::Column::MarkDeleteAt.lt(gc_history_low_watermark))
            .exec(db)
            .await?;
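        // Insert in batches of `hummock_gc_history_insert_batch_size`; `on_conflict_do_nothing`
        // tolerates object ids that have already been recorded.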
        let mut is_finished = false;
        while !is_finished {
            let mut batch = vec![];
            let mut count: usize = self.env.opts.hummock_gc_history_insert_batch_size;
            while count > 0 {
                let Some(m) = models.next() else {
                    is_finished = true;
                    break;
                };
                count -= 1;
                batch.push(m);
            }
            if batch.is_empty() {
                break;
            }
            hummock_gc_history::Entity::insert_many(batch)
                .on_conflict_do_nothing()
                .exec(db)
                .await?;
        }
        Ok(())
    }

    pub async fn delete_time_travel_metadata(&self) -> MetaResult<()> {
        let current_epoch_time = Epoch::now().physical_time();
        let epoch_watermark = Epoch::from_physical_time(
            current_epoch_time.saturating_sub(
                self.env
                    .system_params_reader()
                    .await
                    .time_travel_retention_ms(),
            ),
        )
        .0;
        self.truncate_time_travel_metadata(epoch_watermark).await?;
        Ok(())
    }

    /// Deletes stale objects from the object store.
    ///
    /// Returns the total count of deleted objects.
    pub async fn delete_objects(
        &self,
        mut objects_to_delete: Vec<HummockObjectId>,
    ) -> Result<usize> {
        let total = objects_to_delete.len();
        let mut batch_size = 1000usize;
        while !objects_to_delete.is_empty() {
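            // Optionally throttle between delete batches (`vacuum_spin_interval_ms`) to avoid
            // overwhelming the object store with DELETE requests.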
            if self.env.opts.vacuum_spin_interval_ms != 0 {
                tokio::time::sleep(Duration::from_millis(self.env.opts.vacuum_spin_interval_ms))
                    .await;
            }
            batch_size = cmp::min(objects_to_delete.len(), batch_size);
            if batch_size == 0 {
                break;
            }
            let delete_batch: HashSet<_> = objects_to_delete.drain(..batch_size).collect();
            tracing::info!(?delete_batch, "Attempt to delete objects.");
            let deleted_object_ids = delete_batch.clone();
            self.gc_manager
                .delete_objects(delete_batch.into_iter())
                .await?;
            tracing::debug!(?deleted_object_ids, "Finish deleting objects.");
        }
        Ok(total)
    }

    /// Minor GC attempts to delete objects that were part of a Hummock version but are no longer in use.
    pub async fn try_start_minor_gc(&self, backup_manager: BackupManagerRef) -> Result<()> {
        const MIN_MINOR_GC_OBJECT_COUNT: usize = 1000;
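        // Only proceed once enough candidates have accumulated, to amortize the filtering work below.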
        let Some(object_ids) = self
            .gc_manager
            .try_take_may_delete_object_ids(MIN_MINOR_GC_OBJECT_COUNT)
        else {
            return Ok(());
        };
        // Objects pinned by either meta backup or time travel should be filtered out.
        let backup_pinned: HashSet<_> = backup_manager.list_pinned_object_ids();
        // `version_pinned` is obtained after the candidate `object_ids` for deletion, so it is recent enough for filtering purposes.
        let version_pinned = {
            let versioning = self.versioning.read().await;
            versioning
                .get_tracked_object_ids(self.context_info.read().await.min_pinned_version_id())
        };
        let object_ids = object_ids
            .into_iter()
            .filter(|s| !version_pinned.contains(s) && !backup_pinned.contains(s));
        let filter_by_time_travel_start_time = Instant::now();
        let object_ids = self.filter_out_objects_by_time_travel(object_ids).await?;
        tracing::info!(elapsed = ?filter_by_time_travel_start_time.elapsed(), "filter out objects by time travel in minor GC");
        // Retry is not necessary. Full GC will handle these objects eventually.
        self.delete_objects(object_ids.into_iter().collect())
            .await?;
        Ok(())
    }
}

async fn collect_min_uncommitted_object_id(
    metadata_manager: &MetadataManager,
    client_pool: &StreamClientPool,
) -> Result<HummockRawObjectId> {
    let futures = metadata_manager
        .list_active_streaming_compute_nodes()
        .await
        .map_err(|err| Error::MetaStore(err.into()))?
        .into_iter()
        .map(|worker_node| async move {
            let client = client_pool.get(&worker_node).await?;
            let request = GetMinUncommittedObjectIdRequest {};
            client.get_min_uncommitted_object_id(request).await
        });
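    // With no active compute nodes, fall back to u64::MAX so this watermark filters nothing out.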
    let min_watermark = try_join_all(futures)
        .await
        .map_err(|err| Error::Internal(err.into()))?
        .into_iter()
        .map(|resp| resp.min_uncommitted_object_id)
        .min()
        .unwrap_or(u64::MAX);
    Ok(min_watermark.into())
}

pub struct FullGcState {
    is_started: AtomicBool,
}

impl FullGcState {
    pub fn new() -> Self {
        Self {
            is_started: AtomicBool::new(false),
        }
    }

    pub fn try_start(&self) -> bool {
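        // Atomically flip `is_started` from false to true; fails if a full GC is already running.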
        self.is_started
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
    }

    pub fn stop(&self) {
        self.is_started.store(false, Ordering::SeqCst);
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use itertools::Itertools;
    use risingwave_hummock_sdk::HummockObjectId;
    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
    use risingwave_rpc_client::HummockMetaClient;

    use crate::hummock::MockHummockMetaClient;
    use crate::hummock::test_utils::{add_test_tables, setup_compute_env};

    #[tokio::test]
    async fn test_full_gc() {
        let (_env, hummock_manager, _cluster_manager, worker_id) = setup_compute_env(80).await;
        let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(MockHummockMetaClient::new(
            hummock_manager.clone(),
            worker_id as _,
        ));
        let compaction_group_id = StaticCompactionGroupId::StateDefault.into();
        hummock_manager
            .start_full_gc(
                Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec + 1),
                None,
                None,
            )
            .await
            .unwrap();

        // Empty input results in an immediate return, without waiting for heartbeat.
        hummock_manager
            .complete_gc_batch(vec![].into_iter().collect(), None)
            .await
            .unwrap();

        // The LSM tree is empty, so all input object ids should be treated as garbage.
        // Use fake object ids, because they'll be written to GC history and they shouldn't affect later commits.
        assert_eq!(
            3,
            hummock_manager
                .complete_gc_batch(
                    [i64::MAX as u64 - 2, i64::MAX as u64 - 1, i64::MAX as u64]
                        .into_iter()
                        .map(|id| HummockObjectId::Sstable(id.into()))
                        .collect(),
                    None,
                )
                .await
                .unwrap()
        );

        // All committed SST ids should be excluded from GC.
        let sst_infos = add_test_tables(
            hummock_manager.as_ref(),
            hummock_meta_client.clone(),
            compaction_group_id,
        )
        .await;
        let committed_object_ids = sst_infos
            .into_iter()
            .flatten()
            .map(|s| s.object_id)
            .sorted()
            .collect_vec();
        assert!(!committed_object_ids.is_empty());
        let max_committed_object_id = *committed_object_ids.iter().max().unwrap();
        assert_eq!(
            1,
            hummock_manager
                .complete_gc_batch(
                    [committed_object_ids, vec![max_committed_object_id + 1]]
                        .concat()
                        .into_iter()
                        .map(HummockObjectId::Sstable)
                        .collect(),
                    None,
                )
                .await
                .unwrap()
        );
    }
}
667}