risingwave_meta/hummock/
test_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![cfg(any(test, feature = "test"))]
16
17use std::collections::{BTreeSet, HashMap};
18use std::sync::Arc;
19use std::time::Duration;
20
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::catalog::{TableId, TableOption};
24use risingwave_common::util::epoch::test_epoch;
25use risingwave_hummock_sdk::key::key_with_epoch;
26use risingwave_hummock_sdk::key_range::KeyRange;
27use risingwave_hummock_sdk::level::Levels;
28use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
29use risingwave_hummock_sdk::table_watermark::TableWatermarks;
30use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo};
31use risingwave_hummock_sdk::{
32    CompactionGroupId, HummockEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult,
33};
34use risingwave_meta_model::WorkerId;
35use risingwave_pb::common::worker_node::Property;
36use risingwave_pb::common::{HostAddress, WorkerType};
37use risingwave_pb::hummock::CompactionConfig;
38use risingwave_pb::hummock::compact_task::TaskStatus;
39use risingwave_rpc_client::HummockMetaClient;
40
41use crate::controller::catalog::CatalogController;
42use crate::controller::cluster::{ClusterController, ClusterControllerRef};
43use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
44use crate::hummock::compaction::selector::{LocalSelectorStatistic, default_compaction_selector};
45use crate::hummock::compaction::{CompactionDeveloperConfig, CompactionSelectorContext};
46use crate::hummock::level_handler::LevelHandler;
47pub use crate::hummock::manager::CommitEpochInfo;
48use crate::hummock::model::CompactionGroup;
49use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef};
50use crate::manager::MetaSrvEnv;
51use crate::rpc::metrics::MetaMetrics;
52
53pub fn to_local_sstable_info(ssts: &[SstableInfo]) -> Vec<LocalSstableInfo> {
54    ssts.iter()
55        .map(|sst| LocalSstableInfo::for_test(sst.clone()))
56        .collect_vec()
57}
58
/// Populates `compaction_group_id` with test SSTs in three phases and returns the SSTs created
/// in each phase, in order (`[phase 1, phase 2, phase 3]`):
/// 1. Commit 3 new SSTs of table 1 at `test_epoch(1)`.
/// 2. Fetch a compaction task (asserted to have exactly 3 input SSTs in total) and report it as
///    successful with 1 new SST as its output.
/// 3. Advance the epoch and commit 1 more new SST.
///
/// In every phase, the table ids referenced by the newly generated SSTs are registered to
/// `compaction_group_id` before the SSTs are committed or reported.
///
/// Please make sure the function does what you want before using it.
///
/// # Panics
///
/// Panics if any commit, compaction-task fetch or compaction report fails, if no compaction
/// task is available, or if the task does not have exactly 3 input SSTs.
pub async fn add_test_tables(
    hummock_manager: &HummockManager,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    compaction_group_id: CompactionGroupId,
) -> Vec<Vec<SstableInfo>> {
    // Increase version by 2.

    // Presumably provides `inc_epoch`, used in phase 3 below.
    use risingwave_common::util::epoch::EpochExt;

    // Phase 1: commit 3 SSTs belonging to table 1.
    let mut epoch = test_epoch(1);
    let sstable_ids = get_sst_ids(hummock_manager, 3).await;
    let test_tables = generate_test_sstables_with_table_id(epoch, 1, sstable_ids);
    register_sstable_infos_to_compaction_group(hummock_manager, &test_tables, compaction_group_id)
        .await;
    let test_local_tables = to_local_sstable_info(&test_tables);
    hummock_meta_client
        .commit_epoch(
            epoch,
            SyncResult {
                uncommitted_ssts: test_local_tables,
                ..Default::default()
            },
        )
        .await
        .unwrap();

    // Phase 2:
    // Simulate a compaction and increase version by 1.
    // `test_tables_2` is not committed; it is only reported as the output of the compaction task.
    let test_tables_2 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await);
    register_sstable_infos_to_compaction_group(
        hummock_manager,
        &test_tables_2,
        compaction_group_id,
    )
    .await;
    let mut compact_task = hummock_manager
        .get_compact_task(compaction_group_id, &mut default_compaction_selector())
        .await
        .unwrap()
        .unwrap();
    // The task is expected to have 3 input SSTs in total, matching the number committed in
    // phase 1.
    assert_eq!(
        compact_task
            .input_ssts
            .iter()
            .map(|i| i.table_infos.len())
            .sum::<usize>(),
        3
    );

    // Override the target level of the task before reporting it as finished.
    compact_task.target_level = 6;
    hummock_manager
        .report_compact_task_for_test(
            compact_task.task_id,
            Some(compact_task),
            TaskStatus::Success,
            test_tables_2.clone(),
            None,
        )
        .await
        .unwrap();
    // Phase 3: commit 1 more SST at the next epoch.
    // Increase version by 1.
    epoch.inc_epoch();
    let test_tables_3 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await);
    register_sstable_infos_to_compaction_group(
        hummock_manager,
        &test_tables_3,
        compaction_group_id,
    )
    .await;
    let test_local_tables_3 = to_local_sstable_info(&test_tables_3);
    hummock_meta_client
        .commit_epoch(
            epoch,
            SyncResult {
                uncommitted_ssts: test_local_tables_3,
                ..Default::default()
            },
        )
        .await
        .unwrap();
    vec![test_tables, test_tables_2, test_tables_3]
}
145
146pub fn generate_test_sstables_with_table_id(
147    epoch: u64,
148    table_id: impl Into<TableId>,
149    sst_ids: Vec<u64>,
150) -> Vec<SstableInfo> {
151    let table_id = table_id.into();
152    let mut sst_info = vec![];
153    for (i, sst_id) in sst_ids.into_iter().enumerate() {
154        let object_size = 2;
155        sst_info.push(
156            SstableInfoInner {
157                object_id: sst_id.into(),
158                sst_id: sst_id.into(),
159                key_range: KeyRange {
160                    left: Bytes::from(key_with_epoch(
161                        format!("{:03}\0\0_key_test_{:05}", table_id, i + 1)
162                            .as_bytes()
163                            .to_vec(),
164                        epoch,
165                    )),
166                    right: Bytes::from(key_with_epoch(
167                        format!("{:03}\0\0_key_test_{:05}", table_id, (i + 1) * 10)
168                            .as_bytes()
169                            .to_vec(),
170                        epoch,
171                    )),
172                    right_exclusive: false,
173                },
174                file_size: object_size,
175                table_ids: vec![table_id],
176                uncompressed_file_size: object_size,
177                max_epoch: epoch,
178                sst_size: object_size,
179                ..Default::default()
180            }
181            .into(),
182        );
183    }
184    sst_info
185}
186
187pub fn generate_test_tables(epoch: u64, sst_ids: Vec<u64>) -> Vec<SstableInfo> {
188    let mut sst_info = vec![];
189    for (i, sst_id) in sst_ids.into_iter().enumerate() {
190        let object_size = 2;
191        sst_info.push(
192            SstableInfoInner {
193                object_id: sst_id.into(),
194                sst_id: sst_id.into(),
195                key_range: KeyRange {
196                    left: Bytes::from(iterator_test_key_of_epoch(sst_id, i + 1, epoch)),
197                    right: Bytes::from(iterator_test_key_of_epoch(sst_id, (i + 1) * 10, epoch)),
198                    right_exclusive: false,
199                },
200                file_size: object_size,
201                table_ids: vec![(sst_id as u32).into(), (sst_id as u32 * 10000).into()],
202                uncompressed_file_size: object_size,
203                max_epoch: epoch,
204                sst_size: object_size,
205                ..Default::default()
206            }
207            .into(),
208        );
209    }
210    sst_info
211}
212
213pub async fn register_sstable_infos_to_compaction_group(
214    compaction_group_manager_ref: &HummockManager,
215    sstable_infos: &[SstableInfo],
216    compaction_group_id: CompactionGroupId,
217) {
218    let table_ids = sstable_infos
219        .iter()
220        .flat_map(|sstable_info| &sstable_info.table_ids)
221        .sorted()
222        .dedup()
223        .copied()
224        .collect_vec();
225    register_table_ids_to_compaction_group(
226        compaction_group_manager_ref,
227        &table_ids,
228        compaction_group_id,
229    )
230    .await;
231}
232
233pub async fn register_table_ids_to_compaction_group(
234    hummock_manager_ref: &HummockManager,
235    table_ids: &[impl Into<TableId> + Copy],
236    compaction_group_id: CompactionGroupId,
237) {
238    hummock_manager_ref
239        .register_table_ids_for_test(
240            &table_ids
241                .iter()
242                .map(|table_id| (*table_id, compaction_group_id))
243                .collect_vec(),
244        )
245        .await
246        .unwrap();
247}
248
249pub async fn unregister_table_ids_from_compaction_group(
250    hummock_manager_ref: &HummockManager,
251    table_ids: &[impl Into<TableId> + Copy],
252) {
253    hummock_manager_ref
254        .unregister_table_ids(table_ids.iter().map(|table_id| (*table_id).into()))
255        .await
256        .unwrap();
257}
258
259/// Generate keys like `001_key_test_00002` with timestamp `epoch`.
260pub fn iterator_test_key_of_epoch(table: u64, idx: usize, ts: HummockEpoch) -> Vec<u8> {
261    // key format: {prefix_index}_version
262    key_with_epoch(
263        format!("{:03}\0\0_key_test_{:05}", table, idx)
264            .as_bytes()
265            .to_vec(),
266        ts,
267    )
268}
269
270pub fn get_sorted_object_ids(sstables: &[SstableInfo]) -> Vec<HummockSstableObjectId> {
271    sstables
272        .iter()
273        .map(|table| table.object_id)
274        .sorted()
275        .collect_vec()
276}
277
278pub fn get_sorted_committed_object_ids(
279    hummock_version: &HummockVersion,
280    compaction_group_id: CompactionGroupId,
281) -> Vec<HummockSstableObjectId> {
282    let levels = match hummock_version.levels.get(&compaction_group_id) {
283        Some(levels) => levels,
284        None => return vec![],
285    };
286    levels
287        .levels
288        .iter()
289        .chain(levels.l0.sub_levels.iter())
290        .flat_map(|levels| levels.table_infos.iter().map(|info| info.object_id))
291        .sorted()
292        .collect_vec()
293}
294
295pub async fn setup_compute_env_with_config(
296    port: i32,
297    config: CompactionConfig,
298) -> (
299    MetaSrvEnv,
300    HummockManagerRef,
301    ClusterControllerRef,
302    WorkerId,
303) {
304    setup_compute_env_with_metric(port, config, None).await
305}
306
307pub async fn setup_compute_env_with_metric(
308    port: i32,
309    config: CompactionConfig,
310    meta_metric: Option<MetaMetrics>,
311) -> (
312    MetaSrvEnv,
313    HummockManagerRef,
314    ClusterControllerRef,
315    WorkerId,
316) {
317    let env = MetaSrvEnv::for_test().await;
318    let cluster_ctl = Arc::new(
319        ClusterController::new(env.clone(), Duration::from_secs(1))
320            .await
321            .unwrap(),
322    );
323    let catalog_ctl = Arc::new(CatalogController::new(env.clone()).await.unwrap());
324
325    let compactor_manager = Arc::new(CompactorManager::for_test());
326
327    let (compactor_streams_change_tx, _compactor_streams_change_rx) =
328        tokio::sync::mpsc::unbounded_channel();
329
330    let hummock_manager = HummockManager::with_config(
331        env.clone(),
332        cluster_ctl.clone(),
333        catalog_ctl,
334        Arc::new(meta_metric.unwrap_or_default()),
335        compactor_manager,
336        config,
337        compactor_streams_change_tx,
338    )
339    .await;
340
341    let fake_host_address = HostAddress {
342        host: "127.0.0.1".to_owned(),
343        port,
344    };
345    let fake_parallelism = 4;
346    let worker_id = cluster_ctl
347        .add_worker(
348            WorkerType::ComputeNode,
349            fake_host_address,
350            Property {
351                is_streaming: true,
352                is_serving: true,
353                is_unschedulable: false,
354                parallelism: fake_parallelism as _,
355                ..Default::default()
356            },
357            Default::default(),
358        )
359        .await
360        .unwrap();
361    (env, hummock_manager, cluster_ctl, worker_id)
362}
363
364pub async fn setup_compute_env(
365    port: i32,
366) -> (
367    MetaSrvEnv,
368    HummockManagerRef,
369    ClusterControllerRef,
370    WorkerId,
371) {
372    let config = CompactionConfigBuilder::new()
373        .level0_tier_compact_file_number(1)
374        .level0_max_compact_file_number(130)
375        .level0_sub_level_compact_level_count(1)
376        .level0_overlapping_sub_level_compact_level_count(1)
377        .build();
378    setup_compute_env_with_config(port, config).await
379}
380
381pub async fn get_sst_ids(hummock_manager: &HummockManager, number: u32) -> Vec<u64> {
382    let range = hummock_manager.get_new_object_ids(number).await.unwrap();
383    (range.start_id.inner()..range.end_id.inner()).collect_vec()
384}
385
386pub async fn add_ssts(
387    epoch: HummockEpoch,
388    hummock_manager: &HummockManager,
389    hummock_meta_client: Arc<dyn HummockMetaClient>,
390) -> Vec<SstableInfo> {
391    let table_ids = get_sst_ids(hummock_manager, 3).await;
392    let test_tables = generate_test_sstables_with_table_id(test_epoch(epoch), 1, table_ids);
393    let ssts = to_local_sstable_info(&test_tables);
394    hummock_meta_client
395        .commit_epoch(
396            epoch,
397            SyncResult {
398                uncommitted_ssts: ssts,
399                ..Default::default()
400            },
401        )
402        .await
403        .unwrap();
404    test_tables
405}
406
/// Bundles the given references and developer config into a [`CompactionSelectorContext`].
///
/// Every argument is stored, unchanged, in the context field of the same name; no other work
/// is done.
pub fn compaction_selector_context<'a>(
    group: &'a CompactionGroup,
    levels: &'a Levels,
    member_table_ids: &'a BTreeSet<TableId>,
    level_handlers: &'a mut [LevelHandler],
    selector_stats: &'a mut LocalSelectorStatistic,
    table_id_to_options: &'a HashMap<TableId, TableOption>,
    developer_config: Arc<CompactionDeveloperConfig>,
    table_watermarks: &'a HashMap<TableId, Arc<TableWatermarks>>,
    state_table_info: &'a HummockVersionStateTableInfo,
) -> CompactionSelectorContext<'a> {
    CompactionSelectorContext {
        group,
        levels,
        member_table_ids,
        level_handlers,
        selector_stats,
        table_id_to_options,
        developer_config,
        table_watermarks,
        state_table_info,
    }
}
430
431pub async fn get_compaction_group_id_by_table_id(
432    hummock_manager_ref: HummockManagerRef,
433    table_id: impl Into<TableId>,
434) -> u64 {
435    let version = hummock_manager_ref.get_current_version().await;
436    let mapping = version.state_table_info.build_table_compaction_group_id();
437    *mapping.get(&table_id.into()).unwrap()
438}