risingwave_hummock_test/
test_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use bytes::Bytes;
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_common::hash::VirtualNode;
21use risingwave_common_service::ObserverManager;
22use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
23use risingwave_hummock_sdk::key::TableKey;
24pub use risingwave_hummock_sdk::key::{gen_key_from_bytes, gen_key_from_str};
25use risingwave_meta::controller::cluster::ClusterControllerRef;
26use risingwave_meta::hummock::test_utils::{
27    register_table_ids_to_compaction_group, setup_compute_env,
28};
29use risingwave_meta::hummock::{HummockManagerRef, MockHummockMetaClient};
30use risingwave_meta::manager::MetaSrvEnv;
31use risingwave_pb::catalog::{PbTable, Table};
32use risingwave_rpc_client::HummockMetaClient;
33use risingwave_storage::compaction_catalog_manager::{
34    CompactionCatalogManager, CompactionCatalogManagerRef,
35};
36use risingwave_storage::error::StorageResult;
37use risingwave_storage::hummock::HummockStorage;
38use risingwave_storage::hummock::backup_reader::BackupReader;
39use risingwave_storage::hummock::event_handler::HummockVersionUpdate;
40use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store;
41use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
42use risingwave_storage::hummock::observer_manager::HummockObserverNode;
43use risingwave_storage::hummock::test_utils::*;
44use risingwave_storage::hummock::write_limiter::WriteLimiter;
45use risingwave_storage::storage_value::StorageValue;
46use risingwave_storage::store::*;
47use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
48
49use crate::mock_notification_client::get_notification_client_for_test;
50
51pub async fn prepare_first_valid_version(
52    env: MetaSrvEnv,
53    hummock_manager_ref: HummockManagerRef,
54    cluster_controller_ref: ClusterControllerRef,
55    worker_id: i32,
56) -> (
57    PinnedVersion,
58    UnboundedSender<HummockVersionUpdate>,
59    UnboundedReceiver<HummockVersionUpdate>,
60) {
61    let (tx, mut rx) = unbounded_channel();
62    let notification_client = get_notification_client_for_test(
63        env,
64        hummock_manager_ref.clone(),
65        cluster_controller_ref,
66        worker_id,
67    )
68    .await;
69    let backup_manager = BackupReader::unused().await;
70    let write_limiter = WriteLimiter::unused();
71    let observer_manager = ObserverManager::new(
72        notification_client,
73        HummockObserverNode::new(
74            Arc::new(CompactionCatalogManager::default()),
75            backup_manager,
76            tx.clone(),
77            write_limiter,
78        ),
79    )
80    .await;
81    observer_manager.start().await;
82    let hummock_version = match rx.recv().await {
83        Some(HummockVersionUpdate::PinnedVersion(version)) => version,
84        _ => unreachable!("should be full version"),
85    };
86
87    (
88        PinnedVersion::new(*hummock_version, unbounded_channel().0),
89        tx,
90        rx,
91    )
92}
93
94#[async_trait::async_trait]
95pub trait TestIngestBatch: LocalStateStore {
96    async fn ingest_batch(
97        &mut self,
98        kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
99        write_options: WriteOptions,
100    ) -> StorageResult<usize>;
101}
102
103#[async_trait::async_trait]
104impl<S: LocalStateStore> TestIngestBatch for S {
105    async fn ingest_batch(
106        &mut self,
107        kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
108        write_options: WriteOptions,
109    ) -> StorageResult<usize> {
110        assert_eq!(self.epoch(), write_options.epoch);
111        for (key, value) in kv_pairs {
112            match value.user_value {
113                None => self.delete(key, Bytes::new())?,
114                Some(value) => self.insert(key, value, None)?,
115            }
116        }
117        self.flush().await
118    }
119}
120
121pub async fn with_hummock_storage(
122    table_id: TableId,
123) -> (HummockStorage, Arc<MockHummockMetaClient>) {
124    let sstable_store = mock_sstable_store().await;
125    let hummock_options = Arc::new(default_opts_for_test());
126    let (env, hummock_manager_ref, cluster_ctl_ref, worker_id) = setup_compute_env(8080).await;
127    let meta_client = Arc::new(MockHummockMetaClient::new(
128        hummock_manager_ref.clone(),
129        worker_id as _,
130    ));
131
132    let hummock_storage = HummockStorage::for_test(
133        hummock_options,
134        sstable_store,
135        meta_client.clone(),
136        get_notification_client_for_test(
137            env,
138            hummock_manager_ref.clone(),
139            cluster_ctl_ref,
140            worker_id as _,
141        )
142        .await,
143    )
144    .await
145    .unwrap();
146
147    register_tables_with_id_for_test(
148        hummock_storage.compaction_catalog_manager_ref(),
149        &hummock_manager_ref,
150        &[table_id.table_id()],
151    )
152    .await;
153
154    (hummock_storage, meta_client)
155}
156
157pub fn update_filter_key_extractor_for_table_ids(
158    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
159    table_ids: &[u32],
160) {
161    for table_id in table_ids {
162        let mock_table = PbTable {
163            id: *table_id,
164            read_prefix_len_hint: 0,
165            maybe_vnode_count: Some(VirtualNode::COUNT_FOR_TEST as u32),
166            ..Default::default()
167        };
168        compaction_catalog_manager_ref.update(*table_id, mock_table);
169    }
170}
171
172pub async fn register_tables_with_id_for_test(
173    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
174    hummock_manager_ref: &HummockManagerRef,
175    table_ids: &[u32],
176) {
177    update_filter_key_extractor_for_table_ids(compaction_catalog_manager_ref, table_ids);
178    register_table_ids_to_compaction_group(
179        hummock_manager_ref,
180        table_ids,
181        StaticCompactionGroupId::StateDefault.into(),
182    )
183    .await;
184}
185
186pub fn update_filter_key_extractor_for_tables(
187    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
188    tables: &[PbTable],
189) {
190    for table in tables {
191        compaction_catalog_manager_ref.update(table.id, table.clone())
192    }
193}
194pub async fn register_tables_with_catalog_for_test(
195    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
196    hummock_manager_ref: &HummockManagerRef,
197    tables: &[Table],
198) {
199    update_filter_key_extractor_for_tables(compaction_catalog_manager_ref, tables);
200    let table_ids = tables.iter().map(|t| t.id).collect_vec();
201    register_table_ids_to_compaction_group(
202        hummock_manager_ref,
203        &table_ids,
204        StaticCompactionGroupId::StateDefault.into(),
205    )
206    .await;
207}
208
209pub struct HummockTestEnv {
210    pub storage: HummockStorage,
211    pub manager: HummockManagerRef,
212    pub meta_client: Arc<MockHummockMetaClient>,
213}
214
215impl HummockTestEnv {
216    async fn wait_version_sync(&self) {
217        self.storage
218            .wait_version(self.manager.get_current_version().await)
219            .await
220    }
221
222    pub async fn register_table_id(&self, table_id: TableId) {
223        register_tables_with_id_for_test(
224            self.storage.compaction_catalog_manager_ref(),
225            &self.manager,
226            &[table_id.table_id()],
227        )
228        .await;
229        self.wait_version_sync().await;
230    }
231
232    pub async fn register_table(&self, table: PbTable) {
233        register_tables_with_catalog_for_test(
234            self.storage.compaction_catalog_manager_ref(),
235            &self.manager,
236            &[table],
237        )
238        .await;
239        self.wait_version_sync().await;
240    }
241
242    // Seal, sync and commit a epoch.
243    // On completion of this function call, the provided epoch should be committed and visible.
244    pub async fn commit_epoch(&self, epoch: u64) {
245        let table_ids = self
246            .manager
247            .get_current_version()
248            .await
249            .state_table_info
250            .info()
251            .keys()
252            .cloned()
253            .collect();
254        let res = self
255            .storage
256            .seal_and_sync_epoch(epoch, table_ids)
257            .await
258            .unwrap();
259        self.meta_client.commit_epoch(epoch, res).await.unwrap();
260
261        self.wait_sync_committed_version().await;
262    }
263
264    pub async fn wait_sync_committed_version(&self) {
265        let version = self.manager.get_current_version().await;
266        self.storage.wait_version(version).await;
267    }
268}
269
270pub async fn prepare_hummock_test_env() -> HummockTestEnv {
271    let sstable_store = mock_sstable_store().await;
272    let hummock_options = Arc::new(default_opts_for_test());
273    let (env, hummock_manager_ref, cluster_ctl_ref, worker_id) = setup_compute_env(8080).await;
274
275    let hummock_meta_client = Arc::new(MockHummockMetaClient::new(
276        hummock_manager_ref.clone(),
277        worker_id as _,
278    ));
279
280    let notification_client = get_notification_client_for_test(
281        env,
282        hummock_manager_ref.clone(),
283        cluster_ctl_ref,
284        worker_id as _,
285    )
286    .await;
287
288    let storage = HummockStorage::for_test(
289        hummock_options,
290        sstable_store,
291        hummock_meta_client.clone(),
292        notification_client,
293    )
294    .await
295    .unwrap();
296
297    HummockTestEnv {
298        storage,
299        manager: hummock_manager_ref,
300        meta_client: hummock_meta_client,
301    }
302}