risingwave_meta/hummock/manager/gc.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp;
use std::collections::HashSet;
use std::ops::DerefMut;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant, SystemTime};

use chrono::DateTime;
use futures::future::try_join_all;
use futures::{StreamExt, TryStreamExt, future};
use itertools::Itertools;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::{
    HummockObjectId, HummockRawObjectId, VALID_OBJECT_ID_SUFFIXES, get_object_data_path,
    get_object_id_from_path,
};
use risingwave_meta_model::hummock_sequence::HUMMOCK_NOW;
use risingwave_meta_model::{hummock_gc_history, hummock_sequence, hummock_version_delta};
use risingwave_meta_model_migration::OnConflict;
use risingwave_object_store::object::{ObjectMetadataIter, ObjectStoreRef};
use risingwave_pb::stream_service::GetMinUncommittedObjectIdRequest;
use risingwave_rpc_client::StreamClientPool;
use sea_orm::{ActiveValue, ColumnTrait, EntityTrait, QueryFilter, Set};

use crate::MetaResult;
use crate::backup_restore::BackupManagerRef;
use crate::hummock::HummockManager;
use crate::hummock::error::{Error, Result};
use crate::manager::MetadataManager;

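/// Lists and deletes Hummock objects in the object store, and buffers object ids
/// that may later be reclaimed by minor GC.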
pub(crate) struct GcManager {
    store: ObjectStoreRef,
    path_prefix: String,
    use_new_object_prefix_strategy: bool,
    /// These objects may still be used by backup or time travel.
    may_delete_object_ids: parking_lot::Mutex<HashSet<HummockObjectId>>,
}

impl GcManager {
    pub fn new(
        store: ObjectStoreRef,
        path_prefix: &str,
        use_new_object_prefix_strategy: bool,
    ) -> Self {
        Self {
            store,
            path_prefix: path_prefix.to_owned(),
            use_new_object_prefix_strategy,
            may_delete_object_ids: Default::default(),
        }
    }

    /// Deletes all objects specified in the given list of IDs from storage.
    pub async fn delete_objects(
        &self,
        object_id_list: impl Iterator<Item = HummockObjectId>,
    ) -> Result<()> {
        let mut paths = Vec::with_capacity(1000);
        for object_id in object_id_list {
            let obj_prefix = self.store.get_object_prefix(
                object_id.as_raw().inner(),
                self.use_new_object_prefix_strategy,
            );
            paths.push(get_object_data_path(
                &obj_prefix,
                &self.path_prefix,
                object_id,
            ));
        }
        self.store.delete_objects(&paths).await?;
        Ok(())
    }

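    /// Lists object metadata under `path_prefix`/`prefix`, keeping only entries whose keys
    /// end with a valid Hummock object suffix. Errors from the underlying iterator are passed through.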
    async fn list_object_metadata_from_object_store(
        &self,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> Result<ObjectMetadataIter> {
        let list_path = format!("{}/{}", self.path_prefix, prefix.unwrap_or("".into()));
        let raw_iter = self.store.list(&list_path, start_after, limit).await?;
        let valid_suffixes = VALID_OBJECT_ID_SUFFIXES.map(|suffix| format!(".{}", suffix));
        let iter = raw_iter.filter(move |r| match r {
            Ok(i) => future::ready(valid_suffixes.iter().any(|suffix| i.key.ends_with(suffix))),
            Err(_) => future::ready(true),
        });
        Ok(Box::pin(iter))
    }

    /// Returns **filtered** object ids, and **unfiltered** total object count and size.
    pub async fn list_objects(
        &self,
        object_retention_watermark: u64,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<u64>,
    ) -> Result<(HashSet<HummockObjectId>, u64, u64, Option<String>)> {
        tracing::debug!(
            object_retention_watermark,
            prefix,
            start_after,
            limit,
            "Try to list objects."
        );
        let mut total_object_count = 0;
        let mut total_object_size = 0;
        let mut next_start_after: Option<String> = None;
        let metadata_iter = self
            .list_object_metadata_from_object_store(prefix, start_after, limit.map(|i| i as usize))
            .await?;
        let filtered = metadata_iter
            .filter_map(|r| {
                let result = match r {
                    Ok(o) => {
                        total_object_count += 1;
                        total_object_size += o.total_size;
                        // Determine if the LIST has been truncated.
                        // A false positive would cost at most one additional LIST later.
                        if let Some(limit) = limit
                            && limit == total_object_count
                        {
                            next_start_after = Some(o.key.clone());
                            tracing::debug!(next_start_after, "set next start after");
                        }
                        if o.last_modified < object_retention_watermark as f64 {
                            Some(Ok(get_object_id_from_path(&o.key)))
                        } else {
                            None
                        }
                    }
                    Err(e) => Some(Err(Error::ObjectStore(e))),
                };
                async move { result }
            })
            .try_collect::<HashSet<HummockObjectId>>()
            .await?;
        Ok((
            filtered,
            total_object_count,
            total_object_size as u64,
            next_start_after,
        ))
    }

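    /// Buffers object ids that may be deletable, to be picked up later by minor GC
    /// via `try_take_may_delete_object_ids`.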
    pub fn add_may_delete_object_ids(
        &self,
        may_delete_object_ids: impl Iterator<Item = HummockObjectId>,
    ) {
        self.may_delete_object_ids
            .lock()
            .extend(may_delete_object_ids);
    }

    /// Takes the buffered set if it contains at least `least_count` elements; otherwise returns `None`.
    pub fn try_take_may_delete_object_ids(
        &self,
        least_count: usize,
    ) -> Option<HashSet<HummockObjectId>> {
        let mut guard = self.may_delete_object_ids.lock();
        if guard.len() < least_count {
            None
        } else {
            Some(std::mem::take(&mut *guard))
        }
    }
}

impl HummockManager {
    /// Deletes version deltas.
    ///
    /// Returns the number of deleted deltas.
    pub async fn delete_version_deltas(&self) -> Result<usize> {
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        let context_info = self.context_info.read().await;
        // If there is any safe point, skip deletion to ensure the meta backup has the delta logs
        // required to replay the version.
        if !context_info.version_safe_points.is_empty() {
            return Ok(0);
        }
        // The `context_info` lock must be held to prevent any concurrent metadata backup.
        // The lock order requires the version lock to be held as well.
        let version_id = versioning.checkpoint.version.id;
        let res = hummock_version_delta::Entity::delete_many()
            .filter(hummock_version_delta::Column::Id.lte(version_id.to_u64()))
            .exec(&self.env.meta_store_ref().conn)
            .await?;
        tracing::debug!(rows_affected = res.rows_affected, "Deleted version deltas");
        versioning
            .hummock_version_deltas
            .retain(|id, _| *id > version_id);
        #[cfg(test)]
        {
            drop(context_info);
            drop(versioning_guard);
            self.check_state_consistency().await;
        }
        Ok(res.rows_affected as usize)
    }

    /// Filters out objects still tracked by the Hummock version and writes GC history for the rest.
    pub async fn finalize_objects_to_delete(
        &self,
        object_ids: impl Iterator<Item = HummockObjectId>,
    ) -> Result<Vec<HummockObjectId>> {
        // This lock ensures `commit_epoch` and `report_compact_task` can see the latest GC history during sanity check.
        let versioning = self.versioning.read().await;
        let tracked_object_ids: HashSet<HummockObjectId> = versioning
            .get_tracked_object_ids(self.context_info.read().await.min_pinned_version_id());
        let to_delete = object_ids
            .filter(|object_id| !tracked_object_ids.contains(object_id))
            .collect_vec();
        self.write_gc_history(to_delete.iter().copied()).await?;
        Ok(to_delete)
    }

    /// LISTs the object store and DELETEs stale objects, in batches.
    /// GC can be very slow, so callers should spawn a dedicated tokio task for it.
    pub async fn start_full_gc(
        &self,
        object_retention_time: Duration,
        prefix: Option<String>,
        backup_manager: Option<BackupManagerRef>,
    ) -> Result<()> {
        if !self.full_gc_state.try_start() {
            return Err(anyhow::anyhow!("failed to start GC due to an ongoing process").into());
        }
        let _guard = scopeguard::guard(self.full_gc_state.clone(), |full_gc_state| {
            full_gc_state.stop()
        });
        self.metrics.full_gc_trigger_count.inc();
        let object_retention_time = cmp::max(
            object_retention_time,
            Duration::from_secs(self.env.opts.min_sst_retention_time_sec),
        );
        let limit = self.env.opts.full_gc_object_limit;
        let mut start_after = None;
        let object_retention_watermark = self
            .now()
            .await?
            .saturating_sub(object_retention_time.as_secs());
        let mut total_object_count = 0;
        let mut total_object_size = 0;
        tracing::info!(
            retention_sec = object_retention_time.as_secs(),
            prefix,
            limit,
            "Start GC."
        );
        loop {
            tracing::debug!(
                retention_sec = object_retention_time.as_secs(),
                prefix,
                start_after,
                limit,
                "Start a GC batch."
            );
            let (object_ids, batch_object_count, batch_object_size, next_start_after) = self
                .gc_manager
                .list_objects(
                    object_retention_watermark,
                    prefix.clone(),
                    start_after.clone(),
                    Some(limit),
                )
                .await?;
            total_object_count += batch_object_count;
            total_object_size += batch_object_size;
            tracing::debug!(
                ?object_ids,
                batch_object_count,
                batch_object_size,
                "Finish listing a GC batch."
            );
            self.complete_gc_batch(object_ids, backup_manager.clone())
                .await?;
            if next_start_after.is_none() {
                break;
            }
            start_after = next_start_after;
        }
        tracing::info!(total_object_count, total_object_size, "Finish GC");
        self.metrics.total_object_size.set(total_object_size as _);
        self.metrics.total_object_count.set(total_object_count as _);
        match self.time_travel_pinned_object_count().await {
            Ok(count) => {
                self.metrics.time_travel_object_count.set(count as _);
            }
            Err(err) => {
                use thiserror_ext::AsReport;
                tracing::warn!(error = %err.as_report(), "Failed to count time travel objects.");
            }
        }
        Ok(())
    }

    /// Given candidate objects to delete, filters out false positives.
    /// Returns the number of objects deleted.
    pub(crate) async fn complete_gc_batch(
        &self,
        object_ids: HashSet<HummockObjectId>,
        backup_manager: Option<BackupManagerRef>,
    ) -> Result<usize> {
        if object_ids.is_empty() {
            return Ok(0);
        }
        let pinned_by_metadata_backup = match backup_manager.as_ref() {
            Some(b) => b.list_pinned_object_ids().await,
            None => HashSet::default(),
        };
        // It's crucial to collect_min_uncommitted_object_id (i.e. `min_object_id`) only after LISTing the object store (i.e. `object_ids`).
        // Otherwise, after getting `min_object_id`, new compute nodes may join and generate new uncommitted objects that are not covered by `min_object_id`.
        // By getting `min_object_id` after `object_ids`, it's ensured that `object_ids` won't include any objects from those new compute nodes.
        let min_object_id = collect_min_uncommitted_object_id(
            &self.metadata_manager,
            self.env.stream_client_pool(),
        )
        .await?;
        let metrics = &self.metrics;
        let candidate_object_number = object_ids.len();
        metrics
            .full_gc_candidate_object_count
            .observe(candidate_object_number as _);
        // filter by metadata backup
        let object_ids = object_ids
            .into_iter()
            .filter(|s| !pinned_by_metadata_backup.contains(&s.as_raw()))
            .collect_vec();
        let after_metadata_backup = object_ids.len();
        // filter by time travel archive
        let filter_by_time_travel_start_time = Instant::now();
        let object_ids = self
            .filter_out_objects_by_time_travel(object_ids.into_iter())
            .await?;
        tracing::info!(elapsed = ?filter_by_time_travel_start_time.elapsed(), "filter out objects by time travel in full GC");
        let after_time_travel = object_ids.len();
        // filter by the object id watermark, i.e. the minimum id of uncommitted objects reported by compute nodes.
        let object_ids = object_ids
            .into_iter()
            .filter(|id| id.as_raw() < min_object_id)
            .collect_vec();
        let after_min_object_id = object_ids.len();
        // filter by version
        let after_version = self
            .finalize_objects_to_delete(object_ids.into_iter())
            .await?;
        let after_version_count = after_version.len();
        metrics
            .full_gc_selected_object_count
            .observe(after_version_count as _);
        tracing::info!(
            candidate_object_number,
            after_metadata_backup,
            after_time_travel,
            after_min_object_id,
            after_version_count,
            "complete gc batch"
        );
        self.delete_objects(after_version).await?;
        Ok(after_version_count)
    }

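    /// Returns the current UNIX timestamp in seconds. The value is kept non-decreasing
    /// (an error is returned if the clock appears to go backwards) and persisted to the
    /// meta store so it survives meta node restarts.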
    pub async fn now(&self) -> Result<u64> {
        let mut guard = self.now.lock().await;
        let new_now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs();
        if new_now < *guard {
            return Err(anyhow::anyhow!(format!(
                "unexpected decreasing now, old={}, new={}",
                *guard, new_now
            ))
            .into());
        }
        *guard = new_now;
        drop(guard);
        // Persist `now` so that it remains non-decreasing even after a meta node reboot.
        let m = hummock_sequence::ActiveModel {
            name: ActiveValue::Set(HUMMOCK_NOW.into()),
            seq: ActiveValue::Set(new_now.try_into().unwrap()),
        };
        hummock_sequence::Entity::insert(m)
            .on_conflict(
                OnConflict::column(hummock_sequence::Column::Name)
                    .update_column(hummock_sequence::Column::Seq)
                    .to_owned(),
            )
            .exec(&self.env.meta_store_ref().conn)
            .await?;
        Ok(new_now)
    }

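    /// Loads the persisted `now` timestamp from the meta store, if any.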
    pub(crate) async fn load_now(&self) -> Result<Option<u64>> {
        let now = hummock_sequence::Entity::find_by_id(HUMMOCK_NOW.to_owned())
            .one(&self.env.meta_store_ref().conn)
            .await?
            .map(|m| m.seq.try_into().unwrap());
        Ok(now)
    }

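    /// Records the given object ids in the GC history table, in batches, and prunes history
    /// entries older than the configured retention time. No-op if GC history retention is disabled.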
    async fn write_gc_history(
        &self,
        object_ids: impl Iterator<Item = HummockObjectId>,
    ) -> Result<()> {
        if self.env.opts.gc_history_retention_time_sec == 0 {
            return Ok(());
        }
        let now = self.now().await?;
        let dt = DateTime::from_timestamp(now.try_into().unwrap(), 0).unwrap();
        let mut models = object_ids.map(|o| hummock_gc_history::ActiveModel {
            object_id: Set(o.as_raw().inner().try_into().unwrap()),
            mark_delete_at: Set(dt.naive_utc()),
        });
        let db = &self.meta_store_ref().conn;
        let gc_history_low_watermark = DateTime::from_timestamp(
            now.saturating_sub(self.env.opts.gc_history_retention_time_sec)
                .try_into()
                .unwrap(),
            0,
        )
        .unwrap();
        hummock_gc_history::Entity::delete_many()
            .filter(hummock_gc_history::Column::MarkDeleteAt.lt(gc_history_low_watermark))
            .exec(db)
            .await?;
        let mut is_finished = false;
        while !is_finished {
            let mut batch = vec![];
            let mut count: usize = self.env.opts.hummock_gc_history_insert_batch_size;
            while count > 0 {
                let Some(m) = models.next() else {
                    is_finished = true;
                    break;
                };
                count -= 1;
                batch.push(m);
            }
            if batch.is_empty() {
                break;
            }
            hummock_gc_history::Entity::insert_many(batch)
                .on_conflict_do_nothing()
                .exec(db)
                .await?;
        }
        Ok(())
    }

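    /// Truncates time travel metadata older than the configured time travel retention period.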
    pub async fn delete_time_travel_metadata(&self) -> MetaResult<()> {
        let current_epoch_time = Epoch::now().physical_time();
        let epoch_watermark = Epoch::from_physical_time(
            current_epoch_time.saturating_sub(
                self.env
                    .system_params_reader()
                    .await
                    .time_travel_retention_ms(),
            ),
        )
        .0;
        self.truncate_time_travel_metadata(epoch_watermark).await?;
        Ok(())
    }

    /// Deletes stale objects from the object store.
    ///
    /// Returns the total count of deleted objects.
    pub async fn delete_objects(
        &self,
        mut objects_to_delete: Vec<HummockObjectId>,
    ) -> Result<usize> {
        let total = objects_to_delete.len();
        let mut batch_size = 1000usize;
        while !objects_to_delete.is_empty() {
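            // If configured, pause between delete batches (`vacuum_spin_interval_ms`) to throttle deletion.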
            if self.env.opts.vacuum_spin_interval_ms != 0 {
                tokio::time::sleep(Duration::from_millis(self.env.opts.vacuum_spin_interval_ms))
                    .await;
            }
            batch_size = cmp::min(objects_to_delete.len(), batch_size);
            if batch_size == 0 {
                break;
            }
            let delete_batch: HashSet<_> = objects_to_delete.drain(..batch_size).collect();
            tracing::info!(?delete_batch, "Attempt to delete objects.");
            let deleted_object_ids = delete_batch.clone();
            self.gc_manager
                .delete_objects(delete_batch.into_iter())
                .await?;
            tracing::debug!(?deleted_object_ids, "Finish deleting objects.");
        }
        Ok(total)
    }

    /// Minor GC attempts to delete objects that were once part of a Hummock version but are no longer in use.
    pub async fn try_start_minor_gc(&self, backup_manager: BackupManagerRef) -> Result<()> {
        const MIN_MINOR_GC_OBJECT_COUNT: usize = 1000;
        let Some(object_ids) = self
            .gc_manager
            .try_take_may_delete_object_ids(MIN_MINOR_GC_OBJECT_COUNT)
        else {
            return Ok(());
        };
        // Objects pinned by either meta backup or time travel should be filtered out.
        let backup_pinned: HashSet<_> = backup_manager.list_pinned_object_ids().await;
        // `version_pinned` is obtained after the candidate object ids for deletion, so it is recent enough for filtering purposes.
        let version_pinned = {
            let versioning = self.versioning.read().await;
            versioning
                .get_tracked_object_ids(self.context_info.read().await.min_pinned_version_id())
        };
        let object_ids = object_ids
            .into_iter()
            .filter(|s| !version_pinned.contains(s) && !backup_pinned.contains(&s.as_raw()));
        let filter_by_time_travel_start_time = Instant::now();
        let object_ids = self.filter_out_objects_by_time_travel(object_ids).await?;
        tracing::info!(elapsed = ?filter_by_time_travel_start_time.elapsed(), "filter out objects by time travel in minor GC");
        // Retry is not necessary. Full GC will handle these objects eventually.
        self.delete_objects(object_ids.into_iter().collect())
            .await?;
        Ok(())
    }
}

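/// Queries all active streaming compute nodes for their minimum uncommitted object id and
/// returns the smallest one, or `u64::MAX` if there is no compute node.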
async fn collect_min_uncommitted_object_id(
    metadata_manager: &MetadataManager,
    client_pool: &StreamClientPool,
) -> Result<HummockRawObjectId> {
    let futures = metadata_manager
        .list_active_streaming_compute_nodes()
        .await
        .map_err(|err| Error::MetaStore(err.into()))?
        .into_iter()
        .map(|worker_node| async move {
            let client = client_pool.get(&worker_node).await?;
            let request = GetMinUncommittedObjectIdRequest {};
            client.get_min_uncommitted_object_id(request).await
        });
    let min_watermark = try_join_all(futures)
        .await
        .map_err(|err| Error::Internal(err.into()))?
        .into_iter()
        .map(|resp| resp.min_uncommitted_object_id)
        .min()
        .unwrap_or(u64::MAX);
    Ok(min_watermark.into())
}

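/// Ensures at most one full GC is in progress at a time.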
pub struct FullGcState {
    is_started: AtomicBool,
}

impl FullGcState {
    pub fn new() -> Self {
        Self {
            is_started: AtomicBool::new(false),
        }
    }

    pub fn try_start(&self) -> bool {
        self.is_started
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
    }

    pub fn stop(&self) {
        self.is_started.store(false, Ordering::SeqCst);
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use itertools::Itertools;
    use risingwave_hummock_sdk::HummockObjectId;
    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
    use risingwave_rpc_client::HummockMetaClient;

    use crate::hummock::MockHummockMetaClient;
    use crate::hummock::test_utils::{add_test_tables, setup_compute_env};

    #[tokio::test]
    async fn test_full_gc() {
        let (_env, hummock_manager, _cluster_manager, worker_id) = setup_compute_env(80).await;
        let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(MockHummockMetaClient::new(
            hummock_manager.clone(),
            worker_id as _,
        ));
        let compaction_group_id = StaticCompactionGroupId::StateDefault.into();
        hummock_manager
            .start_full_gc(
                Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec + 1),
                None,
                None,
            )
            .await
            .unwrap();

        // Empty input results in an immediate return, without waiting for a heartbeat.
        hummock_manager
            .complete_gc_batch(vec![].into_iter().collect(), None)
            .await
            .unwrap();

        // The LSM tree is empty, so all input object ids should be treated as garbage.
        // Use fake object ids, because they'll be written to GC history and shouldn't affect later commits.
        assert_eq!(
            3,
            hummock_manager
                .complete_gc_batch(
                    [i64::MAX as u64 - 2, i64::MAX as u64 - 1, i64::MAX as u64]
                        .into_iter()
                        .map(|id| HummockObjectId::Sstable(id.into()))
                        .collect(),
                    None,
                )
                .await
                .unwrap()
        );

        // All committed SST ids should be excluded from GC.
        let sst_infos = add_test_tables(
            hummock_manager.as_ref(),
            hummock_meta_client.clone(),
            compaction_group_id,
        )
        .await;
        let committed_object_ids = sst_infos
            .into_iter()
            .flatten()
            .map(|s| s.object_id)
            .sorted()
            .collect_vec();
        assert!(!committed_object_ids.is_empty());
        let max_committed_object_id = *committed_object_ids.iter().max().unwrap();
        assert_eq!(
            1,
            hummock_manager
                .complete_gc_batch(
                    [committed_object_ids, vec![max_committed_object_id + 1]]
                        .concat()
                        .into_iter()
                        .map(HummockObjectId::Sstable)
                        .collect(),
                    None,
                )
                .await
                .unwrap()
        );
    }
}