risingwave_hummock_test/
test_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use bytes::Bytes;
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::hash::VirtualNode;
22use risingwave_common::id::WorkerId;
23use risingwave_common_service::ObserverManager;
24use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
25use risingwave_hummock_sdk::key::TableKey;
26pub use risingwave_hummock_sdk::key::{gen_key_from_bytes, gen_key_from_str};
27use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
28use risingwave_meta::controller::cluster::ClusterControllerRef;
29use risingwave_meta::hummock::test_utils::{
30    register_table_ids_to_compaction_group, setup_compute_env,
31};
32use risingwave_meta::hummock::{
33    CommitEpochInfo, HummockManagerRef, MockHummockMetaClient, NewTableFragmentInfo,
34};
35use risingwave_meta::manager::MetaSrvEnv;
36use risingwave_pb::catalog::{PbTable, Table};
37use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
38use risingwave_rpc_client::HummockMetaClient;
39use risingwave_storage::compaction_catalog_manager::{
40    CompactionCatalogManager, CompactionCatalogManagerRef,
41};
42use risingwave_storage::error::StorageResult;
43use risingwave_storage::hummock::HummockStorage;
44use risingwave_storage::hummock::backup_reader::BackupReader;
45use risingwave_storage::hummock::event_handler::HummockVersionUpdate;
46use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store;
47use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
48use risingwave_storage::hummock::observer_manager::HummockObserverNode;
49use risingwave_storage::hummock::test_utils::*;
50use risingwave_storage::hummock::write_limiter::WriteLimiter;
51use risingwave_storage::storage_value::StorageValue;
52use risingwave_storage::store::*;
53use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
54
55use crate::mock_notification_client::get_notification_client_for_test;
56
57pub async fn prepare_first_valid_version(
58    env: MetaSrvEnv,
59    hummock_manager_ref: HummockManagerRef,
60    cluster_controller_ref: ClusterControllerRef,
61    worker_id: WorkerId,
62) -> (
63    PinnedVersion,
64    UnboundedSender<HummockVersionUpdate>,
65    UnboundedReceiver<HummockVersionUpdate>,
66) {
67    let (tx, mut rx) = unbounded_channel();
68    let notification_client = get_notification_client_for_test(
69        env,
70        hummock_manager_ref.clone(),
71        cluster_controller_ref,
72        worker_id,
73    )
74    .await;
75    let backup_manager = BackupReader::unused().await;
76    let write_limiter = WriteLimiter::unused();
77    let observer_manager = ObserverManager::new(
78        notification_client,
79        HummockObserverNode::new(
80            Arc::new(CompactionCatalogManager::default()),
81            backup_manager,
82            tx.clone(),
83            write_limiter,
84        ),
85    )
86    .await;
87    observer_manager.start().await;
88    let hummock_version = match rx.recv().await {
89        Some(HummockVersionUpdate::PinnedVersion(version)) => version,
90        _ => unreachable!("should be full version"),
91    };
92
93    (
94        PinnedVersion::new(*hummock_version, unbounded_channel().0),
95        tx,
96        rx,
97    )
98}
99
100#[async_trait::async_trait]
101pub trait TestIngestBatch: LocalStateStore {
102    async fn ingest_batch(
103        &mut self,
104        kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
105    ) -> StorageResult<usize>;
106}
107
108#[async_trait::async_trait]
109impl<S: LocalStateStore> TestIngestBatch for S {
110    async fn ingest_batch(
111        &mut self,
112        kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
113    ) -> StorageResult<usize> {
114        for (key, value) in kv_pairs {
115            match value.user_value {
116                None => self.delete(key, Bytes::new())?,
117                Some(value) => self.insert(key, value, None)?,
118            }
119        }
120        self.flush().await
121    }
122}
123
124pub async fn with_hummock_storage(
125    table_id: TableId,
126) -> (HummockStorage, Arc<MockHummockMetaClient>) {
127    let sstable_store = mock_sstable_store().await;
128    let hummock_options = Arc::new(default_opts_for_test());
129    let (env, hummock_manager_ref, cluster_ctl_ref, worker_id) = setup_compute_env(8080).await;
130    let meta_client = Arc::new(MockHummockMetaClient::new(
131        hummock_manager_ref.clone(),
132        worker_id as _,
133    ));
134
135    let hummock_storage = HummockStorage::for_test(
136        hummock_options,
137        sstable_store,
138        meta_client.clone(),
139        get_notification_client_for_test(
140            env,
141            hummock_manager_ref.clone(),
142            cluster_ctl_ref,
143            worker_id,
144        )
145        .await,
146    )
147    .await
148    .unwrap();
149
150    register_tables_with_id_for_test(
151        hummock_storage.compaction_catalog_manager_ref(),
152        &hummock_manager_ref,
153        &[table_id],
154    )
155    .await;
156
157    (hummock_storage, meta_client)
158}
159
160pub fn update_filter_key_extractor_for_table_ids(
161    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
162    table_ids: &[TableId],
163) {
164    for table_id in table_ids {
165        let mock_table = PbTable {
166            id: *table_id,
167            read_prefix_len_hint: 0,
168            maybe_vnode_count: Some(VirtualNode::COUNT_FOR_TEST as u32),
169            ..Default::default()
170        };
171        compaction_catalog_manager_ref.update(*table_id, mock_table);
172    }
173}
174
175pub async fn register_tables_with_id_for_test(
176    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
177    hummock_manager_ref: &HummockManagerRef,
178    table_ids: &[TableId],
179) {
180    update_filter_key_extractor_for_table_ids(compaction_catalog_manager_ref, table_ids);
181    register_table_ids_to_compaction_group(
182        hummock_manager_ref,
183        table_ids,
184        StaticCompactionGroupId::StateDefault.into(),
185    )
186    .await;
187}
188
189pub fn update_filter_key_extractor_for_tables(
190    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
191    tables: &[PbTable],
192) {
193    for table in tables {
194        compaction_catalog_manager_ref.update(table.id, table.clone())
195    }
196}
197pub async fn register_tables_with_catalog_for_test(
198    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
199    hummock_manager_ref: &HummockManagerRef,
200    tables: &[Table],
201) {
202    update_filter_key_extractor_for_tables(compaction_catalog_manager_ref, tables);
203    let table_ids = tables.iter().map(|t| t.id).collect_vec();
204    register_table_ids_to_compaction_group(
205        hummock_manager_ref,
206        &table_ids,
207        StaticCompactionGroupId::StateDefault.into(),
208    )
209    .await;
210}
211
212pub struct HummockTestEnv {
213    pub storage: HummockStorage,
214    pub manager: HummockManagerRef,
215    pub meta_client: Arc<MockHummockMetaClient>,
216}
217
218impl HummockTestEnv {
219    async fn wait_version_sync(&self) {
220        self.storage
221            .wait_version(self.manager.get_current_version().await)
222            .await
223    }
224
225    pub async fn register_table_id(&self, table_id: TableId) {
226        register_tables_with_id_for_test(
227            self.storage.compaction_catalog_manager_ref(),
228            &self.manager,
229            &[table_id],
230        )
231        .await;
232        self.wait_version_sync().await;
233    }
234
235    pub async fn register_vector_index(
236        &self,
237        table_id: TableId,
238        init_epoch: u64,
239        init_config: PbVectorIndexInit,
240    ) {
241        self.manager
242            .commit_epoch(CommitEpochInfo {
243                sstables: vec![],
244                new_table_watermarks: Default::default(),
245                sst_to_context: Default::default(),
246                new_table_fragment_infos: vec![NewTableFragmentInfo {
247                    table_ids: HashSet::from_iter([table_id]),
248                }],
249                change_log_delta: Default::default(),
250                vector_index_delta: HashMap::from_iter([(
251                    table_id,
252                    VectorIndexDelta::Init(init_config),
253                )]),
254                tables_to_commit: HashMap::from_iter([(table_id, init_epoch)]),
255                truncate_tables: HashSet::new(),
256            })
257            .await
258            .unwrap();
259    }
260
261    pub async fn register_table(&self, table: PbTable) {
262        register_tables_with_catalog_for_test(
263            self.storage.compaction_catalog_manager_ref(),
264            &self.manager,
265            &[table],
266        )
267        .await;
268        self.wait_version_sync().await;
269    }
270
271    // Seal, sync and commit a epoch.
272    // On completion of this function call, the provided epoch should be committed and visible.
273    pub async fn commit_epoch(&self, epoch: u64) {
274        let table_ids = self
275            .manager
276            .get_current_version()
277            .await
278            .state_table_info
279            .info()
280            .keys()
281            .cloned()
282            .collect();
283        let res = self
284            .storage
285            .seal_and_sync_epoch(epoch, table_ids)
286            .await
287            .unwrap();
288        self.meta_client.commit_epoch(epoch, res).await.unwrap();
289
290        self.wait_sync_committed_version().await;
291    }
292
293    pub async fn wait_sync_committed_version(&self) {
294        let version = self.manager.get_current_version().await;
295        self.storage.wait_version(version).await;
296    }
297}
298
299pub async fn prepare_hummock_test_env() -> HummockTestEnv {
300    let sstable_store = mock_sstable_store().await;
301    let hummock_options = Arc::new(default_opts_for_test());
302    let (env, hummock_manager_ref, cluster_ctl_ref, worker_id) = setup_compute_env(8080).await;
303
304    let hummock_meta_client = Arc::new(MockHummockMetaClient::new(
305        hummock_manager_ref.clone(),
306        worker_id as _,
307    ));
308
309    let notification_client = get_notification_client_for_test(
310        env,
311        hummock_manager_ref.clone(),
312        cluster_ctl_ref,
313        worker_id,
314    )
315    .await;
316
317    let storage = HummockStorage::for_test(
318        hummock_options,
319        sstable_store,
320        hummock_meta_client.clone(),
321        notification_client,
322    )
323    .await
324    .unwrap();
325
326    HummockTestEnv {
327        storage,
328        manager: hummock_manager_ref,
329        meta_client: hummock_meta_client,
330    }
331}