risingwave_meta/hummock/manager/context.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};

use fail::fail_point;
use futures::{StreamExt, stream};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{
    HummockContextId, HummockSstableObjectId, HummockVersionId, INVALID_VERSION_ID,
    LocalSstableInfo,
};
use risingwave_meta_model::hummock_gc_history;
use risingwave_pb::hummock::{HummockPinnedVersion, ValidationTask};
use sea_orm::{DatabaseConnection, EntityTrait};

use crate::controller::SqlMetaStore;
use crate::hummock::HummockManager;
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::worker::{HummockManagerEvent, HummockManagerEventSender};
use crate::hummock::manager::{commit_multi_var, start_measure_real_process_timer};
use crate::hummock::metrics_utils::trigger_pin_unpin_version_state;
use crate::manager::{META_NODE_ID, MetadataManager};
use crate::model::BTreeMapTransaction;
use crate::rpc::metrics::MetaMetrics;

/// `HummockVersionSafePoint` prevents hummock versions greater than or equal
/// to it from being GCed. It's used by the meta node itself to temporarily
/// pin versions.
pub struct HummockVersionSafePoint {
    pub id: HummockVersionId,
    event_sender: HummockManagerEventSender,
}

impl Drop for HummockVersionSafePoint {
    fn drop(&mut self) {
        if self
            .event_sender
            .send(HummockManagerEvent::DropSafePoint(self.id))
            .is_err()
        {
            tracing::debug!("failed to drop hummock version safe point {}", self.id);
        }
    }
}
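
// A minimal usage sketch (illustrative only; `hummock_manager` is an assumed
// `HummockManager` handle): the safe point is an RAII guard, so dropping it
// is what releases the pin, via the `DropSafePoint` event above.
//
//     let sp = hummock_manager.register_safe_point().await;
//     // Versions with id >= sp.id are protected from GC while `sp` is alive.
//     drop(sp); // sends HummockManagerEvent::DropSafePoint(sp.id)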

#[derive(Default)]
pub(super) struct ContextInfo {
    pub pinned_versions: BTreeMap<HummockContextId, HummockPinnedVersion>,
    /// `version_safe_points` is similar to `pinned_versions` except for being a transient state.
    pub version_safe_points: Vec<HummockVersionId>,
}

impl ContextInfo {
    /// Release resources pinned by these contexts, including:
    /// - Version
    /// - Snapshot
    async fn release_contexts(
        &mut self,
        context_ids: impl AsRef<[HummockContextId]>,
        meta_store_ref: SqlMetaStore,
    ) -> Result<()> {
        fail_point!("release_contexts_metastore_err", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));
        fail_point!("release_contexts_internal_err", |_| Err(Error::Internal(
            anyhow::anyhow!("failpoint internal error")
        )));

        let mut pinned_versions = BTreeMapTransaction::new(&mut self.pinned_versions);
        for context_id in context_ids.as_ref() {
            pinned_versions.remove(*context_id);
        }
        commit_multi_var!(meta_store_ref, pinned_versions)?;

        Ok(())
    }
}
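
// The pattern above recurs throughout this module: wrap the in-memory map in
// a `BTreeMapTransaction`, stage mutations, and let `commit_multi_var!`
// persist them to the meta store, applying them to the in-memory state only
// on success. A minimal sketch, with `m` a `BTreeMap` and `store` a meta
// store handle (both hypothetical):
//
//     let mut txn = BTreeMapTransaction::new(&mut m);
//     txn.remove(some_key);
//     commit_multi_var!(store, txn)?; // on Err, `m` is left unchanged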

impl HummockManager {
    pub async fn release_contexts(
        &self,
        context_ids: impl AsRef<[HummockContextId]>,
    ) -> Result<()> {
        let mut context_info = self.context_info.write().await;
        context_info
            .release_contexts(context_ids, self.env.meta_store())
            .await?;
        #[cfg(test)]
        {
            drop(context_info);
            self.check_state_consistency().await;
        }
        Ok(())
    }

    /// Checks whether `context_id` is valid.
    pub async fn check_context(&self, context_id: HummockContextId) -> Result<bool> {
        self.context_info
            .read()
            .await
            .check_context(context_id, &self.metadata_manager)
            .await
    }

    async fn check_context_with_meta_node(
        &self,
        context_id: HummockContextId,
        context_info: &ContextInfo,
    ) -> Result<()> {
        if context_id == META_NODE_ID {
            // Using the preserved meta id is allowed.
        } else if !context_info
            .check_context(context_id, &self.metadata_manager)
            .await?
        {
            // The worker is not found in the cluster.
            return Err(Error::InvalidContext(context_id));
        }
        Ok(())
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn get_min_pinned_version_id(&self) -> HummockVersionId {
        self.context_info.read().await.min_pinned_version_id()
    }
}

impl ContextInfo {
    /// Checks whether `context_id` is valid.
    ///
    /// Needs `&self` to synchronize with `release_contexts`.
    pub(super) async fn check_context(
        &self,
        context_id: HummockContextId,
        metadata_manager: &MetadataManager,
    ) -> Result<bool> {
        Ok(metadata_manager
            .get_worker_by_id(context_id as _)
            .await
            .map_err(|err| Error::MetaStore(err.into()))?
            .is_some())
    }
}

impl HummockManager {
    /// Release invalid contexts, i.e., worker node ids that are no longer valid in `ClusterManager`.
    pub(super) async fn release_invalid_contexts(&self) -> Result<Vec<HummockContextId>> {
        let (active_context_ids, mut context_info) = {
            let compaction_guard = self.compaction.read().await;
            let context_info = self.context_info.write().await;
            let _timer = start_measure_real_process_timer!(self, "release_invalid_contexts");
            let mut active_context_ids = HashSet::new();
            active_context_ids.extend(
                compaction_guard
                    .compact_task_assignment
                    .values()
                    .map(|c| c.context_id),
            );
            active_context_ids.extend(context_info.pinned_versions.keys());
            (active_context_ids, context_info)
        };

        let mut invalid_context_ids = vec![];
        for active_context_id in &active_context_ids {
            if !context_info
                .check_context(*active_context_id, &self.metadata_manager)
                .await?
            {
                invalid_context_ids.push(*active_context_id);
            }
        }

        context_info
            .release_contexts(&invalid_context_ids, self.env.meta_store())
            .await?;

        Ok(invalid_context_ids)
    }
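
    // Worked example for `release_invalid_contexts` (hypothetical ids): if a
    // compact task assignment still references context 7 and `pinned_versions`
    // has an entry for context 9, but only worker 9 remains registered in the
    // metadata manager, then context 7 fails `check_context`, its pinned
    // version is released, and `vec![7]` is returned.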

    pub async fn commit_epoch_sanity_check(
        &self,
        tables_to_commit: &HashMap<TableId, u64>,
        sstables: &[LocalSstableInfo],
        sst_to_context: &HashMap<HummockSstableObjectId, HummockContextId>,
        current_version: &HummockVersion,
    ) -> Result<()> {
        use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;

        for (sst_id, context_id) in sst_to_context {
            #[cfg(test)]
            {
                if *context_id == crate::manager::META_NODE_ID {
                    continue;
                }
            }
            if !self
                .context_info
                .read()
                .await
                .check_context(*context_id, &self.metadata_manager)
                .await?
            {
                return Err(Error::InvalidSst(*sst_id));
            }
        }

        // Sanity check: each table's committed epoch must increase monotonically.
        for (table_id, committed_epoch) in tables_to_commit {
            if let Some(info) = current_version.state_table_info.info().get(table_id) {
                if *committed_epoch <= info.committed_epoch {
                    return Err(anyhow::anyhow!(
                        "table {}: epoch {} <= committed_epoch {}",
                        table_id,
                        committed_epoch,
                        info.committed_epoch,
                    )
                    .into());
                }
            }
        }
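
        // Epoch monotonicity example (hypothetical values): if table 42
        // currently has committed_epoch = 100, committing epoch 100 or less
        // is rejected above, while epoch 101 passes.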

        // HummockManager::now requires a write to the meta store. Thus, it should be avoided whenever feasible.
        if !sstables.is_empty() {
            // Sanity check to ensure SSTs to commit have not been full GCed yet.
            let now = self.now().await?;
            check_sst_retention(
                now,
                self.env.opts.min_sst_retention_time_sec,
                sstables
                    .iter()
                    .map(|s| (s.sst_info.object_id, s.created_at)),
            )?;
            if self.env.opts.gc_history_retention_time_sec != 0 {
                let ids = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
                check_gc_history(&self.meta_store_ref().conn, ids).await?;
            }
        }

        async {
            if !self.env.opts.enable_committed_sst_sanity_check {
                return;
            }
            if sstables.is_empty() {
                return;
            }
            let compactor = match self.compactor_manager.next_compactor() {
                None => {
                    tracing::warn!("Skip committed SST sanity check due to no available worker");
                    return;
                }
                Some(compactor) => compactor,
            };
            let sst_infos = sstables
                .iter()
                .map(|LocalSstableInfo { sst_info, .. }| sst_info.clone())
                .collect_vec();
            if compactor
                .send_event(ResponseEvent::ValidationTask(ValidationTask {
                    sst_infos: sst_infos.into_iter().map(|sst| sst.into()).collect_vec(),
                    sst_id_to_worker_id: sst_to_context
                        .iter()
                        .map(|(object_id, worker_id)| (object_id.inner(), *worker_id))
                        .collect(),
                }))
                .is_err()
            {
                tracing::warn!("Skip committed SST sanity check due to send failure");
            }
        }
        .await;
        Ok(())
    }

    pub async fn release_meta_context(&self) -> Result<()> {
        self.release_contexts([META_NODE_ID]).await
    }

    pub(crate) async fn report_compaction_sanity_check(
        &self,
        object_timestamps: &HashMap<HummockSstableObjectId, u64>,
    ) -> Result<()> {
        // HummockManager::now requires a write to the meta store. Thus, it should be avoided whenever feasible.
        if object_timestamps.is_empty() {
            return Ok(());
        }
        let now = self.now().await?;
        check_sst_retention(
            now,
            self.env.opts.min_sst_retention_time_sec,
            object_timestamps.iter().map(|(k, v)| (*k, *v)),
        )?;
        if self.env.opts.gc_history_retention_time_sec != 0 {
            let ids = object_timestamps.iter().map(|(id, _)| *id).collect_vec();
            check_gc_history(&self.meta_store_ref().conn, ids).await?;
        }
        Ok(())
    }
}

fn check_sst_retention(
    now: u64,
    retention_sec: u64,
    sst_infos: impl Iterator<Item = (HummockSstableObjectId, u64)>,
) -> Result<()> {
    let sst_retention_watermark = now.saturating_sub(retention_sec);
    for (object_id, created_at) in sst_infos {
        if created_at < sst_retention_watermark {
            return Err(anyhow::anyhow!(
                "object {object_id} is rejected from being committed since it's below watermark: \
                 object timestamp {created_at}, meta node timestamp {now}, \
                 retention_sec {retention_sec}, watermark {sst_retention_watermark}"
            )
            .into());
        }
    }
    Ok(())
}
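
// Worked example for `check_sst_retention` (hypothetical values): with
// now = 1_000 and retention_sec = 300, the watermark is 1_000 - 300 = 700.
// An object created at 650 is rejected (650 < 700), while one created at 750
// passes. Because of `saturating_sub`, retention_sec > now yields a watermark
// of 0, in which case nothing is rejected.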

async fn check_gc_history(
    db: &DatabaseConnection,
    // need IntoIterator to work around stream's "implementation of `std::iter::Iterator` is not general enough" error.
    object_ids: impl IntoIterator<Item = HummockSstableObjectId>,
) -> Result<()> {
    let futures = object_ids.into_iter().map(|id| async move {
        let id: risingwave_meta_model::HummockSstableObjectId = id.inner().try_into().unwrap();
        hummock_gc_history::Entity::find_by_id(id)
            .one(db)
            .await
            .map_err(Error::from)
    });
    let res: Vec<_> = stream::iter(futures).buffer_unordered(10).collect().await;
    let res: Result<Vec<_>> = res.into_iter().collect();
    let mut expired_object_ids = res?.into_iter().flatten().peekable();
    if expired_object_ids.peek().is_none() {
        return Ok(());
    }
    let expired_object_ids: Vec<_> = expired_object_ids.collect();
    tracing::error!(
        ?expired_object_ids,
        "new SSTs are rejected because they have already been GCed"
    );
    Err(Error::InvalidSst(
        (expired_object_ids[0].object_id as u64).into(),
    ))
}
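
// `check_gc_history` fans out at most 10 concurrent point lookups via
// `buffer_unordered`. A self-contained sketch of the same pattern, assuming
// only the `futures` crate (the numeric futures are placeholders):
//
//     use futures::{StreamExt, stream};
//     let results: Vec<u32> = stream::iter((0..100u32).map(|i| async move { i }))
//         .buffer_unordered(10) // poll at most 10 futures concurrently
//         .collect()
//         .await;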

// pin and unpin methods
impl HummockManager {
    /// Pin the current greatest hummock version. The pin belongs to `context_id`
    /// and will be unpinned when `context_id` is invalidated.
    pub async fn pin_version(&self, context_id: HummockContextId) -> Result<HummockVersion> {
        let versioning = self.versioning.read().await;
        let mut context_info = self.context_info.write().await;
        self.check_context_with_meta_node(context_id, &context_info)
            .await?;
        let _timer = start_measure_real_process_timer!(self, "pin_version");
        let mut pinned_versions = BTreeMapTransaction::new(&mut context_info.pinned_versions);
        let mut context_pinned_version = pinned_versions.new_entry_txn_or_default(
            context_id,
            HummockPinnedVersion {
                context_id,
                min_pinned_id: INVALID_VERSION_ID.to_u64(),
            },
        );
        let version_id = versioning.current_version.id;
        let ret = versioning.current_version.clone();
        if HummockVersionId::new(context_pinned_version.min_pinned_id) == INVALID_VERSION_ID
            || HummockVersionId::new(context_pinned_version.min_pinned_id) > version_id
        {
            context_pinned_version.min_pinned_id = version_id.to_u64();
            commit_multi_var!(self.meta_store_ref(), context_pinned_version)?;
            trigger_pin_unpin_version_state(&self.metrics, &context_info.pinned_versions);
        }

        #[cfg(test)]
        {
            drop(context_info);
            drop(versioning);
            self.check_state_consistency().await;
        }

        Ok(ret)
    }
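
    // Pinning example (hypothetical ids): the first pin at version 5 sets
    // min_pinned_id = 5; pinning again when the current version is 8 leaves
    // min_pinned_id at 5, since it is only overwritten when it is
    // INVALID_VERSION_ID or greater than the current version. GC thus keeps
    // versions >= 5 for this context until `unpin_version_before` advances
    // the bound.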

    /// Unpin all pins that belong to `context_id` and have an id older than
    /// `unpin_before`. All versions >= `unpin_before` will be treated as if they are
    /// pinned by this `context_id`, so they will not be vacuumed.
    pub async fn unpin_version_before(
        &self,
        context_id: HummockContextId,
        unpin_before: HummockVersionId,
    ) -> Result<()> {
        let mut context_info = self.context_info.write().await;
        self.check_context_with_meta_node(context_id, &context_info)
            .await?;
        let _timer = start_measure_real_process_timer!(self, "unpin_version_before");
        let mut pinned_versions = BTreeMapTransaction::new(&mut context_info.pinned_versions);
        let mut context_pinned_version = pinned_versions.new_entry_txn_or_default(
            context_id,
            HummockPinnedVersion {
                context_id,
                min_pinned_id: 0,
            },
        );
        assert!(
            context_pinned_version.min_pinned_id <= unpin_before.to_u64(),
            "min_pinned_id must be monotonically non-decreasing. old = {}, new = {}.",
            context_pinned_version.min_pinned_id,
            unpin_before
        );
        context_pinned_version.min_pinned_id = unpin_before.to_u64();
        commit_multi_var!(self.meta_store_ref(), context_pinned_version)?;
        trigger_pin_unpin_version_state(&self.metrics, &context_info.pinned_versions);

        #[cfg(test)]
        {
            drop(context_info);
            self.check_state_consistency().await;
        }

        Ok(())
    }
}

// safe point
impl HummockManager {
    pub async fn register_safe_point(&self) -> HummockVersionSafePoint {
        let versioning = self.versioning.read().await;
        let mut wl = self.context_info.write().await;
        let safe_point = HummockVersionSafePoint {
            id: versioning.current_version.id,
            event_sender: self.event_sender.clone(),
        };
        wl.version_safe_points.push(safe_point.id);
        trigger_safepoint_stat(&self.metrics, &wl.version_safe_points);
        safe_point
    }

    pub async fn unregister_safe_point(&self, safe_point: HummockVersionId) {
        let mut wl = self.context_info.write().await;
        let version_safe_points = &mut wl.version_safe_points;
        if let Some(pos) = version_safe_points.iter().position(|sp| *sp == safe_point) {
            version_safe_points.remove(pos);
        }
        trigger_safepoint_stat(&self.metrics, &wl.version_safe_points);
    }
}

fn trigger_safepoint_stat(metrics: &MetaMetrics, safepoints: &[HummockVersionId]) {
    if let Some(sp) = safepoints.iter().min() {
        metrics.min_safepoint_version_id.set(sp.to_u64() as _);
    } else {
        metrics
            .min_safepoint_version_id
            .set(HummockVersionId::MAX.to_u64() as _);
    }
}