risingwave_hummock_test/
test_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use bytes::Bytes;
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::hash::VirtualNode;
22use risingwave_common_service::ObserverManager;
23use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
24use risingwave_hummock_sdk::key::TableKey;
25pub use risingwave_hummock_sdk::key::{gen_key_from_bytes, gen_key_from_str};
26use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
27use risingwave_meta::controller::cluster::ClusterControllerRef;
28use risingwave_meta::hummock::test_utils::{
29    register_table_ids_to_compaction_group, setup_compute_env,
30};
31use risingwave_meta::hummock::{
32    CommitEpochInfo, HummockManagerRef, MockHummockMetaClient, NewTableFragmentInfo,
33};
34use risingwave_meta::manager::MetaSrvEnv;
35use risingwave_pb::catalog::{PbTable, Table};
36use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
37use risingwave_rpc_client::HummockMetaClient;
38use risingwave_storage::compaction_catalog_manager::{
39    CompactionCatalogManager, CompactionCatalogManagerRef,
40};
41use risingwave_storage::error::StorageResult;
42use risingwave_storage::hummock::HummockStorage;
43use risingwave_storage::hummock::backup_reader::BackupReader;
44use risingwave_storage::hummock::event_handler::HummockVersionUpdate;
45use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store;
46use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
47use risingwave_storage::hummock::observer_manager::HummockObserverNode;
48use risingwave_storage::hummock::test_utils::*;
49use risingwave_storage::hummock::write_limiter::WriteLimiter;
50use risingwave_storage::storage_value::StorageValue;
51use risingwave_storage::store::*;
52use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
53
54use crate::mock_notification_client::get_notification_client_for_test;
55
56pub async fn prepare_first_valid_version(
57    env: MetaSrvEnv,
58    hummock_manager_ref: HummockManagerRef,
59    cluster_controller_ref: ClusterControllerRef,
60    worker_id: i32,
61) -> (
62    PinnedVersion,
63    UnboundedSender<HummockVersionUpdate>,
64    UnboundedReceiver<HummockVersionUpdate>,
65) {
66    let (tx, mut rx) = unbounded_channel();
67    let notification_client = get_notification_client_for_test(
68        env,
69        hummock_manager_ref.clone(),
70        cluster_controller_ref,
71        worker_id,
72    )
73    .await;
74    let backup_manager = BackupReader::unused().await;
75    let write_limiter = WriteLimiter::unused();
76    let observer_manager = ObserverManager::new(
77        notification_client,
78        HummockObserverNode::new(
79            Arc::new(CompactionCatalogManager::default()),
80            backup_manager,
81            tx.clone(),
82            write_limiter,
83        ),
84    )
85    .await;
86    observer_manager.start().await;
87    let hummock_version = match rx.recv().await {
88        Some(HummockVersionUpdate::PinnedVersion(version)) => version,
89        _ => unreachable!("should be full version"),
90    };
91
92    (
93        PinnedVersion::new(*hummock_version, unbounded_channel().0),
94        tx,
95        rx,
96    )
97}
98
/// Test-only extension of [`LocalStateStore`] for writing a batch of key/value pairs
/// in a single call.
///
/// A blanket implementation for every `LocalStateStore` is provided in this file.
#[async_trait::async_trait]
pub trait TestIngestBatch: LocalStateStore {
    /// Applies every pair in `kv_pairs` to the store.
    ///
    /// Returns a `StorageResult<usize>`; the blanket implementation in this file returns
    /// whatever the store's `flush` returns.
    async fn ingest_batch(
        &mut self,
        kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
    ) -> StorageResult<usize>;
}
106
107#[async_trait::async_trait]
108impl<S: LocalStateStore> TestIngestBatch for S {
109    async fn ingest_batch(
110        &mut self,
111        kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
112    ) -> StorageResult<usize> {
113        for (key, value) in kv_pairs {
114            match value.user_value {
115                None => self.delete(key, Bytes::new())?,
116                Some(value) => self.insert(key, value, None)?,
117            }
118        }
119        self.flush().await
120    }
121}
122
123pub async fn with_hummock_storage(
124    table_id: TableId,
125) -> (HummockStorage, Arc<MockHummockMetaClient>) {
126    let sstable_store = mock_sstable_store().await;
127    let hummock_options = Arc::new(default_opts_for_test());
128    let (env, hummock_manager_ref, cluster_ctl_ref, worker_id) = setup_compute_env(8080).await;
129    let meta_client = Arc::new(MockHummockMetaClient::new(
130        hummock_manager_ref.clone(),
131        worker_id as _,
132    ));
133
134    let hummock_storage = HummockStorage::for_test(
135        hummock_options,
136        sstable_store,
137        meta_client.clone(),
138        get_notification_client_for_test(
139            env,
140            hummock_manager_ref.clone(),
141            cluster_ctl_ref,
142            worker_id as _,
143        )
144        .await,
145    )
146    .await
147    .unwrap();
148
149    register_tables_with_id_for_test(
150        hummock_storage.compaction_catalog_manager_ref(),
151        &hummock_manager_ref,
152        &[table_id.table_id()],
153    )
154    .await;
155
156    (hummock_storage, meta_client)
157}
158
159pub fn update_filter_key_extractor_for_table_ids(
160    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
161    table_ids: &[u32],
162) {
163    for table_id in table_ids {
164        let mock_table = PbTable {
165            id: *table_id,
166            read_prefix_len_hint: 0,
167            maybe_vnode_count: Some(VirtualNode::COUNT_FOR_TEST as u32),
168            ..Default::default()
169        };
170        compaction_catalog_manager_ref.update(*table_id, mock_table);
171    }
172}
173
174pub async fn register_tables_with_id_for_test(
175    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
176    hummock_manager_ref: &HummockManagerRef,
177    table_ids: &[u32],
178) {
179    update_filter_key_extractor_for_table_ids(compaction_catalog_manager_ref, table_ids);
180    register_table_ids_to_compaction_group(
181        hummock_manager_ref,
182        table_ids,
183        StaticCompactionGroupId::StateDefault.into(),
184    )
185    .await;
186}
187
188pub fn update_filter_key_extractor_for_tables(
189    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
190    tables: &[PbTable],
191) {
192    for table in tables {
193        compaction_catalog_manager_ref.update(table.id, table.clone())
194    }
195}
196pub async fn register_tables_with_catalog_for_test(
197    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
198    hummock_manager_ref: &HummockManagerRef,
199    tables: &[Table],
200) {
201    update_filter_key_extractor_for_tables(compaction_catalog_manager_ref, tables);
202    let table_ids = tables.iter().map(|t| t.id).collect_vec();
203    register_table_ids_to_compaction_group(
204        hummock_manager_ref,
205        &table_ids,
206        StaticCompactionGroupId::StateDefault.into(),
207    )
208    .await;
209}
210
/// Bundle of handles used by Hummock tests; built by `prepare_hummock_test_env`.
pub struct HummockTestEnv {
    /// The Hummock storage instance under test.
    pub storage: HummockStorage,
    /// Handle to the Hummock manager the storage is connected to.
    pub manager: HummockManagerRef,
    /// Mock meta client shared with `storage`.
    pub meta_client: Arc<MockHummockMetaClient>,
}
216
217impl HummockTestEnv {
218    async fn wait_version_sync(&self) {
219        self.storage
220            .wait_version(self.manager.get_current_version().await)
221            .await
222    }
223
224    pub async fn register_table_id(&self, table_id: TableId) {
225        register_tables_with_id_for_test(
226            self.storage.compaction_catalog_manager_ref(),
227            &self.manager,
228            &[table_id.table_id()],
229        )
230        .await;
231        self.wait_version_sync().await;
232    }
233
234    pub async fn register_vector_index(
235        &self,
236        table_id: TableId,
237        init_epoch: u64,
238        init_config: PbVectorIndexInit,
239    ) {
240        self.manager
241            .commit_epoch(CommitEpochInfo {
242                sstables: vec![],
243                new_table_watermarks: Default::default(),
244                sst_to_context: Default::default(),
245                new_table_fragment_infos: vec![NewTableFragmentInfo {
246                    table_ids: HashSet::from_iter([table_id]),
247                }],
248                change_log_delta: Default::default(),
249                vector_index_delta: HashMap::from_iter([(
250                    table_id,
251                    VectorIndexDelta::Init(init_config),
252                )]),
253                tables_to_commit: HashMap::from_iter([(table_id, init_epoch)]),
254            })
255            .await
256            .unwrap();
257    }
258
259    pub async fn register_table(&self, table: PbTable) {
260        register_tables_with_catalog_for_test(
261            self.storage.compaction_catalog_manager_ref(),
262            &self.manager,
263            &[table],
264        )
265        .await;
266        self.wait_version_sync().await;
267    }
268
269    // Seal, sync and commit a epoch.
270    // On completion of this function call, the provided epoch should be committed and visible.
271    pub async fn commit_epoch(&self, epoch: u64) {
272        let table_ids = self
273            .manager
274            .get_current_version()
275            .await
276            .state_table_info
277            .info()
278            .keys()
279            .cloned()
280            .collect();
281        let res = self
282            .storage
283            .seal_and_sync_epoch(epoch, table_ids)
284            .await
285            .unwrap();
286        self.meta_client.commit_epoch(epoch, res).await.unwrap();
287
288        self.wait_sync_committed_version().await;
289    }
290
291    pub async fn wait_sync_committed_version(&self) {
292        let version = self.manager.get_current_version().await;
293        self.storage.wait_version(version).await;
294    }
295}
296
297pub async fn prepare_hummock_test_env() -> HummockTestEnv {
298    let sstable_store = mock_sstable_store().await;
299    let hummock_options = Arc::new(default_opts_for_test());
300    let (env, hummock_manager_ref, cluster_ctl_ref, worker_id) = setup_compute_env(8080).await;
301
302    let hummock_meta_client = Arc::new(MockHummockMetaClient::new(
303        hummock_manager_ref.clone(),
304        worker_id as _,
305    ));
306
307    let notification_client = get_notification_client_for_test(
308        env,
309        hummock_manager_ref.clone(),
310        cluster_ctl_ref,
311        worker_id as _,
312    )
313    .await;
314
315    let storage = HummockStorage::for_test(
316        hummock_options,
317        sstable_store,
318        hummock_meta_client.clone(),
319        notification_client,
320    )
321    .await
322    .unwrap();
323
324    HummockTestEnv {
325        storage,
326        manager: hummock_manager_ref,
327        meta_client: hummock_meta_client,
328    }
329}