Skip to main content

risingwave_meta/hummock/
test_utils.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![cfg(any(test, feature = "test"))]
16
17use std::collections::{BTreeSet, HashMap};
18use std::sync::{Arc, LazyLock};
19use std::time::Duration;
20
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::catalog::{TableId, TableOption};
24use risingwave_common::util::epoch::test_epoch;
25use risingwave_hummock_sdk::key::key_with_epoch;
26use risingwave_hummock_sdk::key_range::KeyRange;
27use risingwave_hummock_sdk::level::Levels;
28use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
29use risingwave_hummock_sdk::table_watermark::TableWatermarks;
30use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo};
31use risingwave_hummock_sdk::{
32    CompactionGroupId, HummockEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult,
33};
34use risingwave_meta_model::WorkerId;
35use risingwave_pb::common::worker_node::Property;
36use risingwave_pb::common::{HostAddress, WorkerType};
37use risingwave_pb::hummock::CompactionConfig;
38use risingwave_pb::hummock::compact_task::TaskStatus;
39use risingwave_rpc_client::HummockMetaClient;
40
41use crate::controller::catalog::CatalogController;
42use crate::controller::cluster::{ClusterController, ClusterControllerRef};
43use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
44use crate::hummock::compaction::in_progress_compaction::InProgressCompactionView;
45use crate::hummock::compaction::selector::{LocalSelectorStatistic, default_compaction_selector};
46use crate::hummock::compaction::{CompactionDeveloperConfig, CompactionSelectorContext};
47use crate::hummock::level_handler::LevelHandler;
48pub use crate::hummock::manager::CommitEpochInfo;
49use crate::hummock::model::CompactionGroup;
50use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef};
51use crate::manager::MetaSrvEnv;
52use crate::rpc::metrics::MetaMetrics;
53
54static EMPTY_IN_PROGRESS_COMPACTION_VIEW: LazyLock<InProgressCompactionView> =
55    LazyLock::new(InProgressCompactionView::default);
56
57pub fn to_local_sstable_info(ssts: &[SstableInfo]) -> Vec<LocalSstableInfo> {
58    ssts.iter()
59        .map(|sst| LocalSstableInfo::for_test(sst.clone()))
60        .collect_vec()
61}
62
63// This function has 3 phases:
64// 1. add 3 ssts to
65// 2. trigger a compaction and replace the input from phase 1 with the 1 new sst
66// 3. add 1 new sst
67// Please make sure the function do what you want before using it.
68pub async fn add_test_tables(
69    hummock_manager: &HummockManager,
70    hummock_meta_client: Arc<dyn HummockMetaClient>,
71    compaction_group_id: CompactionGroupId,
72) -> Vec<Vec<SstableInfo>> {
73    // Increase version by 2.
74
75    use risingwave_common::util::epoch::EpochExt;
76
77    let mut epoch = test_epoch(1);
78    let sstable_ids = get_sst_ids(hummock_manager, 3).await;
79    let test_tables = generate_test_sstables_with_table_id(epoch, 1, sstable_ids);
80    register_sstable_infos_to_compaction_group(hummock_manager, &test_tables, compaction_group_id)
81        .await;
82    let test_local_tables = to_local_sstable_info(&test_tables);
83    hummock_meta_client
84        .commit_epoch(
85            epoch,
86            SyncResult {
87                uncommitted_ssts: test_local_tables,
88                ..Default::default()
89            },
90        )
91        .await
92        .unwrap();
93
94    // Simulate a compaction and increase version by 1.
95    let test_tables_2 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await);
96    register_sstable_infos_to_compaction_group(
97        hummock_manager,
98        &test_tables_2,
99        compaction_group_id,
100    )
101    .await;
102    let mut compact_task = hummock_manager
103        .get_compact_task(compaction_group_id, &mut *default_compaction_selector())
104        .await
105        .unwrap()
106        .unwrap();
107    assert_eq!(
108        compact_task
109            .input_ssts
110            .iter()
111            .map(|i| i.table_infos.len())
112            .sum::<usize>(),
113        3
114    );
115
116    compact_task.target_level = 6;
117    hummock_manager
118        .report_compact_task_for_test(
119            compact_task.task_id,
120            Some(compact_task),
121            TaskStatus::Success,
122            test_tables_2.clone(),
123            None,
124        )
125        .await
126        .unwrap();
127    // Increase version by 1.
128    epoch.inc_epoch();
129    let test_tables_3 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await);
130    register_sstable_infos_to_compaction_group(
131        hummock_manager,
132        &test_tables_3,
133        compaction_group_id,
134    )
135    .await;
136    let test_local_tables_3 = to_local_sstable_info(&test_tables_3);
137    hummock_meta_client
138        .commit_epoch(
139            epoch,
140            SyncResult {
141                uncommitted_ssts: test_local_tables_3,
142                ..Default::default()
143            },
144        )
145        .await
146        .unwrap();
147    vec![test_tables, test_tables_2, test_tables_3]
148}
149
150pub fn generate_test_sstables_with_table_id(
151    epoch: u64,
152    table_id: impl Into<TableId>,
153    sst_ids: Vec<u64>,
154) -> Vec<SstableInfo> {
155    let table_id = table_id.into();
156    let mut sst_info = vec![];
157    for (i, sst_id) in sst_ids.into_iter().enumerate() {
158        let object_size = 2;
159        sst_info.push(
160            SstableInfoInner {
161                object_id: sst_id.into(),
162                sst_id: sst_id.into(),
163                key_range: KeyRange {
164                    left: Bytes::from(key_with_epoch(
165                        format!("{:03}\0\0_key_test_{:05}", table_id, i + 1)
166                            .as_bytes()
167                            .to_vec(),
168                        epoch,
169                    )),
170                    right: Bytes::from(key_with_epoch(
171                        format!("{:03}\0\0_key_test_{:05}", table_id, (i + 1) * 10)
172                            .as_bytes()
173                            .to_vec(),
174                        epoch,
175                    )),
176                    right_exclusive: false,
177                },
178                file_size: object_size,
179                table_ids: vec![table_id],
180                uncompressed_file_size: object_size,
181                max_epoch: epoch,
182                sst_size: object_size,
183                ..Default::default()
184            }
185            .into(),
186        );
187    }
188    sst_info
189}
190
191pub fn generate_test_tables(epoch: u64, sst_ids: Vec<u64>) -> Vec<SstableInfo> {
192    let mut sst_info = vec![];
193    for (i, sst_id) in sst_ids.into_iter().enumerate() {
194        let object_size = 2;
195        sst_info.push(
196            SstableInfoInner {
197                object_id: sst_id.into(),
198                sst_id: sst_id.into(),
199                key_range: KeyRange {
200                    left: Bytes::from(iterator_test_key_of_epoch(sst_id, i + 1, epoch)),
201                    right: Bytes::from(iterator_test_key_of_epoch(sst_id, (i + 1) * 10, epoch)),
202                    right_exclusive: false,
203                },
204                file_size: object_size,
205                table_ids: vec![(sst_id as u32).into(), (sst_id as u32 * 10000).into()],
206                uncompressed_file_size: object_size,
207                max_epoch: epoch,
208                sst_size: object_size,
209                ..Default::default()
210            }
211            .into(),
212        );
213    }
214    sst_info
215}
216
217pub async fn register_sstable_infos_to_compaction_group(
218    compaction_group_manager_ref: &HummockManager,
219    sstable_infos: &[SstableInfo],
220    compaction_group_id: CompactionGroupId,
221) {
222    let table_ids = sstable_infos
223        .iter()
224        .flat_map(|sstable_info| &sstable_info.table_ids)
225        .sorted()
226        .dedup()
227        .copied()
228        .collect_vec();
229    register_table_ids_to_compaction_group(
230        compaction_group_manager_ref,
231        &table_ids,
232        compaction_group_id,
233    )
234    .await;
235}
236
237pub async fn register_table_ids_to_compaction_group(
238    hummock_manager_ref: &HummockManager,
239    table_ids: &[impl Into<TableId> + Copy],
240    compaction_group_id: CompactionGroupId,
241) {
242    hummock_manager_ref
243        .register_table_ids_for_test(
244            &table_ids
245                .iter()
246                .map(|table_id| (*table_id, compaction_group_id))
247                .collect_vec(),
248        )
249        .await
250        .unwrap();
251}
252
253pub async fn unregister_table_ids_from_compaction_group(
254    hummock_manager_ref: &HummockManager,
255    table_ids: &[impl Into<TableId> + Copy],
256) {
257    hummock_manager_ref
258        .unregister_table_ids(table_ids.iter().map(|table_id| (*table_id).into()))
259        .await
260        .unwrap();
261}
262
263/// Generate keys like `001_key_test_00002` with timestamp `epoch`.
264pub fn iterator_test_key_of_epoch(table: u64, idx: usize, ts: HummockEpoch) -> Vec<u8> {
265    // key format: {prefix_index}_version
266    key_with_epoch(
267        format!("{:03}\0\0_key_test_{:05}", table, idx)
268            .as_bytes()
269            .to_vec(),
270        ts,
271    )
272}
273
274pub fn get_sorted_object_ids(sstables: &[SstableInfo]) -> Vec<HummockSstableObjectId> {
275    sstables
276        .iter()
277        .map(|table| table.object_id)
278        .sorted()
279        .collect_vec()
280}
281
282pub fn get_sorted_committed_object_ids(
283    hummock_version: &HummockVersion,
284    compaction_group_id: CompactionGroupId,
285) -> Vec<HummockSstableObjectId> {
286    let levels = match hummock_version.levels.get(&compaction_group_id) {
287        Some(levels) => levels,
288        None => return vec![],
289    };
290    levels
291        .levels
292        .iter()
293        .chain(levels.l0.sub_levels.iter())
294        .flat_map(|levels| levels.table_infos.iter().map(|info| info.object_id))
295        .sorted()
296        .collect_vec()
297}
298
299pub async fn setup_compute_env_with_config(
300    port: i32,
301    config: CompactionConfig,
302) -> (
303    MetaSrvEnv,
304    HummockManagerRef,
305    ClusterControllerRef,
306    WorkerId,
307) {
308    setup_compute_env_with_metric(port, config, None).await
309}
310
311pub async fn setup_compute_env_with_metric(
312    port: i32,
313    config: CompactionConfig,
314    meta_metric: Option<MetaMetrics>,
315) -> (
316    MetaSrvEnv,
317    HummockManagerRef,
318    ClusterControllerRef,
319    WorkerId,
320) {
321    let env = MetaSrvEnv::for_test().await;
322    let cluster_ctl = Arc::new(
323        ClusterController::new(env.clone(), Duration::from_secs(1))
324            .await
325            .unwrap(),
326    );
327    let catalog_ctl = Arc::new(CatalogController::new(env.clone()).await.unwrap());
328
329    let compactor_manager = Arc::new(CompactorManager::for_test());
330
331    let (compactor_streams_change_tx, _compactor_streams_change_rx) =
332        tokio::sync::mpsc::unbounded_channel();
333
334    let hummock_manager = HummockManager::with_config(
335        env.clone(),
336        cluster_ctl.clone(),
337        catalog_ctl,
338        Arc::new(meta_metric.unwrap_or_default()),
339        compactor_manager,
340        config,
341        compactor_streams_change_tx,
342    )
343    .await;
344
345    let fake_host_address = HostAddress {
346        host: "127.0.0.1".to_owned(),
347        port,
348    };
349    let fake_parallelism = 4;
350    let worker_id = cluster_ctl
351        .add_worker(
352            WorkerType::ComputeNode,
353            fake_host_address,
354            Property {
355                is_streaming: true,
356                is_serving: true,
357                parallelism: fake_parallelism as _,
358                ..Default::default()
359            },
360            Default::default(),
361        )
362        .await
363        .unwrap();
364    (env, hummock_manager, cluster_ctl, worker_id)
365}
366
367pub async fn setup_compute_env(
368    port: i32,
369) -> (
370    MetaSrvEnv,
371    HummockManagerRef,
372    ClusterControllerRef,
373    WorkerId,
374) {
375    let config = CompactionConfigBuilder::new()
376        .level0_tier_compact_file_number(1)
377        .level0_max_compact_file_number(130)
378        .level0_sub_level_compact_level_count(1)
379        .level0_overlapping_sub_level_compact_level_count(1)
380        .build();
381    setup_compute_env_with_config(port, config).await
382}
383
384pub async fn get_sst_ids(hummock_manager: &HummockManager, number: u32) -> Vec<u64> {
385    let range = hummock_manager.get_new_object_ids(number).await.unwrap();
386    (range.start_id..range.end_id)
387        .map(|id| id.as_raw_id())
388        .collect_vec()
389}
390
391pub async fn add_ssts(
392    epoch: HummockEpoch,
393    hummock_manager: &HummockManager,
394    hummock_meta_client: Arc<dyn HummockMetaClient>,
395) -> Vec<SstableInfo> {
396    let table_ids = get_sst_ids(hummock_manager, 3).await;
397    let test_tables = generate_test_sstables_with_table_id(test_epoch(epoch), 1, table_ids);
398    let ssts = to_local_sstable_info(&test_tables);
399    hummock_meta_client
400        .commit_epoch(
401            epoch,
402            SyncResult {
403                uncommitted_ssts: ssts,
404                ..Default::default()
405            },
406        )
407        .await
408        .unwrap();
409    test_tables
410}
411
412pub fn compaction_selector_context<'a>(
413    group: &'a CompactionGroup,
414    levels: &'a Levels,
415    member_table_ids: &'a BTreeSet<TableId>,
416    level_handlers: &'a mut [LevelHandler],
417    selector_stats: &'a mut LocalSelectorStatistic,
418    table_id_to_options: &'a HashMap<TableId, TableOption>,
419    developer_config: Arc<CompactionDeveloperConfig>,
420    table_watermarks: &'a HashMap<TableId, Arc<TableWatermarks>>,
421    state_table_info: &'a HummockVersionStateTableInfo,
422) -> CompactionSelectorContext<'a> {
423    CompactionSelectorContext {
424        group,
425        levels,
426        member_table_ids,
427        level_handlers,
428        selector_stats,
429        table_id_to_options,
430        developer_config,
431        table_watermarks,
432        state_table_info,
433        in_progress_compactions: &EMPTY_IN_PROGRESS_COMPACTION_VIEW,
434    }
435}
436
437pub async fn get_compaction_group_id_by_table_id(
438    hummock_manager_ref: HummockManagerRef,
439    table_id: impl Into<TableId>,
440) -> CompactionGroupId {
441    let version = hummock_manager_ref.get_current_version().await;
442    let mapping = version.state_table_info.build_table_compaction_group_id();
443    *mapping.get(&table_id.into()).unwrap()
444}