1#![cfg(any(test, feature = "test"))]
16
17use std::collections::{BTreeSet, HashMap};
18use std::sync::Arc;
19use std::time::Duration;
20
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::catalog::{TableId, TableOption};
24use risingwave_common::util::epoch::test_epoch;
25use risingwave_hummock_sdk::key::key_with_epoch;
26use risingwave_hummock_sdk::key_range::KeyRange;
27use risingwave_hummock_sdk::level::Levels;
28use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
29use risingwave_hummock_sdk::table_watermark::TableWatermarks;
30use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo};
31use risingwave_hummock_sdk::{
32 CompactionGroupId, HummockEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult,
33};
34use risingwave_meta_model::WorkerId;
35use risingwave_pb::common::worker_node::Property;
36use risingwave_pb::common::{HostAddress, WorkerType};
37use risingwave_pb::hummock::CompactionConfig;
38use risingwave_pb::hummock::compact_task::TaskStatus;
39use risingwave_rpc_client::HummockMetaClient;
40
41use crate::controller::catalog::CatalogController;
42use crate::controller::cluster::{ClusterController, ClusterControllerRef};
43use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
44use crate::hummock::compaction::selector::{LocalSelectorStatistic, default_compaction_selector};
45use crate::hummock::compaction::{CompactionDeveloperConfig, CompactionSelectorContext};
46use crate::hummock::level_handler::LevelHandler;
47pub use crate::hummock::manager::CommitEpochInfo;
48use crate::hummock::model::CompactionGroup;
49use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef};
50use crate::manager::MetaSrvEnv;
51use crate::rpc::metrics::MetaMetrics;
52
53pub fn to_local_sstable_info(ssts: &[SstableInfo]) -> Vec<LocalSstableInfo> {
54 ssts.iter()
55 .map(|sst| LocalSstableInfo::for_test(sst.clone()))
56 .collect_vec()
57}
58
59pub async fn add_test_tables(
65 hummock_manager: &HummockManager,
66 hummock_meta_client: Arc<dyn HummockMetaClient>,
67 compaction_group_id: CompactionGroupId,
68) -> Vec<Vec<SstableInfo>> {
69 use risingwave_common::util::epoch::EpochExt;
72
73 let mut epoch = test_epoch(1);
74 let sstable_ids = get_sst_ids(hummock_manager, 3).await;
75 let test_tables = generate_test_sstables_with_table_id(epoch, 1, sstable_ids);
76 register_sstable_infos_to_compaction_group(hummock_manager, &test_tables, compaction_group_id)
77 .await;
78 let test_local_tables = to_local_sstable_info(&test_tables);
79 hummock_meta_client
80 .commit_epoch(
81 epoch,
82 SyncResult {
83 uncommitted_ssts: test_local_tables,
84 ..Default::default()
85 },
86 )
87 .await
88 .unwrap();
89
90 let test_tables_2 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await);
92 register_sstable_infos_to_compaction_group(
93 hummock_manager,
94 &test_tables_2,
95 compaction_group_id,
96 )
97 .await;
98 let mut compact_task = hummock_manager
99 .get_compact_task(compaction_group_id, &mut default_compaction_selector())
100 .await
101 .unwrap()
102 .unwrap();
103 assert_eq!(
104 compact_task
105 .input_ssts
106 .iter()
107 .map(|i| i.table_infos.len())
108 .sum::<usize>(),
109 3
110 );
111
112 compact_task.target_level = 6;
113 hummock_manager
114 .report_compact_task_for_test(
115 compact_task.task_id,
116 Some(compact_task),
117 TaskStatus::Success,
118 test_tables_2.clone(),
119 None,
120 )
121 .await
122 .unwrap();
123 epoch.inc_epoch();
125 let test_tables_3 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await);
126 register_sstable_infos_to_compaction_group(
127 hummock_manager,
128 &test_tables_3,
129 compaction_group_id,
130 )
131 .await;
132 let test_local_tables_3 = to_local_sstable_info(&test_tables_3);
133 hummock_meta_client
134 .commit_epoch(
135 epoch,
136 SyncResult {
137 uncommitted_ssts: test_local_tables_3,
138 ..Default::default()
139 },
140 )
141 .await
142 .unwrap();
143 vec![test_tables, test_tables_2, test_tables_3]
144}
145
146pub fn generate_test_sstables_with_table_id(
147 epoch: u64,
148 table_id: u32,
149 sst_ids: Vec<HummockSstableObjectId>,
150) -> Vec<SstableInfo> {
151 let mut sst_info = vec![];
152 for (i, sst_id) in sst_ids.into_iter().enumerate() {
153 let object_size = 2;
154 sst_info.push(
155 SstableInfoInner {
156 object_id: sst_id,
157 sst_id,
158 key_range: KeyRange {
159 left: Bytes::from(key_with_epoch(
160 format!("{:03}\0\0_key_test_{:05}", table_id, i + 1)
161 .as_bytes()
162 .to_vec(),
163 epoch,
164 )),
165 right: Bytes::from(key_with_epoch(
166 format!("{:03}\0\0_key_test_{:05}", table_id, (i + 1) * 10)
167 .as_bytes()
168 .to_vec(),
169 epoch,
170 )),
171 right_exclusive: false,
172 },
173 file_size: object_size,
174 table_ids: vec![table_id],
175 uncompressed_file_size: object_size,
176 max_epoch: epoch,
177 sst_size: object_size,
178 ..Default::default()
179 }
180 .into(),
181 );
182 }
183 sst_info
184}
185
186pub fn generate_test_tables(epoch: u64, sst_ids: Vec<HummockSstableObjectId>) -> Vec<SstableInfo> {
187 let mut sst_info = vec![];
188 for (i, sst_id) in sst_ids.into_iter().enumerate() {
189 let object_size = 2;
190 sst_info.push(
191 SstableInfoInner {
192 object_id: sst_id,
193 sst_id,
194 key_range: KeyRange {
195 left: Bytes::from(iterator_test_key_of_epoch(sst_id, i + 1, epoch)),
196 right: Bytes::from(iterator_test_key_of_epoch(sst_id, (i + 1) * 10, epoch)),
197 right_exclusive: false,
198 },
199 file_size: object_size,
200 table_ids: vec![sst_id as u32, sst_id as u32 * 10000],
201 uncompressed_file_size: object_size,
202 max_epoch: epoch,
203 sst_size: object_size,
204 ..Default::default()
205 }
206 .into(),
207 );
208 }
209 sst_info
210}
211
212pub async fn register_sstable_infos_to_compaction_group(
213 compaction_group_manager_ref: &HummockManager,
214 sstable_infos: &[SstableInfo],
215 compaction_group_id: CompactionGroupId,
216) {
217 let table_ids = sstable_infos
218 .iter()
219 .flat_map(|sstable_info| &sstable_info.table_ids)
220 .sorted()
221 .dedup()
222 .cloned()
223 .collect_vec();
224 register_table_ids_to_compaction_group(
225 compaction_group_manager_ref,
226 &table_ids,
227 compaction_group_id,
228 )
229 .await;
230}
231
232pub async fn register_table_ids_to_compaction_group(
233 hummock_manager_ref: &HummockManager,
234 table_ids: &[u32],
235 compaction_group_id: CompactionGroupId,
236) {
237 hummock_manager_ref
238 .register_table_ids_for_test(
239 &table_ids
240 .iter()
241 .map(|table_id| (*table_id, compaction_group_id))
242 .collect_vec(),
243 )
244 .await
245 .unwrap();
246}
247
248pub async fn unregister_table_ids_from_compaction_group(
249 hummock_manager_ref: &HummockManager,
250 table_ids: &[u32],
251) {
252 hummock_manager_ref
253 .unregister_table_ids(table_ids.iter().map(|table_id| TableId::new(*table_id)))
254 .await
255 .unwrap();
256}
257
258pub fn iterator_test_key_of_epoch(
260 table: HummockSstableObjectId,
261 idx: usize,
262 ts: HummockEpoch,
263) -> Vec<u8> {
264 key_with_epoch(
266 format!("{:03}\0\0_key_test_{:05}", table, idx)
267 .as_bytes()
268 .to_vec(),
269 ts,
270 )
271}
272
273pub fn get_sorted_object_ids(sstables: &[SstableInfo]) -> Vec<HummockSstableObjectId> {
274 sstables
275 .iter()
276 .map(|table| table.object_id)
277 .sorted()
278 .collect_vec()
279}
280
281pub fn get_sorted_committed_object_ids(
282 hummock_version: &HummockVersion,
283 compaction_group_id: CompactionGroupId,
284) -> Vec<HummockSstableObjectId> {
285 let levels = match hummock_version.levels.get(&compaction_group_id) {
286 Some(levels) => levels,
287 None => return vec![],
288 };
289 levels
290 .levels
291 .iter()
292 .chain(levels.l0.sub_levels.iter())
293 .flat_map(|levels| levels.table_infos.iter().map(|info| info.object_id))
294 .sorted()
295 .collect_vec()
296}
297
298pub async fn setup_compute_env_with_config(
299 port: i32,
300 config: CompactionConfig,
301) -> (
302 MetaSrvEnv,
303 HummockManagerRef,
304 ClusterControllerRef,
305 WorkerId,
306) {
307 setup_compute_env_with_metric(port, config, None).await
308}
309
310pub async fn setup_compute_env_with_metric(
311 port: i32,
312 config: CompactionConfig,
313 meta_metric: Option<MetaMetrics>,
314) -> (
315 MetaSrvEnv,
316 HummockManagerRef,
317 ClusterControllerRef,
318 WorkerId,
319) {
320 let env = MetaSrvEnv::for_test().await;
321 let cluster_ctl = Arc::new(
322 ClusterController::new(env.clone(), Duration::from_secs(1))
323 .await
324 .unwrap(),
325 );
326 let catalog_ctl = Arc::new(CatalogController::new(env.clone()).await.unwrap());
327
328 let compactor_manager = Arc::new(CompactorManager::for_test());
329
330 let (compactor_streams_change_tx, _compactor_streams_change_rx) =
331 tokio::sync::mpsc::unbounded_channel();
332
333 let hummock_manager = HummockManager::with_config(
334 env.clone(),
335 cluster_ctl.clone(),
336 catalog_ctl,
337 Arc::new(meta_metric.unwrap_or_default()),
338 compactor_manager,
339 config,
340 compactor_streams_change_tx,
341 )
342 .await;
343
344 let fake_host_address = HostAddress {
345 host: "127.0.0.1".to_owned(),
346 port,
347 };
348 let fake_parallelism = 4;
349 let worker_id = cluster_ctl
350 .add_worker(
351 WorkerType::ComputeNode,
352 fake_host_address,
353 Property {
354 is_streaming: true,
355 is_serving: true,
356 is_unschedulable: false,
357 parallelism: fake_parallelism as _,
358 ..Default::default()
359 },
360 Default::default(),
361 )
362 .await
363 .unwrap();
364 (env, hummock_manager, cluster_ctl, worker_id)
365}
366
367pub async fn setup_compute_env(
368 port: i32,
369) -> (
370 MetaSrvEnv,
371 HummockManagerRef,
372 ClusterControllerRef,
373 WorkerId,
374) {
375 let config = CompactionConfigBuilder::new()
376 .level0_tier_compact_file_number(1)
377 .level0_max_compact_file_number(130)
378 .level0_sub_level_compact_level_count(1)
379 .level0_overlapping_sub_level_compact_level_count(1)
380 .build();
381 setup_compute_env_with_config(port, config).await
382}
383
384pub async fn get_sst_ids(
385 hummock_manager: &HummockManager,
386 number: u32,
387) -> Vec<HummockSstableObjectId> {
388 let range = hummock_manager.get_new_sst_ids(number).await.unwrap();
389 (range.start_id..range.end_id).collect_vec()
390}
391
392pub async fn add_ssts(
393 epoch: HummockEpoch,
394 hummock_manager: &HummockManager,
395 hummock_meta_client: Arc<dyn HummockMetaClient>,
396) -> Vec<SstableInfo> {
397 let table_ids = get_sst_ids(hummock_manager, 3).await;
398 let test_tables = generate_test_sstables_with_table_id(test_epoch(epoch), 1, table_ids);
399 let ssts = to_local_sstable_info(&test_tables);
400 hummock_meta_client
401 .commit_epoch(
402 epoch,
403 SyncResult {
404 uncommitted_ssts: ssts,
405 ..Default::default()
406 },
407 )
408 .await
409 .unwrap();
410 test_tables
411}
412
413pub fn compaction_selector_context<'a>(
414 group: &'a CompactionGroup,
415 levels: &'a Levels,
416 member_table_ids: &'a BTreeSet<TableId>,
417 level_handlers: &'a mut [LevelHandler],
418 selector_stats: &'a mut LocalSelectorStatistic,
419 table_id_to_options: &'a HashMap<u32, TableOption>,
420 developer_config: Arc<CompactionDeveloperConfig>,
421 table_watermarks: &'a HashMap<TableId, Arc<TableWatermarks>>,
422 state_table_info: &'a HummockVersionStateTableInfo,
423) -> CompactionSelectorContext<'a> {
424 CompactionSelectorContext {
425 group,
426 levels,
427 member_table_ids,
428 level_handlers,
429 selector_stats,
430 table_id_to_options,
431 developer_config,
432 table_watermarks,
433 state_table_info,
434 }
435}
436
437pub async fn get_compaction_group_id_by_table_id(
438 hummock_manager_ref: HummockManagerRef,
439 table_id: u32,
440) -> u64 {
441 let version = hummock_manager_ref.get_current_version().await;
442 let mapping = version.state_table_info.build_table_compaction_group_id();
443 *mapping.get(&(table_id.into())).unwrap()
444}