risingwave_hummock_test/
test_utils.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use bytes::Bytes;
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::hash::VirtualNode;
22use risingwave_common::id::WorkerId;
23use risingwave_common_service::ObserverManager;
24use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
25use risingwave_hummock_sdk::key::TableKey;
26pub use risingwave_hummock_sdk::key::{gen_key_from_bytes, gen_key_from_str};
27use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
28use risingwave_meta::controller::cluster::ClusterControllerRef;
29use risingwave_meta::hummock::test_utils::{
30    register_table_ids_to_compaction_group, setup_compute_env,
31};
32use risingwave_meta::hummock::{
33    CommitEpochInfo, HummockManagerRef, MockHummockMetaClient, NewTableFragmentInfo,
34};
35use risingwave_meta::manager::MetaSrvEnv;
36use risingwave_pb::catalog::{PbTable, Table};
37use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
38use risingwave_rpc_client::HummockMetaClient;
39use risingwave_storage::compaction_catalog_manager::{
40    CompactionCatalogManager, CompactionCatalogManagerRef,
41};
42use risingwave_storage::error::StorageResult;
43use risingwave_storage::hummock::HummockStorage;
44use risingwave_storage::hummock::backup_reader::BackupReader;
45use risingwave_storage::hummock::event_handler::HummockVersionUpdate;
46use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store;
47use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
48use risingwave_storage::hummock::observer_manager::HummockObserverNode;
49use risingwave_storage::hummock::test_utils::*;
50use risingwave_storage::hummock::write_limiter::WriteLimiter;
51use risingwave_storage::storage_value::StorageValue;
52use risingwave_storage::store::*;
53use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
54
55use crate::mock_notification_client::get_notification_client_for_test;
56
57pub async fn prepare_first_valid_version(
58    env: MetaSrvEnv,
59    hummock_manager_ref: HummockManagerRef,
60    cluster_controller_ref: ClusterControllerRef,
61    worker_id: WorkerId,
62) -> (
63    PinnedVersion,
64    UnboundedSender<HummockVersionUpdate>,
65    UnboundedReceiver<HummockVersionUpdate>,
66) {
67    let (tx, mut rx) = unbounded_channel();
68    let notification_client = get_notification_client_for_test(
69        env,
70        hummock_manager_ref.clone(),
71        cluster_controller_ref,
72        worker_id,
73    )
74    .await;
75    let backup_manager = BackupReader::unused().await;
76    let write_limiter = WriteLimiter::unused();
77    let observer_manager = ObserverManager::new(
78        notification_client,
79        HummockObserverNode::new(
80            Arc::new(CompactionCatalogManager::default()),
81            backup_manager,
82            tx.clone(),
83            write_limiter,
84        ),
85    )
86    .await;
87    observer_manager.start().await;
88    let hummock_version = match rx.recv().await {
89        Some(HummockVersionUpdate::PinnedVersion(version)) => version,
90        _ => unreachable!("should be full version"),
91    };
92
93    (
94        PinnedVersion::new(*hummock_version, unbounded_channel().0),
95        tx,
96        rx,
97    )
98}
99
100#[async_trait::async_trait]
101pub trait TestIngestBatch: LocalStateStore {
102    async fn ingest_batch(
103        &mut self,
104        kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
105    ) -> StorageResult<usize>;
106}
107
108#[async_trait::async_trait]
109impl<S: LocalStateStore> TestIngestBatch for S {
110    async fn ingest_batch(
111        &mut self,
112        kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
113    ) -> StorageResult<usize> {
114        for (key, value) in kv_pairs {
115            match value.user_value {
116                None => self.delete(key, Bytes::new())?,
117                Some(value) => self.insert(key, value, None)?,
118            }
119        }
120        self.flush().await
121    }
122}
123
124pub async fn with_hummock_storage(
125    table_id: TableId,
126) -> (HummockStorage, Arc<MockHummockMetaClient>) {
127    let sstable_store = mock_sstable_store().await;
128    let hummock_options = Arc::new(default_opts_for_test());
129    let (env, hummock_manager_ref, cluster_ctl_ref, worker_id) = setup_compute_env(8080).await;
130    let meta_client = Arc::new(MockHummockMetaClient::new(
131        hummock_manager_ref.clone(),
132        worker_id as _,
133    ));
134
135    let hummock_storage = HummockStorage::for_test(
136        hummock_options,
137        sstable_store,
138        meta_client.clone(),
139        get_notification_client_for_test(
140            env,
141            hummock_manager_ref.clone(),
142            cluster_ctl_ref,
143            worker_id,
144        )
145        .await,
146    )
147    .await
148    .unwrap();
149
150    register_tables_with_id_for_test(
151        hummock_storage.compaction_catalog_manager_ref(),
152        &hummock_manager_ref,
153        &[table_id],
154    )
155    .await;
156
157    (hummock_storage, meta_client)
158}
159
160pub fn update_filter_key_extractor_for_table_ids(
161    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
162    table_ids: &[TableId],
163) {
164    for table_id in table_ids {
165        let mock_table = PbTable {
166            id: *table_id,
167            // The low-level hummock tests register table IDs without a real table catalog, but
168            // some of them still pass full-key prefix hints explicitly. Use an invalid prefix
169            // length to keep the legacy full-key filter extractor in these tests.
170            read_prefix_len_hint: 1,
171            maybe_vnode_count: Some(VirtualNode::COUNT_FOR_TEST as u32),
172            ..Default::default()
173        };
174        compaction_catalog_manager_ref.update(*table_id, mock_table);
175    }
176}
177
178pub async fn register_tables_with_id_for_test(
179    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
180    hummock_manager_ref: &HummockManagerRef,
181    table_ids: &[TableId],
182) {
183    update_filter_key_extractor_for_table_ids(compaction_catalog_manager_ref, table_ids);
184    register_table_ids_to_compaction_group(
185        hummock_manager_ref,
186        table_ids,
187        StaticCompactionGroupId::StateDefault,
188    )
189    .await;
190}
191
192pub fn update_filter_key_extractor_for_tables(
193    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
194    tables: &[PbTable],
195) {
196    for table in tables {
197        compaction_catalog_manager_ref.update(table.id, table.clone())
198    }
199}
200pub async fn register_tables_with_catalog_for_test(
201    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
202    hummock_manager_ref: &HummockManagerRef,
203    tables: &[Table],
204) {
205    update_filter_key_extractor_for_tables(compaction_catalog_manager_ref, tables);
206    let table_ids = tables.iter().map(|t| t.id).collect_vec();
207    register_table_ids_to_compaction_group(
208        hummock_manager_ref,
209        &table_ids,
210        StaticCompactionGroupId::StateDefault,
211    )
212    .await;
213}
214
215pub struct HummockTestEnv {
216    pub storage: HummockStorage,
217    pub manager: HummockManagerRef,
218    pub meta_client: Arc<MockHummockMetaClient>,
219}
220
221impl HummockTestEnv {
222    async fn wait_version_sync(&self) {
223        self.storage
224            .wait_version(self.manager.get_current_version().await)
225            .await
226    }
227
228    pub async fn register_table_id(&self, table_id: TableId) {
229        register_tables_with_id_for_test(
230            self.storage.compaction_catalog_manager_ref(),
231            &self.manager,
232            &[table_id],
233        )
234        .await;
235        self.wait_version_sync().await;
236    }
237
238    pub async fn register_vector_index(
239        &self,
240        table_id: TableId,
241        init_epoch: u64,
242        init_config: PbVectorIndexInit,
243    ) {
244        self.manager
245            .commit_epoch(CommitEpochInfo {
246                sstables: vec![],
247                new_table_watermarks: Default::default(),
248                sst_to_context: Default::default(),
249                new_table_fragment_infos: vec![NewTableFragmentInfo {
250                    table_ids: HashSet::from_iter([table_id]),
251                }],
252                change_log_delta: Default::default(),
253                vector_index_delta: HashMap::from_iter([(
254                    table_id,
255                    VectorIndexDelta::Init(init_config),
256                )]),
257                tables_to_commit: HashMap::from_iter([(table_id, init_epoch)]),
258                truncate_tables: HashSet::new(),
259            })
260            .await
261            .unwrap();
262    }
263
264    pub async fn register_table(&self, table: PbTable) {
265        register_tables_with_catalog_for_test(
266            self.storage.compaction_catalog_manager_ref(),
267            &self.manager,
268            &[table],
269        )
270        .await;
271        self.wait_version_sync().await;
272    }
273
274    // Seal, sync and commit a epoch.
275    // On completion of this function call, the provided epoch should be committed and visible.
276    pub async fn commit_epoch(&self, epoch: u64) {
277        let table_ids = self
278            .manager
279            .get_current_version()
280            .await
281            .state_table_info
282            .info()
283            .keys()
284            .cloned()
285            .collect();
286        let res = self
287            .storage
288            .seal_and_sync_epoch(epoch, table_ids)
289            .await
290            .unwrap();
291        self.meta_client.commit_epoch(epoch, res).await.unwrap();
292
293        self.wait_sync_committed_version().await;
294    }
295
296    pub async fn wait_sync_committed_version(&self) {
297        let version = self.manager.get_current_version().await;
298        self.storage.wait_version(version).await;
299    }
300}
301
302pub async fn prepare_hummock_test_env() -> HummockTestEnv {
303    let sstable_store = mock_sstable_store().await;
304    let hummock_options = Arc::new(default_opts_for_test());
305    let (env, hummock_manager_ref, cluster_ctl_ref, worker_id) = setup_compute_env(8080).await;
306
307    let hummock_meta_client = Arc::new(MockHummockMetaClient::new(
308        hummock_manager_ref.clone(),
309        worker_id as _,
310    ));
311
312    let notification_client = get_notification_client_for_test(
313        env,
314        hummock_manager_ref.clone(),
315        cluster_ctl_ref,
316        worker_id,
317    )
318    .await;
319
320    let storage = HummockStorage::for_test(
321        hummock_options,
322        sstable_store,
323        hummock_meta_client.clone(),
324        notification_client,
325    )
326    .await
327    .unwrap();
328
329    HummockTestEnv {
330        storage,
331        manager: hummock_manager_ref,
332        meta_client: hummock_meta_client,
333    }
334}