risingwave_meta/hummock/
test_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![cfg(any(test, feature = "test"))]
16
17use std::collections::{BTreeSet, HashMap};
18use std::sync::Arc;
19use std::time::Duration;
20
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::catalog::{TableId, TableOption};
24use risingwave_common::util::epoch::test_epoch;
25use risingwave_hummock_sdk::key::key_with_epoch;
26use risingwave_hummock_sdk::key_range::KeyRange;
27use risingwave_hummock_sdk::level::Levels;
28use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
29use risingwave_hummock_sdk::table_watermark::TableWatermarks;
30use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo};
31use risingwave_hummock_sdk::{
32    CompactionGroupId, HummockEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult,
33};
34use risingwave_meta_model::WorkerId;
35use risingwave_pb::common::worker_node::Property;
36use risingwave_pb::common::{HostAddress, WorkerType};
37use risingwave_pb::hummock::CompactionConfig;
38use risingwave_pb::hummock::compact_task::TaskStatus;
39use risingwave_rpc_client::HummockMetaClient;
40
41use crate::controller::catalog::CatalogController;
42use crate::controller::cluster::{ClusterController, ClusterControllerRef};
43use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
44use crate::hummock::compaction::selector::{LocalSelectorStatistic, default_compaction_selector};
45use crate::hummock::compaction::{CompactionDeveloperConfig, CompactionSelectorContext};
46use crate::hummock::level_handler::LevelHandler;
47pub use crate::hummock::manager::CommitEpochInfo;
48use crate::hummock::model::CompactionGroup;
49use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef};
50use crate::manager::MetaSrvEnv;
51use crate::rpc::metrics::MetaMetrics;
52
53pub fn to_local_sstable_info(ssts: &[SstableInfo]) -> Vec<LocalSstableInfo> {
54    ssts.iter()
55        .map(|sst| LocalSstableInfo::for_test(sst.clone()))
56        .collect_vec()
57}
58
/// Populates `compaction_group_id` with test SSTs in three phases.
///
/// The SSTs added in each phase are returned in order: `[phase1, phase2, phase3]`.
///
/// 1. Allocate 3 SST ids, build 3 SSTs for table id 1 at `test_epoch(1)`, register their table
///    ids to `compaction_group_id`, and commit them at that epoch.
/// 2. Build 1 new SST at the same epoch and register its table ids. Then fetch a compaction
///    task, asserting that its input contains exactly 3 SSTs, and report it as successful with
///    target level 6 and the new SST as its output. This replaces the input from phase 1 with
///    the 1 new SST.
/// 3. Advance the epoch with `inc_epoch`, then build, register, and commit 1 more SST.
///
/// Please make sure the function does what you want before using it.
///
/// # Panics
///
/// Panics if any commit or task report fails, if no compaction task is returned, or if the
/// task's input does not contain exactly 3 SSTs.
pub async fn add_test_tables(
    hummock_manager: &HummockManager,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    compaction_group_id: CompactionGroupId,
) -> Vec<Vec<SstableInfo>> {
    // Increase version by 2.
    // NOTE(review): presumably one bump for the table-id registration and one for the commit
    // below — confirm against `HummockManager`.

    use risingwave_common::util::epoch::EpochExt;

    let mut epoch = test_epoch(1);
    // Phase 1: three SSTs for table id 1.
    let sstable_ids = get_sst_ids(hummock_manager, 3).await;
    let test_tables = generate_test_sstables_with_table_id(epoch, 1, sstable_ids);
    register_sstable_infos_to_compaction_group(hummock_manager, &test_tables, compaction_group_id)
        .await;
    let test_local_tables = to_local_sstable_info(&test_tables);
    hummock_meta_client
        .commit_epoch(
            epoch,
            SyncResult {
                uncommitted_ssts: test_local_tables,
                ..Default::default()
            },
        )
        .await
        .unwrap();

    // Simulate a compaction and increase version by 1.
    // Phase 2: `test_tables_2` is not committed directly; it becomes the compaction output.
    // It reuses the phase-1 epoch (no `inc_epoch` before this point).
    let test_tables_2 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await);
    register_sstable_infos_to_compaction_group(
        hummock_manager,
        &test_tables_2,
        compaction_group_id,
    )
    .await;
    let mut compact_task = hummock_manager
        .get_compact_task(compaction_group_id, &mut default_compaction_selector())
        .await
        .unwrap()
        .unwrap();
    // The task must pick up exactly the 3 SSTs committed in phase 1.
    assert_eq!(
        compact_task
            .input_ssts
            .iter()
            .map(|i| i.table_infos.len())
            .sum::<usize>(),
        3
    );

    compact_task.target_level = 6;
    hummock_manager
        .report_compact_task_for_test(
            compact_task.task_id,
            Some(compact_task),
            TaskStatus::Success,
            test_tables_2.clone(),
            None,
        )
        .await
        .unwrap();
    // Increase version by 1.
    // Phase 3: one more SST committed at the next epoch.
    epoch.inc_epoch();
    let test_tables_3 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await);
    register_sstable_infos_to_compaction_group(
        hummock_manager,
        &test_tables_3,
        compaction_group_id,
    )
    .await;
    let test_local_tables_3 = to_local_sstable_info(&test_tables_3);
    hummock_meta_client
        .commit_epoch(
            epoch,
            SyncResult {
                uncommitted_ssts: test_local_tables_3,
                ..Default::default()
            },
        )
        .await
        .unwrap();
    vec![test_tables, test_tables_2, test_tables_3]
}
145
146pub fn generate_test_sstables_with_table_id(
147    epoch: u64,
148    table_id: u32,
149    sst_ids: Vec<HummockSstableObjectId>,
150) -> Vec<SstableInfo> {
151    let mut sst_info = vec![];
152    for (i, sst_id) in sst_ids.into_iter().enumerate() {
153        let object_size = 2;
154        sst_info.push(
155            SstableInfoInner {
156                object_id: sst_id,
157                sst_id,
158                key_range: KeyRange {
159                    left: Bytes::from(key_with_epoch(
160                        format!("{:03}\0\0_key_test_{:05}", table_id, i + 1)
161                            .as_bytes()
162                            .to_vec(),
163                        epoch,
164                    )),
165                    right: Bytes::from(key_with_epoch(
166                        format!("{:03}\0\0_key_test_{:05}", table_id, (i + 1) * 10)
167                            .as_bytes()
168                            .to_vec(),
169                        epoch,
170                    )),
171                    right_exclusive: false,
172                },
173                file_size: object_size,
174                table_ids: vec![table_id],
175                uncompressed_file_size: object_size,
176                max_epoch: epoch,
177                sst_size: object_size,
178                ..Default::default()
179            }
180            .into(),
181        );
182    }
183    sst_info
184}
185
186pub fn generate_test_tables(epoch: u64, sst_ids: Vec<HummockSstableObjectId>) -> Vec<SstableInfo> {
187    let mut sst_info = vec![];
188    for (i, sst_id) in sst_ids.into_iter().enumerate() {
189        let object_size = 2;
190        sst_info.push(
191            SstableInfoInner {
192                object_id: sst_id,
193                sst_id,
194                key_range: KeyRange {
195                    left: Bytes::from(iterator_test_key_of_epoch(sst_id, i + 1, epoch)),
196                    right: Bytes::from(iterator_test_key_of_epoch(sst_id, (i + 1) * 10, epoch)),
197                    right_exclusive: false,
198                },
199                file_size: object_size,
200                table_ids: vec![sst_id as u32, sst_id as u32 * 10000],
201                uncompressed_file_size: object_size,
202                max_epoch: epoch,
203                sst_size: object_size,
204                ..Default::default()
205            }
206            .into(),
207        );
208    }
209    sst_info
210}
211
212pub async fn register_sstable_infos_to_compaction_group(
213    compaction_group_manager_ref: &HummockManager,
214    sstable_infos: &[SstableInfo],
215    compaction_group_id: CompactionGroupId,
216) {
217    let table_ids = sstable_infos
218        .iter()
219        .flat_map(|sstable_info| &sstable_info.table_ids)
220        .sorted()
221        .dedup()
222        .cloned()
223        .collect_vec();
224    register_table_ids_to_compaction_group(
225        compaction_group_manager_ref,
226        &table_ids,
227        compaction_group_id,
228    )
229    .await;
230}
231
232pub async fn register_table_ids_to_compaction_group(
233    hummock_manager_ref: &HummockManager,
234    table_ids: &[u32],
235    compaction_group_id: CompactionGroupId,
236) {
237    hummock_manager_ref
238        .register_table_ids_for_test(
239            &table_ids
240                .iter()
241                .map(|table_id| (*table_id, compaction_group_id))
242                .collect_vec(),
243        )
244        .await
245        .unwrap();
246}
247
248pub async fn unregister_table_ids_from_compaction_group(
249    hummock_manager_ref: &HummockManager,
250    table_ids: &[u32],
251) {
252    hummock_manager_ref
253        .unregister_table_ids(table_ids.iter().map(|table_id| TableId::new(*table_id)))
254        .await
255        .unwrap();
256}
257
258/// Generate keys like `001_key_test_00002` with timestamp `epoch`.
259pub fn iterator_test_key_of_epoch(
260    table: HummockSstableObjectId,
261    idx: usize,
262    ts: HummockEpoch,
263) -> Vec<u8> {
264    // key format: {prefix_index}_version
265    key_with_epoch(
266        format!("{:03}\0\0_key_test_{:05}", table, idx)
267            .as_bytes()
268            .to_vec(),
269        ts,
270    )
271}
272
273pub fn get_sorted_object_ids(sstables: &[SstableInfo]) -> Vec<HummockSstableObjectId> {
274    sstables
275        .iter()
276        .map(|table| table.object_id)
277        .sorted()
278        .collect_vec()
279}
280
281pub fn get_sorted_committed_object_ids(
282    hummock_version: &HummockVersion,
283    compaction_group_id: CompactionGroupId,
284) -> Vec<HummockSstableObjectId> {
285    let levels = match hummock_version.levels.get(&compaction_group_id) {
286        Some(levels) => levels,
287        None => return vec![],
288    };
289    levels
290        .levels
291        .iter()
292        .chain(levels.l0.sub_levels.iter())
293        .flat_map(|levels| levels.table_infos.iter().map(|info| info.object_id))
294        .sorted()
295        .collect_vec()
296}
297
298pub async fn setup_compute_env_with_config(
299    port: i32,
300    config: CompactionConfig,
301) -> (
302    MetaSrvEnv,
303    HummockManagerRef,
304    ClusterControllerRef,
305    WorkerId,
306) {
307    setup_compute_env_with_metric(port, config, None).await
308}
309
/// Builds a test meta environment and registers one fake compute node.
///
/// This creates, in order:
/// - a `MetaSrvEnv::for_test()` env;
/// - a cluster controller and a catalog controller;
/// - a test `CompactorManager`;
/// - a `HummockManager` built with `config`.
///
/// It then adds one fake compute-node worker at `127.0.0.1:{port}`.
///
/// `meta_metric` is used for the hummock manager when given; otherwise
/// `MetaMetrics::default()` is used.
///
/// Returns `(env, hummock_manager, cluster_ctl, worker_id)`, where `worker_id` is the id
/// assigned to the fake compute node.
///
/// # Panics
///
/// Panics if creating the cluster controller or the catalog controller fails, or if adding the
/// worker fails.
pub async fn setup_compute_env_with_metric(
    port: i32,
    config: CompactionConfig,
    meta_metric: Option<MetaMetrics>,
) -> (
    MetaSrvEnv,
    HummockManagerRef,
    ClusterControllerRef,
    WorkerId,
) {
    let env = MetaSrvEnv::for_test().await;
    // NOTE(review): the 1s duration is presumably the worker heartbeat/expiry interval —
    // confirm against `ClusterController::new`.
    let cluster_ctl = Arc::new(
        ClusterController::new(env.clone(), Duration::from_secs(1))
            .await
            .unwrap(),
    );
    let catalog_ctl = Arc::new(CatalogController::new(env.clone()).await.unwrap());

    let compactor_manager = Arc::new(CompactorManager::for_test());

    // The receiver is bound to a named `_`-prefixed variable rather than `_`. That keeps it
    // alive until this function returns; it is dropped afterwards.
    // NOTE(review): confirm `HummockManager` tolerates sends failing once the receiver is gone.
    let (compactor_streams_change_tx, _compactor_streams_change_rx) =
        tokio::sync::mpsc::unbounded_channel();

    let hummock_manager = HummockManager::with_config(
        env.clone(),
        cluster_ctl.clone(),
        catalog_ctl,
        Arc::new(meta_metric.unwrap_or_default()),
        compactor_manager,
        config,
        compactor_streams_change_tx,
    )
    .await;

    // Register a fake compute node: streaming and serving, schedulable, parallelism 4.
    let fake_host_address = HostAddress {
        host: "127.0.0.1".to_owned(),
        port,
    };
    let fake_parallelism = 4;
    let worker_id = cluster_ctl
        .add_worker(
            WorkerType::ComputeNode,
            fake_host_address,
            Property {
                is_streaming: true,
                is_serving: true,
                is_unschedulable: false,
                parallelism: fake_parallelism as _,
                ..Default::default()
            },
            Default::default(),
        )
        .await
        .unwrap();
    (env, hummock_manager, cluster_ctl, worker_id)
}
366
367pub async fn setup_compute_env(
368    port: i32,
369) -> (
370    MetaSrvEnv,
371    HummockManagerRef,
372    ClusterControllerRef,
373    WorkerId,
374) {
375    let config = CompactionConfigBuilder::new()
376        .level0_tier_compact_file_number(1)
377        .level0_max_compact_file_number(130)
378        .level0_sub_level_compact_level_count(1)
379        .level0_overlapping_sub_level_compact_level_count(1)
380        .build();
381    setup_compute_env_with_config(port, config).await
382}
383
384pub async fn get_sst_ids(
385    hummock_manager: &HummockManager,
386    number: u32,
387) -> Vec<HummockSstableObjectId> {
388    let range = hummock_manager.get_new_sst_ids(number).await.unwrap();
389    (range.start_id..range.end_id).collect_vec()
390}
391
392pub async fn add_ssts(
393    epoch: HummockEpoch,
394    hummock_manager: &HummockManager,
395    hummock_meta_client: Arc<dyn HummockMetaClient>,
396) -> Vec<SstableInfo> {
397    let table_ids = get_sst_ids(hummock_manager, 3).await;
398    let test_tables = generate_test_sstables_with_table_id(test_epoch(epoch), 1, table_ids);
399    let ssts = to_local_sstable_info(&test_tables);
400    hummock_meta_client
401        .commit_epoch(
402            epoch,
403            SyncResult {
404                uncommitted_ssts: ssts,
405                ..Default::default()
406            },
407        )
408        .await
409        .unwrap();
410    test_tables
411}
412
413pub fn compaction_selector_context<'a>(
414    group: &'a CompactionGroup,
415    levels: &'a Levels,
416    member_table_ids: &'a BTreeSet<TableId>,
417    level_handlers: &'a mut [LevelHandler],
418    selector_stats: &'a mut LocalSelectorStatistic,
419    table_id_to_options: &'a HashMap<u32, TableOption>,
420    developer_config: Arc<CompactionDeveloperConfig>,
421    table_watermarks: &'a HashMap<TableId, Arc<TableWatermarks>>,
422    state_table_info: &'a HummockVersionStateTableInfo,
423) -> CompactionSelectorContext<'a> {
424    CompactionSelectorContext {
425        group,
426        levels,
427        member_table_ids,
428        level_handlers,
429        selector_stats,
430        table_id_to_options,
431        developer_config,
432        table_watermarks,
433        state_table_info,
434    }
435}
436
437pub async fn get_compaction_group_id_by_table_id(
438    hummock_manager_ref: HummockManagerRef,
439    table_id: u32,
440) -> u64 {
441    let version = hummock_manager_ref.get_current_version().await;
442    let mapping = version.state_table_info.build_table_compaction_group_id();
443    *mapping.get(&(table_id.into())).unwrap()
444}