risingwave_storage/hummock/
validator.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::BorrowMut;
16use std::cmp;
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use risingwave_hummock_sdk::compact_task::ValidationTask;
21use risingwave_hummock_sdk::key::FullKey;
22
23use crate::hummock::iterator::HummockIterator;
24use crate::hummock::sstable::SstableIteratorReadOptions;
25use crate::hummock::sstable_store::SstableStoreRef;
26use crate::hummock::{CachePolicy, SstableIterator};
27use crate::monitor::StoreLocalStatistic;
28
29/// Validate SSTs in terms of Ordered, Locally unique and Globally unique.
30///
31/// See `src/storage/src/hummock/state_store.rs`
32pub async fn validate_ssts(task: ValidationTask, sstable_store: SstableStoreRef) {
33    let mut visited_keys = HashMap::new();
34    let mut unused = StoreLocalStatistic::default();
35    for sstable_info in task.sst_infos {
36        let mut key_counts = 0;
37        let worker_id = *task
38            .sst_id_to_worker_id
39            .get(&sstable_info.object_id)
40            .expect("valid worker_id");
41        tracing::debug!(
42            "Validating SST sst_id {} object_id {} from worker {}",
43            sstable_info.sst_id,
44            sstable_info.object_id,
45            worker_id,
46        );
47        let holder = match sstable_store
48            .sstable(&sstable_info, unused.borrow_mut())
49            .await
50        {
51            Ok(holder) => holder,
52            Err(_err) => {
53                // One reasonable cause is the SST has been vacuumed.
54                tracing::info!(
55                    "Skip sanity check for SST sst_id {} object_id {} .",
56                    sstable_info.sst_id,
57                    sstable_info.object_id,
58                );
59                continue;
60            }
61        };
62
63        // TODO: to use `prefetch: false` after `prefetch` be supported by
64        // SstableIteratorReadOptions
65        let mut iter = SstableIterator::new(
66            holder,
67            sstable_store.clone(),
68            Arc::new(SstableIteratorReadOptions {
69                cache_policy: CachePolicy::NotFill,
70                must_iterated_end_user_key: None,
71                max_preload_retry_times: 0,
72                prefetch_for_large_query: false,
73            }),
74            &sstable_info,
75        );
76        let mut previous_key: Option<FullKey<Vec<u8>>> = None;
77        if let Err(_err) = iter.rewind().await {
78            tracing::info!(
79                "Skip sanity check for SST sst_id {} object_id {}.",
80                sstable_info.sst_id,
81                sstable_info.object_id
82            );
83        }
84        while iter.is_valid() {
85            key_counts += 1;
86            let current_key = iter.key().to_vec();
87            // Locally unique and Globally unique
88            if let Some((duplicate_sst_object_id, duplicate_worker_id)) =
89                visited_keys.get(&current_key).cloned()
90            {
91                panic!(
92                    "SST sanity check failed: Duplicate key {:x?} in SST object {} from worker {} and SST object {} from worker {}",
93                    current_key,
94                    sstable_info.object_id,
95                    worker_id,
96                    duplicate_sst_object_id,
97                    duplicate_worker_id
98                )
99            }
100            visited_keys.insert(current_key.to_owned(), (sstable_info.object_id, worker_id));
101            // Ordered and Locally unique
102            if let Some(previous_key) = previous_key.take() {
103                let cmp = previous_key.cmp(&current_key);
104                if cmp != cmp::Ordering::Less {
105                    panic!(
106                        "SST sanity check failed: For SST sst_id {} object_id {}, expect {:x?} < {:x?}, got {:#?}",
107                        sstable_info.sst_id, sstable_info.object_id, previous_key, current_key, cmp
108                    )
109                }
110            }
111            previous_key = Some(current_key);
112            if let Err(_err) = iter.next().await {
113                tracing::info!(
114                    "Skip remaining sanity check for SST {}",
115                    sstable_info.object_id
116                );
117                break;
118            }
119        }
120        tracing::debug!(
121            "Validated {} keys for SST sst_id {} object_id {}",
122            key_counts,
123            sstable_info.sst_id,
124            sstable_info.object_id
125        );
126        iter.collect_local_statistic(&mut unused);
127        unused.ignore();
128    }
129}