1#![cfg(any(test, feature = "test"))]
16
17use std::collections::{BTreeSet, HashMap};
18use std::sync::Arc;
19use std::time::Duration;
20
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::catalog::{TableId, TableOption};
24use risingwave_common::util::epoch::test_epoch;
25use risingwave_hummock_sdk::key::key_with_epoch;
26use risingwave_hummock_sdk::key_range::KeyRange;
27use risingwave_hummock_sdk::level::Levels;
28use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
29use risingwave_hummock_sdk::table_watermark::TableWatermarks;
30use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo};
31use risingwave_hummock_sdk::{
32 CompactionGroupId, HummockEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult,
33};
34use risingwave_meta_model::WorkerId;
35use risingwave_pb::common::worker_node::Property;
36use risingwave_pb::common::{HostAddress, WorkerType};
37use risingwave_pb::hummock::CompactionConfig;
38use risingwave_pb::hummock::compact_task::TaskStatus;
39use risingwave_rpc_client::HummockMetaClient;
40
41use crate::controller::catalog::CatalogController;
42use crate::controller::cluster::{ClusterController, ClusterControllerRef};
43use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
44use crate::hummock::compaction::selector::{LocalSelectorStatistic, default_compaction_selector};
45use crate::hummock::compaction::{CompactionDeveloperConfig, CompactionSelectorContext};
46use crate::hummock::level_handler::LevelHandler;
47pub use crate::hummock::manager::CommitEpochInfo;
48use crate::hummock::model::CompactionGroup;
49use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef};
50use crate::manager::MetaSrvEnv;
51use crate::rpc::metrics::MetaMetrics;
52
53pub fn to_local_sstable_info(ssts: &[SstableInfo]) -> Vec<LocalSstableInfo> {
54 ssts.iter()
55 .map(|sst| LocalSstableInfo::for_test(sst.clone()))
56 .collect_vec()
57}
58
59pub async fn add_test_tables(
65 hummock_manager: &HummockManager,
66 hummock_meta_client: Arc<dyn HummockMetaClient>,
67 compaction_group_id: CompactionGroupId,
68) -> Vec<Vec<SstableInfo>> {
69 use risingwave_common::util::epoch::EpochExt;
72
73 let mut epoch = test_epoch(1);
74 let sstable_ids = get_sst_ids(hummock_manager, 3).await;
75 let test_tables = generate_test_sstables_with_table_id(epoch, 1, sstable_ids);
76 register_sstable_infos_to_compaction_group(hummock_manager, &test_tables, compaction_group_id)
77 .await;
78 let test_local_tables = to_local_sstable_info(&test_tables);
79 hummock_meta_client
80 .commit_epoch(
81 epoch,
82 SyncResult {
83 uncommitted_ssts: test_local_tables,
84 ..Default::default()
85 },
86 )
87 .await
88 .unwrap();
89
90 let test_tables_2 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await);
92 register_sstable_infos_to_compaction_group(
93 hummock_manager,
94 &test_tables_2,
95 compaction_group_id,
96 )
97 .await;
98 let mut compact_task = hummock_manager
99 .get_compact_task(compaction_group_id, &mut default_compaction_selector())
100 .await
101 .unwrap()
102 .unwrap();
103 assert_eq!(
104 compact_task
105 .input_ssts
106 .iter()
107 .map(|i| i.table_infos.len())
108 .sum::<usize>(),
109 3
110 );
111
112 compact_task.target_level = 6;
113 hummock_manager
114 .report_compact_task_for_test(
115 compact_task.task_id,
116 Some(compact_task),
117 TaskStatus::Success,
118 test_tables_2.clone(),
119 None,
120 )
121 .await
122 .unwrap();
123 epoch.inc_epoch();
125 let test_tables_3 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await);
126 register_sstable_infos_to_compaction_group(
127 hummock_manager,
128 &test_tables_3,
129 compaction_group_id,
130 )
131 .await;
132 let test_local_tables_3 = to_local_sstable_info(&test_tables_3);
133 hummock_meta_client
134 .commit_epoch(
135 epoch,
136 SyncResult {
137 uncommitted_ssts: test_local_tables_3,
138 ..Default::default()
139 },
140 )
141 .await
142 .unwrap();
143 vec![test_tables, test_tables_2, test_tables_3]
144}
145
146pub fn generate_test_sstables_with_table_id(
147 epoch: u64,
148 table_id: u32,
149 sst_ids: Vec<u64>,
150) -> Vec<SstableInfo> {
151 let mut sst_info = vec![];
152 for (i, sst_id) in sst_ids.into_iter().enumerate() {
153 let object_size = 2;
154 sst_info.push(
155 SstableInfoInner {
156 object_id: sst_id.into(),
157 sst_id: sst_id.into(),
158 key_range: KeyRange {
159 left: Bytes::from(key_with_epoch(
160 format!("{:03}\0\0_key_test_{:05}", table_id, i + 1)
161 .as_bytes()
162 .to_vec(),
163 epoch,
164 )),
165 right: Bytes::from(key_with_epoch(
166 format!("{:03}\0\0_key_test_{:05}", table_id, (i + 1) * 10)
167 .as_bytes()
168 .to_vec(),
169 epoch,
170 )),
171 right_exclusive: false,
172 },
173 file_size: object_size,
174 table_ids: vec![table_id],
175 uncompressed_file_size: object_size,
176 max_epoch: epoch,
177 sst_size: object_size,
178 ..Default::default()
179 }
180 .into(),
181 );
182 }
183 sst_info
184}
185
186pub fn generate_test_tables(epoch: u64, sst_ids: Vec<u64>) -> Vec<SstableInfo> {
187 let mut sst_info = vec![];
188 for (i, sst_id) in sst_ids.into_iter().enumerate() {
189 let object_size = 2;
190 sst_info.push(
191 SstableInfoInner {
192 object_id: sst_id.into(),
193 sst_id: sst_id.into(),
194 key_range: KeyRange {
195 left: Bytes::from(iterator_test_key_of_epoch(sst_id, i + 1, epoch)),
196 right: Bytes::from(iterator_test_key_of_epoch(sst_id, (i + 1) * 10, epoch)),
197 right_exclusive: false,
198 },
199 file_size: object_size,
200 table_ids: vec![sst_id as u32, sst_id as u32 * 10000],
201 uncompressed_file_size: object_size,
202 max_epoch: epoch,
203 sst_size: object_size,
204 ..Default::default()
205 }
206 .into(),
207 );
208 }
209 sst_info
210}
211
212pub async fn register_sstable_infos_to_compaction_group(
213 compaction_group_manager_ref: &HummockManager,
214 sstable_infos: &[SstableInfo],
215 compaction_group_id: CompactionGroupId,
216) {
217 let table_ids = sstable_infos
218 .iter()
219 .flat_map(|sstable_info| &sstable_info.table_ids)
220 .sorted()
221 .dedup()
222 .cloned()
223 .collect_vec();
224 register_table_ids_to_compaction_group(
225 compaction_group_manager_ref,
226 &table_ids,
227 compaction_group_id,
228 )
229 .await;
230}
231
232pub async fn register_table_ids_to_compaction_group(
233 hummock_manager_ref: &HummockManager,
234 table_ids: &[u32],
235 compaction_group_id: CompactionGroupId,
236) {
237 hummock_manager_ref
238 .register_table_ids_for_test(
239 &table_ids
240 .iter()
241 .map(|table_id| (*table_id, compaction_group_id))
242 .collect_vec(),
243 )
244 .await
245 .unwrap();
246}
247
248pub async fn unregister_table_ids_from_compaction_group(
249 hummock_manager_ref: &HummockManager,
250 table_ids: &[u32],
251) {
252 hummock_manager_ref
253 .unregister_table_ids(table_ids.iter().map(|table_id| TableId::new(*table_id)))
254 .await
255 .unwrap();
256}
257
258pub fn iterator_test_key_of_epoch(table: u64, idx: usize, ts: HummockEpoch) -> Vec<u8> {
260 key_with_epoch(
262 format!("{:03}\0\0_key_test_{:05}", table, idx)
263 .as_bytes()
264 .to_vec(),
265 ts,
266 )
267}
268
269pub fn get_sorted_object_ids(sstables: &[SstableInfo]) -> Vec<HummockSstableObjectId> {
270 sstables
271 .iter()
272 .map(|table| table.object_id)
273 .sorted()
274 .collect_vec()
275}
276
277pub fn get_sorted_committed_object_ids(
278 hummock_version: &HummockVersion,
279 compaction_group_id: CompactionGroupId,
280) -> Vec<HummockSstableObjectId> {
281 let levels = match hummock_version.levels.get(&compaction_group_id) {
282 Some(levels) => levels,
283 None => return vec![],
284 };
285 levels
286 .levels
287 .iter()
288 .chain(levels.l0.sub_levels.iter())
289 .flat_map(|levels| levels.table_infos.iter().map(|info| info.object_id))
290 .sorted()
291 .collect_vec()
292}
293
294pub async fn setup_compute_env_with_config(
295 port: i32,
296 config: CompactionConfig,
297) -> (
298 MetaSrvEnv,
299 HummockManagerRef,
300 ClusterControllerRef,
301 WorkerId,
302) {
303 setup_compute_env_with_metric(port, config, None).await
304}
305
306pub async fn setup_compute_env_with_metric(
307 port: i32,
308 config: CompactionConfig,
309 meta_metric: Option<MetaMetrics>,
310) -> (
311 MetaSrvEnv,
312 HummockManagerRef,
313 ClusterControllerRef,
314 WorkerId,
315) {
316 let env = MetaSrvEnv::for_test().await;
317 let cluster_ctl = Arc::new(
318 ClusterController::new(env.clone(), Duration::from_secs(1))
319 .await
320 .unwrap(),
321 );
322 let catalog_ctl = Arc::new(CatalogController::new(env.clone()).await.unwrap());
323
324 let compactor_manager = Arc::new(CompactorManager::for_test());
325
326 let (compactor_streams_change_tx, _compactor_streams_change_rx) =
327 tokio::sync::mpsc::unbounded_channel();
328
329 let hummock_manager = HummockManager::with_config(
330 env.clone(),
331 cluster_ctl.clone(),
332 catalog_ctl,
333 Arc::new(meta_metric.unwrap_or_default()),
334 compactor_manager,
335 config,
336 compactor_streams_change_tx,
337 )
338 .await;
339
340 let fake_host_address = HostAddress {
341 host: "127.0.0.1".to_owned(),
342 port,
343 };
344 let fake_parallelism = 4;
345 let worker_id = cluster_ctl
346 .add_worker(
347 WorkerType::ComputeNode,
348 fake_host_address,
349 Property {
350 is_streaming: true,
351 is_serving: true,
352 is_unschedulable: false,
353 parallelism: fake_parallelism as _,
354 ..Default::default()
355 },
356 Default::default(),
357 )
358 .await
359 .unwrap();
360 (env, hummock_manager, cluster_ctl, worker_id)
361}
362
363pub async fn setup_compute_env(
364 port: i32,
365) -> (
366 MetaSrvEnv,
367 HummockManagerRef,
368 ClusterControllerRef,
369 WorkerId,
370) {
371 let config = CompactionConfigBuilder::new()
372 .level0_tier_compact_file_number(1)
373 .level0_max_compact_file_number(130)
374 .level0_sub_level_compact_level_count(1)
375 .level0_overlapping_sub_level_compact_level_count(1)
376 .build();
377 setup_compute_env_with_config(port, config).await
378}
379
380pub async fn get_sst_ids(hummock_manager: &HummockManager, number: u32) -> Vec<u64> {
381 let range = hummock_manager.get_new_object_ids(number).await.unwrap();
382 (range.start_id.inner()..range.end_id.inner()).collect_vec()
383}
384
385pub async fn add_ssts(
386 epoch: HummockEpoch,
387 hummock_manager: &HummockManager,
388 hummock_meta_client: Arc<dyn HummockMetaClient>,
389) -> Vec<SstableInfo> {
390 let table_ids = get_sst_ids(hummock_manager, 3).await;
391 let test_tables = generate_test_sstables_with_table_id(test_epoch(epoch), 1, table_ids);
392 let ssts = to_local_sstable_info(&test_tables);
393 hummock_meta_client
394 .commit_epoch(
395 epoch,
396 SyncResult {
397 uncommitted_ssts: ssts,
398 ..Default::default()
399 },
400 )
401 .await
402 .unwrap();
403 test_tables
404}
405
406pub fn compaction_selector_context<'a>(
407 group: &'a CompactionGroup,
408 levels: &'a Levels,
409 member_table_ids: &'a BTreeSet<TableId>,
410 level_handlers: &'a mut [LevelHandler],
411 selector_stats: &'a mut LocalSelectorStatistic,
412 table_id_to_options: &'a HashMap<u32, TableOption>,
413 developer_config: Arc<CompactionDeveloperConfig>,
414 table_watermarks: &'a HashMap<TableId, Arc<TableWatermarks>>,
415 state_table_info: &'a HummockVersionStateTableInfo,
416) -> CompactionSelectorContext<'a> {
417 CompactionSelectorContext {
418 group,
419 levels,
420 member_table_ids,
421 level_handlers,
422 selector_stats,
423 table_id_to_options,
424 developer_config,
425 table_watermarks,
426 state_table_info,
427 }
428}
429
430pub async fn get_compaction_group_id_by_table_id(
431 hummock_manager_ref: HummockManagerRef,
432 table_id: u32,
433) -> u64 {
434 let version = hummock_manager_ref.get_current_version().await;
435 let mapping = version.state_table_info.build_table_compaction_group_id();
436 *mapping.get(&(table_id.into())).unwrap()
437}