risingwave_meta/hummock/manager/gc.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp;
16use std::collections::HashSet;
17use std::ops::DerefMut;
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::time::{Duration, SystemTime};
20
21use chrono::DateTime;
22use futures::future::try_join_all;
23use futures::{StreamExt, TryStreamExt, future};
24use itertools::Itertools;
25use risingwave_common::system_param::reader::SystemParamsRead;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_hummock_sdk::{
28    HummockSstableObjectId, OBJECT_SUFFIX, get_object_id_from_path, get_sst_data_path,
29};
30use risingwave_meta_model::hummock_sequence::HUMMOCK_NOW;
31use risingwave_meta_model::{hummock_gc_history, hummock_sequence, hummock_version_delta};
32use risingwave_meta_model_migration::OnConflict;
33use risingwave_object_store::object::{ObjectMetadataIter, ObjectStoreRef};
34use risingwave_pb::stream_service::GetMinUncommittedSstIdRequest;
35use risingwave_rpc_client::StreamClientPool;
36use sea_orm::{ActiveValue, ColumnTrait, EntityTrait, QueryFilter, Set};
37
38use crate::MetaResult;
39use crate::backup_restore::BackupManagerRef;
40use crate::hummock::HummockManager;
41use crate::hummock::error::{Error, Result};
42use crate::manager::MetadataManager;
43
/// Performs object-store-level GC operations: listing SST objects, deleting them,
/// and buffering candidate object ids for minor GC.
pub(crate) struct GcManager {
    // Object store that holds the SST data files.
    store: ObjectStoreRef,
    // Root path under which SST objects are laid out.
    path_prefix: String,
    // Selects how per-object prefixes are computed; forwarded to `get_object_prefix`.
    use_new_object_prefix_strategy: bool,
    /// These objects may still be used by backup or time travel.
    may_delete_object_ids: parking_lot::Mutex<HashSet<HummockSstableObjectId>>,
}
51
52impl GcManager {
53    pub fn new(
54        store: ObjectStoreRef,
55        path_prefix: &str,
56        use_new_object_prefix_strategy: bool,
57    ) -> Self {
58        Self {
59            store,
60            path_prefix: path_prefix.to_owned(),
61            use_new_object_prefix_strategy,
62            may_delete_object_ids: Default::default(),
63        }
64    }
65
66    /// Deletes all SSTs specified in the given list of IDs from storage.
67    pub async fn delete_objects(
68        &self,
69        object_id_list: impl Iterator<Item = HummockSstableObjectId>,
70    ) -> Result<()> {
71        let mut paths = Vec::with_capacity(1000);
72        for object_id in object_id_list {
73            let obj_prefix = self
74                .store
75                .get_object_prefix(object_id, self.use_new_object_prefix_strategy);
76            paths.push(get_sst_data_path(&obj_prefix, &self.path_prefix, object_id));
77        }
78        self.store.delete_objects(&paths).await?;
79        Ok(())
80    }
81
82    async fn list_object_metadata_from_object_store(
83        &self,
84        prefix: Option<String>,
85        start_after: Option<String>,
86        limit: Option<usize>,
87    ) -> Result<ObjectMetadataIter> {
88        let list_path = format!("{}/{}", self.path_prefix, prefix.unwrap_or("".into()));
89        let raw_iter = self.store.list(&list_path, start_after, limit).await?;
90        let iter = raw_iter.filter(|r| match r {
91            Ok(i) => future::ready(i.key.ends_with(&format!(".{}", OBJECT_SUFFIX))),
92            Err(_) => future::ready(true),
93        });
94        Ok(Box::pin(iter))
95    }
96
97    /// Returns **filtered** object ids, and **unfiltered** total object count and size.
98    pub async fn list_objects(
99        &self,
100        sst_retention_watermark: u64,
101        prefix: Option<String>,
102        start_after: Option<String>,
103        limit: Option<u64>,
104    ) -> Result<(HashSet<HummockSstableObjectId>, u64, u64, Option<String>)> {
105        tracing::debug!(
106            sst_retention_watermark,
107            prefix,
108            start_after,
109            limit,
110            "Try to list objects."
111        );
112        let mut total_object_count = 0;
113        let mut total_object_size = 0;
114        let mut next_start_after: Option<String> = None;
115        let metadata_iter = self
116            .list_object_metadata_from_object_store(prefix, start_after, limit.map(|i| i as usize))
117            .await?;
118        let filtered = metadata_iter
119            .filter_map(|r| {
120                let result = match r {
121                    Ok(o) => {
122                        total_object_count += 1;
123                        total_object_size += o.total_size;
124                        // Determine if the LIST has been truncated.
125                        // A false positives would at most cost one additional LIST later.
126                        if let Some(limit) = limit
127                            && limit == total_object_count
128                        {
129                            next_start_after = Some(o.key.clone());
130                            tracing::debug!(next_start_after, "set next start after");
131                        }
132                        if o.last_modified < sst_retention_watermark as f64 {
133                            Some(Ok(get_object_id_from_path(&o.key)))
134                        } else {
135                            None
136                        }
137                    }
138                    Err(e) => Some(Err(Error::ObjectStore(e))),
139                };
140                async move { result }
141            })
142            .try_collect::<HashSet<HummockSstableObjectId>>()
143            .await?;
144        Ok((
145            filtered,
146            total_object_count,
147            total_object_size as u64,
148            next_start_after,
149        ))
150    }
151
152    pub fn add_may_delete_object_ids(
153        &self,
154        may_delete_object_ids: impl Iterator<Item = HummockSstableObjectId>,
155    ) {
156        self.may_delete_object_ids
157            .lock()
158            .extend(may_delete_object_ids);
159    }
160
161    /// Takes if `least_count` elements available.
162    pub fn try_take_may_delete_object_ids(
163        &self,
164        least_count: usize,
165    ) -> Option<HashSet<HummockSstableObjectId>> {
166        let mut guard = self.may_delete_object_ids.lock();
167        if guard.len() < least_count {
168            None
169        } else {
170            Some(std::mem::take(&mut *guard))
171        }
172    }
173}
174
impl HummockManager {
    /// Deletes version deltas.
    ///
    /// Returns number of deleted deltas
    pub async fn delete_version_deltas(&self) -> Result<usize> {
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        let context_info = self.context_info.read().await;
        // If there is any safe point, skip this to ensure meta backup has required delta logs to
        // replay version.
        if !context_info.version_safe_points.is_empty() {
            return Ok(0);
        }
        // The context_info lock must be held to prevent any potential metadata backup.
        // The lock order requires version lock to be held as well.
        let version_id = versioning.checkpoint.version.id;
        // Deltas at or below the checkpointed version can be reconstructed from the
        // checkpoint, so they are safe to remove from the meta store.
        let res = hummock_version_delta::Entity::delete_many()
            .filter(hummock_version_delta::Column::Id.lte(version_id.to_u64()))
            .exec(&self.env.meta_store_ref().conn)
            .await?;
        tracing::debug!(rows_affected = res.rows_affected, "Deleted version deltas");
        // Keep the in-memory delta map consistent with the meta store.
        versioning
            .hummock_version_deltas
            .retain(|id, _| *id > version_id);
        #[cfg(test)]
        {
            drop(context_info);
            drop(versioning_guard);
            self.check_state_consistency().await;
        }
        Ok(res.rows_affected as usize)
    }

    /// Filters by Hummock version and Writes GC history.
    ///
    /// Returns the subset of `object_ids` that is not tracked by any pinned Hummock
    /// version (and is therefore deletable), after recording it in GC history.
    pub async fn finalize_objects_to_delete(
        &self,
        object_ids: impl Iterator<Item = HummockSstableObjectId> + Clone,
    ) -> Result<Vec<HummockSstableObjectId>> {
        // This lock ensures `commit_epoch` and `report_compact_task` can see the latest GC history during sanity check.
        let versioning = self.versioning.read().await;
        let tracked_object_ids: HashSet<HummockSstableObjectId> = versioning
            .get_tracked_object_ids(self.context_info.read().await.min_pinned_version_id());
        let to_delete = object_ids.filter(|object_id| !tracked_object_ids.contains(object_id));
        self.write_gc_history(to_delete.clone()).await?;
        Ok(to_delete.collect())
    }

    /// LIST object store and DELETE stale objects, in batches.
    /// GC can be very slow. Spawn a dedicated tokio task for it.
    pub async fn start_full_gc(
        &self,
        sst_retention_time: Duration,
        prefix: Option<String>,
        backup_manager: Option<BackupManagerRef>,
    ) -> Result<()> {
        // Only one full GC may run at a time.
        if !self.full_gc_state.try_start() {
            return Err(anyhow::anyhow!("failed to start GC due to an ongoing process").into());
        }
        // Release the "running" flag on every exit path, including early errors.
        let _guard = scopeguard::guard(self.full_gc_state.clone(), |full_gc_state| {
            full_gc_state.stop()
        });
        self.metrics.full_gc_trigger_count.inc();
        // Enforce the configured lower bound on retention time.
        let sst_retention_time = cmp::max(
            sst_retention_time,
            Duration::from_secs(self.env.opts.min_sst_retention_time_sec),
        );
        let limit = self.env.opts.full_gc_object_limit;
        let mut start_after = None;
        // Objects modified at or after this watermark are always retained.
        let sst_retention_watermark = self
            .now()
            .await?
            .saturating_sub(sst_retention_time.as_secs());
        let mut total_object_count = 0;
        let mut total_object_size = 0;
        tracing::info!(
            retention_sec = sst_retention_time.as_secs(),
            prefix,
            limit,
            "Start GC."
        );
        // Page through the object store, `limit` objects per batch, until exhausted.
        loop {
            tracing::debug!(
                retention_sec = sst_retention_time.as_secs(),
                prefix,
                start_after,
                limit,
                "Start a GC batch."
            );
            let (object_ids, batch_object_count, batch_object_size, next_start_after) = self
                .gc_manager
                .list_objects(
                    sst_retention_watermark,
                    prefix.clone(),
                    start_after.clone(),
                    Some(limit),
                )
                .await?;
            total_object_count += batch_object_count;
            total_object_size += batch_object_size;
            tracing::debug!(
                ?object_ids,
                batch_object_count,
                batch_object_size,
                "Finish listing a GC batch."
            );
            self.complete_gc_batch(object_ids, backup_manager.clone())
                .await?;
            if next_start_after.is_none() {
                break;
            }
            start_after = next_start_after;
        }
        tracing::info!(total_object_count, total_object_size, "Finish GC");
        self.metrics.total_object_size.set(total_object_size as _);
        self.metrics.total_object_count.set(total_object_count as _);
        // Refresh the time-travel object count metric; failure here is non-fatal.
        match self.time_travel_pinned_object_count().await {
            Ok(count) => {
                self.metrics.time_travel_object_count.set(count as _);
            }
            Err(err) => {
                use thiserror_ext::AsReport;
                tracing::warn!(error = %err.as_report(), "Failed to count time travel objects.");
            }
        }
        Ok(())
    }

    /// Given candidate SSTs to delete, filter out false positive.
    /// Returns number of SSTs to delete.
    ///
    /// Candidates are filtered, in order, by: metadata backup pins, time travel
    /// archives, the minimum uncommitted SST id reported by compute nodes, and
    /// finally the current Hummock version (which also writes GC history).
    pub(crate) async fn complete_gc_batch(
        &self,
        object_ids: HashSet<HummockSstableObjectId>,
        backup_manager: Option<BackupManagerRef>,
    ) -> Result<usize> {
        if object_ids.is_empty() {
            return Ok(0);
        }
        // It's crucial to get pinned_by_metadata_backup only after object_ids.
        let pinned_by_metadata_backup = backup_manager
            .as_ref()
            .map(|b| b.list_pinned_ssts())
            .unwrap_or_default();
        // It's crucial to collect_min_uncommitted_sst_id (i.e. `min_sst_id`) only after LIST object store (i.e. `object_ids`).
        // Because after getting `min_sst_id`, new compute nodes may join and generate new uncommitted SSTs that are not covered by `min_sst_id`.
        // By getting `min_sst_id` after `object_ids`, it's ensured `object_ids` won't include any SSTs from those new compute nodes.
        let min_sst_id =
            collect_min_uncommitted_sst_id(&self.metadata_manager, self.env.stream_client_pool())
                .await?;
        let metrics = &self.metrics;
        let candidate_object_number = object_ids.len();
        metrics
            .full_gc_candidate_object_count
            .observe(candidate_object_number as _);
        // filter by metadata backup
        let object_ids = object_ids
            .into_iter()
            .filter(|s| !pinned_by_metadata_backup.contains(s))
            .collect_vec();
        let after_metadata_backup = object_ids.len();
        // filter by time travel archive
        let object_ids = self
            .filter_out_objects_by_time_travel(
                object_ids.into_iter(),
                self.env
                    .opts
                    .hummock_time_travel_filter_out_objects_batch_size,
            )
            .await?;
        let after_time_travel = object_ids.len();
        // filter by SST id watermark, i.e. minimum id of uncommitted SSTs reported by compute nodes.
        let object_ids = object_ids
            .into_iter()
            .filter(|id| *id < min_sst_id)
            .collect_vec();
        let after_min_sst_id = object_ids.len();
        // filter by version
        let after_version = self
            .finalize_objects_to_delete(object_ids.into_iter())
            .await?;
        let after_version_count = after_version.len();
        metrics
            .full_gc_selected_object_count
            .observe(after_version_count as _);
        tracing::info!(
            candidate_object_number,
            after_metadata_backup,
            after_time_travel,
            after_min_sst_id,
            after_version_count,
            "complete gc batch"
        );
        self.delete_objects(after_version).await?;
        Ok(after_version_count)
    }

    /// Returns a non-decreasing "now" in seconds since the UNIX epoch, and persists it
    /// so monotonicity survives a meta node reboot.
    ///
    /// # Errors
    /// Fails if the system clock reports a time earlier than the last observed value.
    pub async fn now(&self) -> Result<u64> {
        let mut guard = self.now.lock().await;
        let new_now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        if new_now < *guard {
            return Err(anyhow::anyhow!(format!(
                "unexpected decreasing now, old={}, new={}",
                *guard, new_now
            ))
            .into());
        }
        *guard = new_now;
        drop(guard);
        // Persist now to maintain non-decreasing even after a meta node reboot.
        // NOTE(review): persistence runs after the lock is released, so two concurrent
        // calls may persist out of order; the in-memory value stays monotonic — confirm
        // the persisted value is only used as a lower bound at startup (see `load_now`).
        let m = hummock_sequence::ActiveModel {
            name: ActiveValue::Set(HUMMOCK_NOW.into()),
            seq: ActiveValue::Set(new_now.try_into().unwrap()),
        };
        hummock_sequence::Entity::insert(m)
            .on_conflict(
                OnConflict::column(hummock_sequence::Column::Name)
                    .update_column(hummock_sequence::Column::Seq)
                    .to_owned(),
            )
            .exec(&self.env.meta_store_ref().conn)
            .await?;
        Ok(new_now)
    }

    /// Loads the last persisted "now" from the meta store, or `None` if never persisted.
    pub(crate) async fn load_now(&self) -> Result<Option<u64>> {
        let now = hummock_sequence::Entity::find_by_id(HUMMOCK_NOW.to_owned())
            .one(&self.env.meta_store_ref().conn)
            .await?
            .map(|m| m.seq.try_into().unwrap());
        Ok(now)
    }

    /// Records `object_ids` in the GC history table with the current timestamp, after
    /// pruning history entries older than the configured retention window.
    ///
    /// A no-op when `gc_history_retention_time_sec` is 0 (history disabled).
    async fn write_gc_history(
        &self,
        object_ids: impl Iterator<Item = HummockSstableObjectId>,
    ) -> Result<()> {
        if self.env.opts.gc_history_retention_time_sec == 0 {
            return Ok(());
        }
        let now = self.now().await?;
        let dt = DateTime::from_timestamp(now.try_into().unwrap(), 0).unwrap();
        let mut models = object_ids.map(|o| hummock_gc_history::ActiveModel {
            object_id: Set(o.try_into().unwrap()),
            mark_delete_at: Set(dt.naive_utc()),
        });
        let db = &self.meta_store_ref().conn;
        // Prune entries older than the retention window before inserting new ones.
        let gc_history_low_watermark = DateTime::from_timestamp(
            now.saturating_sub(self.env.opts.gc_history_retention_time_sec)
                .try_into()
                .unwrap(),
            0,
        )
        .unwrap();
        hummock_gc_history::Entity::delete_many()
            .filter(hummock_gc_history::Column::MarkDeleteAt.lt(gc_history_low_watermark))
            .exec(db)
            .await?;
        // Insert in bounded batches to avoid oversized INSERT statements.
        let mut is_finished = false;
        while !is_finished {
            let mut batch = vec![];
            let mut count: usize = self.env.opts.hummock_gc_history_insert_batch_size;
            while count > 0 {
                let Some(m) = models.next() else {
                    is_finished = true;
                    break;
                };
                count -= 1;
                batch.push(m);
            }
            if batch.is_empty() {
                break;
            }
            hummock_gc_history::Entity::insert_many(batch)
                .on_conflict_do_nothing()
                .exec(db)
                .await?;
        }
        Ok(())
    }

    /// Truncates time travel metadata whose epoch is older than the configured
    /// time travel retention period.
    pub async fn delete_time_travel_metadata(&self) -> MetaResult<()> {
        let current_epoch_time = Epoch::now().physical_time();
        let epoch_watermark = Epoch::from_physical_time(
            current_epoch_time.saturating_sub(
                self.env
                    .system_params_reader()
                    .await
                    .time_travel_retention_ms(),
            ),
        )
        .0;
        self.truncate_time_travel_metadata(epoch_watermark).await?;
        Ok(())
    }

    /// Deletes stale SST objects from object store.
    ///
    /// Returns the total count of deleted SST objects.
    pub async fn delete_objects(
        &self,
        mut objects_to_delete: Vec<HummockSstableObjectId>,
    ) -> Result<usize> {
        let total = objects_to_delete.len();
        let mut batch_size = 1000usize;
        while !objects_to_delete.is_empty() {
            // Optionally throttle between batches to limit object store load.
            if self.env.opts.vacuum_spin_interval_ms != 0 {
                tokio::time::sleep(Duration::from_millis(self.env.opts.vacuum_spin_interval_ms))
                    .await;
            }
            batch_size = cmp::min(objects_to_delete.len(), batch_size);
            if batch_size == 0 {
                break;
            }
            let delete_batch: HashSet<_> = objects_to_delete.drain(..batch_size).collect();
            tracing::info!(?delete_batch, "Attempt to delete objects.");
            let deleted_object_ids = delete_batch.clone();
            self.gc_manager
                .delete_objects(delete_batch.into_iter())
                .await?;
            tracing::debug!(?deleted_object_ids, "Finish deleting objects.");
        }
        Ok(total)
    }

    /// Minor GC attempts to delete objects that were part of Hummock version but are no longer in use.
    pub async fn try_start_minor_gc(&self, backup_manager: BackupManagerRef) -> Result<()> {
        // Skip if too few candidates accumulated; full GC covers stragglers eventually.
        const MIN_MINOR_GC_OBJECT_COUNT: usize = 1000;
        let Some(object_ids) = self
            .gc_manager
            .try_take_may_delete_object_ids(MIN_MINOR_GC_OBJECT_COUNT)
        else {
            return Ok(());
        };
        // Objects pinned by either meta backup or time travel should be filtered out.
        let backup_pinned: HashSet<_> = backup_manager.list_pinned_ssts();
        // The version_pinned is obtained after the candidate object_ids for deletion, which is new enough for filtering purpose.
        let version_pinned = {
            let versioning = self.versioning.read().await;
            versioning
                .get_tracked_object_ids(self.context_info.read().await.min_pinned_version_id())
        };
        let object_ids = object_ids
            .into_iter()
            .filter(|s| !version_pinned.contains(s) && !backup_pinned.contains(s));
        let object_ids = self
            .filter_out_objects_by_time_travel(
                object_ids,
                self.env
                    .opts
                    .hummock_time_travel_filter_out_objects_batch_size,
            )
            .await?;
        // Retry is not necessary. Full GC will handle these objects eventually.
        self.delete_objects(object_ids.into_iter().collect())
            .await?;
        Ok(())
    }
}
535
536async fn collect_min_uncommitted_sst_id(
537    metadata_manager: &MetadataManager,
538    client_pool: &StreamClientPool,
539) -> Result<HummockSstableObjectId> {
540    let futures = metadata_manager
541        .list_active_streaming_compute_nodes()
542        .await
543        .map_err(|err| Error::MetaStore(err.into()))?
544        .into_iter()
545        .map(|worker_node| async move {
546            let client = client_pool.get(&worker_node).await?;
547            let request = GetMinUncommittedSstIdRequest {};
548            client.get_min_uncommitted_sst_id(request).await
549        });
550    let min_watermark = try_join_all(futures)
551        .await
552        .map_err(|err| Error::Internal(err.into()))?
553        .into_iter()
554        .map(|resp| resp.min_uncommitted_sst_id)
555        .min()
556        .unwrap_or(HummockSstableObjectId::MAX);
557    Ok(min_watermark)
558}
559
/// Tracks whether a full GC run is currently in progress, so that at most one
/// run can be active at a time.
pub struct FullGcState {
    is_started: AtomicBool,
}

impl FullGcState {
    /// Creates a state with no GC run in progress.
    pub fn new() -> Self {
        Self {
            is_started: AtomicBool::new(false),
        }
    }

    /// Attempts to mark a GC run as started.
    ///
    /// Returns `true` if this call won the race and the caller may proceed;
    /// `false` if a run is already in progress.
    pub fn try_start(&self) -> bool {
        // `swap` returns the previous flag value: the caller acquires the run
        // exactly when the flag was previously `false`.
        !self.is_started.swap(true, Ordering::SeqCst)
    }

    /// Marks the current GC run as finished, allowing a new one to start.
    pub fn stop(&self) {
        self.is_started.store(false, Ordering::SeqCst);
    }
}
581
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use itertools::Itertools;
    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
    use risingwave_rpc_client::HummockMetaClient;

    use crate::hummock::MockHummockMetaClient;
    use crate::hummock::test_utils::{add_test_tables, setup_compute_env};

    /// End-to-end check of `start_full_gc` and the filtering done by `complete_gc_batch`:
    /// empty input short-circuits, untracked ids are deleted, committed ids are retained.
    #[tokio::test]
    async fn test_full_gc() {
        let (_env, hummock_manager, _cluster_manager, worker_id) = setup_compute_env(80).await;
        let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(MockHummockMetaClient::new(
            hummock_manager.clone(),
            worker_id as _,
        ));
        let compaction_group_id = StaticCompactionGroupId::StateDefault.into();
        // A full GC over an empty object store should complete without error.
        hummock_manager
            .start_full_gc(
                Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec + 1),
                None,
                None,
            )
            .await
            .unwrap();

        // Empty input results immediate return, without waiting heartbeat.
        hummock_manager
            .complete_gc_batch(vec![].into_iter().collect(), None)
            .await
            .unwrap();

        // LSMtree is empty. All input SST ids should be treated as garbage.
        // Use fake object ids, because they'll be written to GC history and they shouldn't affect later commit.
        assert_eq!(
            3,
            hummock_manager
                .complete_gc_batch(
                    vec![i64::MAX as u64 - 2, i64::MAX as u64 - 1, i64::MAX as u64]
                        .into_iter()
                        .collect(),
                    None,
                )
                .await
                .unwrap()
        );

        // All committed SST ids should be excluded from GC.
        let sst_infos = add_test_tables(
            hummock_manager.as_ref(),
            hummock_meta_client.clone(),
            compaction_group_id,
        )
        .await;
        let committed_object_ids = sst_infos
            .into_iter()
            .flatten()
            .map(|s| s.object_id)
            .sorted()
            .collect_vec();
        assert!(!committed_object_ids.is_empty());
        let max_committed_object_id = *committed_object_ids.iter().max().unwrap();
        // Only the one id beyond the committed set should be selected for deletion.
        assert_eq!(
            1,
            hummock_manager
                .complete_gc_batch(
                    [committed_object_ids, vec![max_committed_object_id + 1]]
                        .concat()
                        .into_iter()
                        .collect(),
                    None,
                )
                .await
                .unwrap()
        );
    }
}
661}