1#![cfg(any(test, feature = "test"))]
16
17use std::collections::{BTreeSet, HashMap};
18use std::sync::{Arc, LazyLock};
19use std::time::Duration;
20
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::catalog::{TableId, TableOption};
24use risingwave_common::util::epoch::test_epoch;
25use risingwave_hummock_sdk::key::key_with_epoch;
26use risingwave_hummock_sdk::key_range::KeyRange;
27use risingwave_hummock_sdk::level::Levels;
28use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
29use risingwave_hummock_sdk::table_watermark::TableWatermarks;
30use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo};
31use risingwave_hummock_sdk::{
32 CompactionGroupId, HummockEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult,
33};
34use risingwave_meta_model::WorkerId;
35use risingwave_pb::common::worker_node::Property;
36use risingwave_pb::common::{HostAddress, WorkerType};
37use risingwave_pb::hummock::CompactionConfig;
38use risingwave_pb::hummock::compact_task::TaskStatus;
39use risingwave_rpc_client::HummockMetaClient;
40
41use crate::controller::catalog::CatalogController;
42use crate::controller::cluster::{ClusterController, ClusterControllerRef};
43use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
44use crate::hummock::compaction::in_progress_compaction::InProgressCompactionView;
45use crate::hummock::compaction::selector::{LocalSelectorStatistic, default_compaction_selector};
46use crate::hummock::compaction::{CompactionDeveloperConfig, CompactionSelectorContext};
47use crate::hummock::level_handler::LevelHandler;
48pub use crate::hummock::manager::CommitEpochInfo;
49use crate::hummock::model::CompactionGroup;
50use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef};
51use crate::manager::MetaSrvEnv;
52use crate::rpc::metrics::MetaMetrics;
53
54static EMPTY_IN_PROGRESS_COMPACTION_VIEW: LazyLock<InProgressCompactionView> =
55 LazyLock::new(InProgressCompactionView::default);
56
57pub fn to_local_sstable_info(ssts: &[SstableInfo]) -> Vec<LocalSstableInfo> {
58 ssts.iter()
59 .map(|sst| LocalSstableInfo::for_test(sst.clone()))
60 .collect_vec()
61}
62
63pub async fn add_test_tables(
69 hummock_manager: &HummockManager,
70 hummock_meta_client: Arc<dyn HummockMetaClient>,
71 compaction_group_id: CompactionGroupId,
72) -> Vec<Vec<SstableInfo>> {
73 use risingwave_common::util::epoch::EpochExt;
76
77 let mut epoch = test_epoch(1);
78 let sstable_ids = get_sst_ids(hummock_manager, 3).await;
79 let test_tables = generate_test_sstables_with_table_id(epoch, 1, sstable_ids);
80 register_sstable_infos_to_compaction_group(hummock_manager, &test_tables, compaction_group_id)
81 .await;
82 let test_local_tables = to_local_sstable_info(&test_tables);
83 hummock_meta_client
84 .commit_epoch(
85 epoch,
86 SyncResult {
87 uncommitted_ssts: test_local_tables,
88 ..Default::default()
89 },
90 )
91 .await
92 .unwrap();
93
94 let test_tables_2 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await);
96 register_sstable_infos_to_compaction_group(
97 hummock_manager,
98 &test_tables_2,
99 compaction_group_id,
100 )
101 .await;
102 let mut compact_task = hummock_manager
103 .get_compact_task(compaction_group_id, &mut *default_compaction_selector())
104 .await
105 .unwrap()
106 .unwrap();
107 assert_eq!(
108 compact_task
109 .input_ssts
110 .iter()
111 .map(|i| i.table_infos.len())
112 .sum::<usize>(),
113 3
114 );
115
116 compact_task.target_level = 6;
117 hummock_manager
118 .report_compact_task_for_test(
119 compact_task.task_id,
120 Some(compact_task),
121 TaskStatus::Success,
122 test_tables_2.clone(),
123 None,
124 )
125 .await
126 .unwrap();
127 epoch.inc_epoch();
129 let test_tables_3 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await);
130 register_sstable_infos_to_compaction_group(
131 hummock_manager,
132 &test_tables_3,
133 compaction_group_id,
134 )
135 .await;
136 let test_local_tables_3 = to_local_sstable_info(&test_tables_3);
137 hummock_meta_client
138 .commit_epoch(
139 epoch,
140 SyncResult {
141 uncommitted_ssts: test_local_tables_3,
142 ..Default::default()
143 },
144 )
145 .await
146 .unwrap();
147 vec![test_tables, test_tables_2, test_tables_3]
148}
149
150pub fn generate_test_sstables_with_table_id(
151 epoch: u64,
152 table_id: impl Into<TableId>,
153 sst_ids: Vec<u64>,
154) -> Vec<SstableInfo> {
155 let table_id = table_id.into();
156 let mut sst_info = vec![];
157 for (i, sst_id) in sst_ids.into_iter().enumerate() {
158 let object_size = 2;
159 sst_info.push(
160 SstableInfoInner {
161 object_id: sst_id.into(),
162 sst_id: sst_id.into(),
163 key_range: KeyRange {
164 left: Bytes::from(key_with_epoch(
165 format!("{:03}\0\0_key_test_{:05}", table_id, i + 1)
166 .as_bytes()
167 .to_vec(),
168 epoch,
169 )),
170 right: Bytes::from(key_with_epoch(
171 format!("{:03}\0\0_key_test_{:05}", table_id, (i + 1) * 10)
172 .as_bytes()
173 .to_vec(),
174 epoch,
175 )),
176 right_exclusive: false,
177 },
178 file_size: object_size,
179 table_ids: vec![table_id],
180 uncompressed_file_size: object_size,
181 max_epoch: epoch,
182 sst_size: object_size,
183 ..Default::default()
184 }
185 .into(),
186 );
187 }
188 sst_info
189}
190
191pub fn generate_test_tables(epoch: u64, sst_ids: Vec<u64>) -> Vec<SstableInfo> {
192 let mut sst_info = vec![];
193 for (i, sst_id) in sst_ids.into_iter().enumerate() {
194 let object_size = 2;
195 sst_info.push(
196 SstableInfoInner {
197 object_id: sst_id.into(),
198 sst_id: sst_id.into(),
199 key_range: KeyRange {
200 left: Bytes::from(iterator_test_key_of_epoch(sst_id, i + 1, epoch)),
201 right: Bytes::from(iterator_test_key_of_epoch(sst_id, (i + 1) * 10, epoch)),
202 right_exclusive: false,
203 },
204 file_size: object_size,
205 table_ids: vec![(sst_id as u32).into(), (sst_id as u32 * 10000).into()],
206 uncompressed_file_size: object_size,
207 max_epoch: epoch,
208 sst_size: object_size,
209 ..Default::default()
210 }
211 .into(),
212 );
213 }
214 sst_info
215}
216
217pub async fn register_sstable_infos_to_compaction_group(
218 compaction_group_manager_ref: &HummockManager,
219 sstable_infos: &[SstableInfo],
220 compaction_group_id: CompactionGroupId,
221) {
222 let table_ids = sstable_infos
223 .iter()
224 .flat_map(|sstable_info| &sstable_info.table_ids)
225 .sorted()
226 .dedup()
227 .copied()
228 .collect_vec();
229 register_table_ids_to_compaction_group(
230 compaction_group_manager_ref,
231 &table_ids,
232 compaction_group_id,
233 )
234 .await;
235}
236
237pub async fn register_table_ids_to_compaction_group(
238 hummock_manager_ref: &HummockManager,
239 table_ids: &[impl Into<TableId> + Copy],
240 compaction_group_id: CompactionGroupId,
241) {
242 hummock_manager_ref
243 .register_table_ids_for_test(
244 &table_ids
245 .iter()
246 .map(|table_id| (*table_id, compaction_group_id))
247 .collect_vec(),
248 )
249 .await
250 .unwrap();
251}
252
253pub async fn unregister_table_ids_from_compaction_group(
254 hummock_manager_ref: &HummockManager,
255 table_ids: &[impl Into<TableId> + Copy],
256) {
257 hummock_manager_ref
258 .unregister_table_ids(table_ids.iter().map(|table_id| (*table_id).into()))
259 .await
260 .unwrap();
261}
262
263pub fn iterator_test_key_of_epoch(table: u64, idx: usize, ts: HummockEpoch) -> Vec<u8> {
265 key_with_epoch(
267 format!("{:03}\0\0_key_test_{:05}", table, idx)
268 .as_bytes()
269 .to_vec(),
270 ts,
271 )
272}
273
274pub fn get_sorted_object_ids(sstables: &[SstableInfo]) -> Vec<HummockSstableObjectId> {
275 sstables
276 .iter()
277 .map(|table| table.object_id)
278 .sorted()
279 .collect_vec()
280}
281
282pub fn get_sorted_committed_object_ids(
283 hummock_version: &HummockVersion,
284 compaction_group_id: CompactionGroupId,
285) -> Vec<HummockSstableObjectId> {
286 let levels = match hummock_version.levels.get(&compaction_group_id) {
287 Some(levels) => levels,
288 None => return vec![],
289 };
290 levels
291 .levels
292 .iter()
293 .chain(levels.l0.sub_levels.iter())
294 .flat_map(|levels| levels.table_infos.iter().map(|info| info.object_id))
295 .sorted()
296 .collect_vec()
297}
298
299pub async fn setup_compute_env_with_config(
300 port: i32,
301 config: CompactionConfig,
302) -> (
303 MetaSrvEnv,
304 HummockManagerRef,
305 ClusterControllerRef,
306 WorkerId,
307) {
308 setup_compute_env_with_metric(port, config, None).await
309}
310
311pub async fn setup_compute_env_with_metric(
312 port: i32,
313 config: CompactionConfig,
314 meta_metric: Option<MetaMetrics>,
315) -> (
316 MetaSrvEnv,
317 HummockManagerRef,
318 ClusterControllerRef,
319 WorkerId,
320) {
321 let env = MetaSrvEnv::for_test().await;
322 let cluster_ctl = Arc::new(
323 ClusterController::new(env.clone(), Duration::from_secs(1))
324 .await
325 .unwrap(),
326 );
327 let catalog_ctl = Arc::new(CatalogController::new(env.clone()).await.unwrap());
328
329 let compactor_manager = Arc::new(CompactorManager::for_test());
330
331 let (compactor_streams_change_tx, _compactor_streams_change_rx) =
332 tokio::sync::mpsc::unbounded_channel();
333
334 let hummock_manager = HummockManager::with_config(
335 env.clone(),
336 cluster_ctl.clone(),
337 catalog_ctl,
338 Arc::new(meta_metric.unwrap_or_default()),
339 compactor_manager,
340 config,
341 compactor_streams_change_tx,
342 )
343 .await;
344
345 let fake_host_address = HostAddress {
346 host: "127.0.0.1".to_owned(),
347 port,
348 };
349 let fake_parallelism = 4;
350 let worker_id = cluster_ctl
351 .add_worker(
352 WorkerType::ComputeNode,
353 fake_host_address,
354 Property {
355 is_streaming: true,
356 is_serving: true,
357 parallelism: fake_parallelism as _,
358 ..Default::default()
359 },
360 Default::default(),
361 )
362 .await
363 .unwrap();
364 (env, hummock_manager, cluster_ctl, worker_id)
365}
366
367pub async fn setup_compute_env(
368 port: i32,
369) -> (
370 MetaSrvEnv,
371 HummockManagerRef,
372 ClusterControllerRef,
373 WorkerId,
374) {
375 let config = CompactionConfigBuilder::new()
376 .level0_tier_compact_file_number(1)
377 .level0_max_compact_file_number(130)
378 .level0_sub_level_compact_level_count(1)
379 .level0_overlapping_sub_level_compact_level_count(1)
380 .build();
381 setup_compute_env_with_config(port, config).await
382}
383
384pub async fn get_sst_ids(hummock_manager: &HummockManager, number: u32) -> Vec<u64> {
385 let range = hummock_manager.get_new_object_ids(number).await.unwrap();
386 (range.start_id..range.end_id)
387 .map(|id| id.as_raw_id())
388 .collect_vec()
389}
390
391pub async fn add_ssts(
392 epoch: HummockEpoch,
393 hummock_manager: &HummockManager,
394 hummock_meta_client: Arc<dyn HummockMetaClient>,
395) -> Vec<SstableInfo> {
396 let table_ids = get_sst_ids(hummock_manager, 3).await;
397 let test_tables = generate_test_sstables_with_table_id(test_epoch(epoch), 1, table_ids);
398 let ssts = to_local_sstable_info(&test_tables);
399 hummock_meta_client
400 .commit_epoch(
401 epoch,
402 SyncResult {
403 uncommitted_ssts: ssts,
404 ..Default::default()
405 },
406 )
407 .await
408 .unwrap();
409 test_tables
410}
411
412pub fn compaction_selector_context<'a>(
413 group: &'a CompactionGroup,
414 levels: &'a Levels,
415 member_table_ids: &'a BTreeSet<TableId>,
416 level_handlers: &'a mut [LevelHandler],
417 selector_stats: &'a mut LocalSelectorStatistic,
418 table_id_to_options: &'a HashMap<TableId, TableOption>,
419 developer_config: Arc<CompactionDeveloperConfig>,
420 table_watermarks: &'a HashMap<TableId, Arc<TableWatermarks>>,
421 state_table_info: &'a HummockVersionStateTableInfo,
422) -> CompactionSelectorContext<'a> {
423 CompactionSelectorContext {
424 group,
425 levels,
426 member_table_ids,
427 level_handlers,
428 selector_stats,
429 table_id_to_options,
430 developer_config,
431 table_watermarks,
432 state_table_info,
433 in_progress_compactions: &EMPTY_IN_PROGRESS_COMPACTION_VIEW,
434 }
435}
436
437pub async fn get_compaction_group_id_by_table_id(
438 hummock_manager_ref: HummockManagerRef,
439 table_id: impl Into<TableId>,
440) -> CompactionGroupId {
441 let version = hummock_manager_ref.get_current_version().await;
442 let mapping = version.state_table_info.build_table_compaction_group_id();
443 *mapping.get(&table_id.into()).unwrap()
444}