risingwave_meta/hummock/
test_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![cfg(any(test, feature = "test"))]
16
17use std::collections::{BTreeSet, HashMap};
18use std::sync::Arc;
19use std::time::Duration;
20
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::catalog::{TableId, TableOption};
24use risingwave_common::util::epoch::test_epoch;
25use risingwave_hummock_sdk::key::key_with_epoch;
26use risingwave_hummock_sdk::key_range::KeyRange;
27use risingwave_hummock_sdk::level::Levels;
28use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
29use risingwave_hummock_sdk::table_watermark::TableWatermarks;
30use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo};
31use risingwave_hummock_sdk::{
32    CompactionGroupId, HummockEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult,
33};
34use risingwave_meta_model::WorkerId;
35use risingwave_pb::common::worker_node::Property;
36use risingwave_pb::common::{HostAddress, WorkerType};
37use risingwave_pb::hummock::CompactionConfig;
38use risingwave_pb::hummock::compact_task::TaskStatus;
39use risingwave_rpc_client::HummockMetaClient;
40
41use crate::controller::catalog::CatalogController;
42use crate::controller::cluster::{ClusterController, ClusterControllerRef};
43use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
44use crate::hummock::compaction::selector::{LocalSelectorStatistic, default_compaction_selector};
45use crate::hummock::compaction::{CompactionDeveloperConfig, CompactionSelectorContext};
46use crate::hummock::level_handler::LevelHandler;
47pub use crate::hummock::manager::CommitEpochInfo;
48use crate::hummock::model::CompactionGroup;
49use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef};
50use crate::manager::MetaSrvEnv;
51use crate::rpc::metrics::MetaMetrics;
52
53pub fn to_local_sstable_info(ssts: &[SstableInfo]) -> Vec<LocalSstableInfo> {
54    ssts.iter()
55        .map(|sst| LocalSstableInfo::for_test(sst.clone()))
56        .collect_vec()
57}
58
59// This function has 3 phases:
60// 1. add 3 ssts to
61// 2. trigger a compaction and replace the input from phase 1 with the 1 new sst
62// 3. add 1 new sst
63// Please make sure the function do what you want before using it.
64pub async fn add_test_tables(
65    hummock_manager: &HummockManager,
66    hummock_meta_client: Arc<dyn HummockMetaClient>,
67    compaction_group_id: CompactionGroupId,
68) -> Vec<Vec<SstableInfo>> {
69    // Increase version by 2.
70
71    use risingwave_common::util::epoch::EpochExt;
72
73    let mut epoch = test_epoch(1);
74    let sstable_ids = get_sst_ids(hummock_manager, 3).await;
75    let test_tables = generate_test_sstables_with_table_id(epoch, 1, sstable_ids);
76    register_sstable_infos_to_compaction_group(hummock_manager, &test_tables, compaction_group_id)
77        .await;
78    let test_local_tables = to_local_sstable_info(&test_tables);
79    hummock_meta_client
80        .commit_epoch(
81            epoch,
82            SyncResult {
83                uncommitted_ssts: test_local_tables,
84                ..Default::default()
85            },
86        )
87        .await
88        .unwrap();
89
90    // Simulate a compaction and increase version by 1.
91    let test_tables_2 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await);
92    register_sstable_infos_to_compaction_group(
93        hummock_manager,
94        &test_tables_2,
95        compaction_group_id,
96    )
97    .await;
98    let mut compact_task = hummock_manager
99        .get_compact_task(compaction_group_id, &mut default_compaction_selector())
100        .await
101        .unwrap()
102        .unwrap();
103    assert_eq!(
104        compact_task
105            .input_ssts
106            .iter()
107            .map(|i| i.table_infos.len())
108            .sum::<usize>(),
109        3
110    );
111
112    compact_task.target_level = 6;
113    hummock_manager
114        .report_compact_task_for_test(
115            compact_task.task_id,
116            Some(compact_task),
117            TaskStatus::Success,
118            test_tables_2.clone(),
119            None,
120        )
121        .await
122        .unwrap();
123    // Increase version by 1.
124    epoch.inc_epoch();
125    let test_tables_3 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await);
126    register_sstable_infos_to_compaction_group(
127        hummock_manager,
128        &test_tables_3,
129        compaction_group_id,
130    )
131    .await;
132    let test_local_tables_3 = to_local_sstable_info(&test_tables_3);
133    hummock_meta_client
134        .commit_epoch(
135            epoch,
136            SyncResult {
137                uncommitted_ssts: test_local_tables_3,
138                ..Default::default()
139            },
140        )
141        .await
142        .unwrap();
143    vec![test_tables, test_tables_2, test_tables_3]
144}
145
146pub fn generate_test_sstables_with_table_id(
147    epoch: u64,
148    table_id: u32,
149    sst_ids: Vec<u64>,
150) -> Vec<SstableInfo> {
151    let mut sst_info = vec![];
152    for (i, sst_id) in sst_ids.into_iter().enumerate() {
153        let object_size = 2;
154        sst_info.push(
155            SstableInfoInner {
156                object_id: sst_id.into(),
157                sst_id: sst_id.into(),
158                key_range: KeyRange {
159                    left: Bytes::from(key_with_epoch(
160                        format!("{:03}\0\0_key_test_{:05}", table_id, i + 1)
161                            .as_bytes()
162                            .to_vec(),
163                        epoch,
164                    )),
165                    right: Bytes::from(key_with_epoch(
166                        format!("{:03}\0\0_key_test_{:05}", table_id, (i + 1) * 10)
167                            .as_bytes()
168                            .to_vec(),
169                        epoch,
170                    )),
171                    right_exclusive: false,
172                },
173                file_size: object_size,
174                table_ids: vec![table_id],
175                uncompressed_file_size: object_size,
176                max_epoch: epoch,
177                sst_size: object_size,
178                ..Default::default()
179            }
180            .into(),
181        );
182    }
183    sst_info
184}
185
186pub fn generate_test_tables(epoch: u64, sst_ids: Vec<u64>) -> Vec<SstableInfo> {
187    let mut sst_info = vec![];
188    for (i, sst_id) in sst_ids.into_iter().enumerate() {
189        let object_size = 2;
190        sst_info.push(
191            SstableInfoInner {
192                object_id: sst_id.into(),
193                sst_id: sst_id.into(),
194                key_range: KeyRange {
195                    left: Bytes::from(iterator_test_key_of_epoch(sst_id, i + 1, epoch)),
196                    right: Bytes::from(iterator_test_key_of_epoch(sst_id, (i + 1) * 10, epoch)),
197                    right_exclusive: false,
198                },
199                file_size: object_size,
200                table_ids: vec![sst_id as u32, sst_id as u32 * 10000],
201                uncompressed_file_size: object_size,
202                max_epoch: epoch,
203                sst_size: object_size,
204                ..Default::default()
205            }
206            .into(),
207        );
208    }
209    sst_info
210}
211
212pub async fn register_sstable_infos_to_compaction_group(
213    compaction_group_manager_ref: &HummockManager,
214    sstable_infos: &[SstableInfo],
215    compaction_group_id: CompactionGroupId,
216) {
217    let table_ids = sstable_infos
218        .iter()
219        .flat_map(|sstable_info| &sstable_info.table_ids)
220        .sorted()
221        .dedup()
222        .cloned()
223        .collect_vec();
224    register_table_ids_to_compaction_group(
225        compaction_group_manager_ref,
226        &table_ids,
227        compaction_group_id,
228    )
229    .await;
230}
231
232pub async fn register_table_ids_to_compaction_group(
233    hummock_manager_ref: &HummockManager,
234    table_ids: &[u32],
235    compaction_group_id: CompactionGroupId,
236) {
237    hummock_manager_ref
238        .register_table_ids_for_test(
239            &table_ids
240                .iter()
241                .map(|table_id| (*table_id, compaction_group_id))
242                .collect_vec(),
243        )
244        .await
245        .unwrap();
246}
247
248pub async fn unregister_table_ids_from_compaction_group(
249    hummock_manager_ref: &HummockManager,
250    table_ids: &[u32],
251) {
252    hummock_manager_ref
253        .unregister_table_ids(table_ids.iter().map(|table_id| TableId::new(*table_id)))
254        .await
255        .unwrap();
256}
257
258/// Generate keys like `001_key_test_00002` with timestamp `epoch`.
259pub fn iterator_test_key_of_epoch(table: u64, idx: usize, ts: HummockEpoch) -> Vec<u8> {
260    // key format: {prefix_index}_version
261    key_with_epoch(
262        format!("{:03}\0\0_key_test_{:05}", table, idx)
263            .as_bytes()
264            .to_vec(),
265        ts,
266    )
267}
268
269pub fn get_sorted_object_ids(sstables: &[SstableInfo]) -> Vec<HummockSstableObjectId> {
270    sstables
271        .iter()
272        .map(|table| table.object_id)
273        .sorted()
274        .collect_vec()
275}
276
277pub fn get_sorted_committed_object_ids(
278    hummock_version: &HummockVersion,
279    compaction_group_id: CompactionGroupId,
280) -> Vec<HummockSstableObjectId> {
281    let levels = match hummock_version.levels.get(&compaction_group_id) {
282        Some(levels) => levels,
283        None => return vec![],
284    };
285    levels
286        .levels
287        .iter()
288        .chain(levels.l0.sub_levels.iter())
289        .flat_map(|levels| levels.table_infos.iter().map(|info| info.object_id))
290        .sorted()
291        .collect_vec()
292}
293
294pub async fn setup_compute_env_with_config(
295    port: i32,
296    config: CompactionConfig,
297) -> (
298    MetaSrvEnv,
299    HummockManagerRef,
300    ClusterControllerRef,
301    WorkerId,
302) {
303    setup_compute_env_with_metric(port, config, None).await
304}
305
306pub async fn setup_compute_env_with_metric(
307    port: i32,
308    config: CompactionConfig,
309    meta_metric: Option<MetaMetrics>,
310) -> (
311    MetaSrvEnv,
312    HummockManagerRef,
313    ClusterControllerRef,
314    WorkerId,
315) {
316    let env = MetaSrvEnv::for_test().await;
317    let cluster_ctl = Arc::new(
318        ClusterController::new(env.clone(), Duration::from_secs(1))
319            .await
320            .unwrap(),
321    );
322    let catalog_ctl = Arc::new(CatalogController::new(env.clone()).await.unwrap());
323
324    let compactor_manager = Arc::new(CompactorManager::for_test());
325
326    let (compactor_streams_change_tx, _compactor_streams_change_rx) =
327        tokio::sync::mpsc::unbounded_channel();
328
329    let hummock_manager = HummockManager::with_config(
330        env.clone(),
331        cluster_ctl.clone(),
332        catalog_ctl,
333        Arc::new(meta_metric.unwrap_or_default()),
334        compactor_manager,
335        config,
336        compactor_streams_change_tx,
337    )
338    .await;
339
340    let fake_host_address = HostAddress {
341        host: "127.0.0.1".to_owned(),
342        port,
343    };
344    let fake_parallelism = 4;
345    let worker_id = cluster_ctl
346        .add_worker(
347            WorkerType::ComputeNode,
348            fake_host_address,
349            Property {
350                is_streaming: true,
351                is_serving: true,
352                is_unschedulable: false,
353                parallelism: fake_parallelism as _,
354                ..Default::default()
355            },
356            Default::default(),
357        )
358        .await
359        .unwrap();
360    (env, hummock_manager, cluster_ctl, worker_id)
361}
362
363pub async fn setup_compute_env(
364    port: i32,
365) -> (
366    MetaSrvEnv,
367    HummockManagerRef,
368    ClusterControllerRef,
369    WorkerId,
370) {
371    let config = CompactionConfigBuilder::new()
372        .level0_tier_compact_file_number(1)
373        .level0_max_compact_file_number(130)
374        .level0_sub_level_compact_level_count(1)
375        .level0_overlapping_sub_level_compact_level_count(1)
376        .build();
377    setup_compute_env_with_config(port, config).await
378}
379
380pub async fn get_sst_ids(hummock_manager: &HummockManager, number: u32) -> Vec<u64> {
381    let range = hummock_manager.get_new_object_ids(number).await.unwrap();
382    (range.start_id.inner()..range.end_id.inner()).collect_vec()
383}
384
385pub async fn add_ssts(
386    epoch: HummockEpoch,
387    hummock_manager: &HummockManager,
388    hummock_meta_client: Arc<dyn HummockMetaClient>,
389) -> Vec<SstableInfo> {
390    let table_ids = get_sst_ids(hummock_manager, 3).await;
391    let test_tables = generate_test_sstables_with_table_id(test_epoch(epoch), 1, table_ids);
392    let ssts = to_local_sstable_info(&test_tables);
393    hummock_meta_client
394        .commit_epoch(
395            epoch,
396            SyncResult {
397                uncommitted_ssts: ssts,
398                ..Default::default()
399            },
400        )
401        .await
402        .unwrap();
403    test_tables
404}
405
406pub fn compaction_selector_context<'a>(
407    group: &'a CompactionGroup,
408    levels: &'a Levels,
409    member_table_ids: &'a BTreeSet<TableId>,
410    level_handlers: &'a mut [LevelHandler],
411    selector_stats: &'a mut LocalSelectorStatistic,
412    table_id_to_options: &'a HashMap<u32, TableOption>,
413    developer_config: Arc<CompactionDeveloperConfig>,
414    table_watermarks: &'a HashMap<TableId, Arc<TableWatermarks>>,
415    state_table_info: &'a HummockVersionStateTableInfo,
416) -> CompactionSelectorContext<'a> {
417    CompactionSelectorContext {
418        group,
419        levels,
420        member_table_ids,
421        level_handlers,
422        selector_stats,
423        table_id_to_options,
424        developer_config,
425        table_watermarks,
426        state_table_info,
427    }
428}
429
430pub async fn get_compaction_group_id_by_table_id(
431    hummock_manager_ref: HummockManagerRef,
432    table_id: u32,
433) -> u64 {
434    let version = hummock_manager_ref.get_current_version().await;
435    let mapping = version.state_table_info.build_table_compaction_group_id();
436    *mapping.get(&(table_id.into())).unwrap()
437}