1#![cfg(any(test, feature = "test"))]
16
17use std::collections::{BTreeSet, HashMap};
18use std::sync::Arc;
19use std::time::Duration;
20
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::catalog::{TableId, TableOption};
24use risingwave_common::util::epoch::test_epoch;
25use risingwave_hummock_sdk::key::key_with_epoch;
26use risingwave_hummock_sdk::key_range::KeyRange;
27use risingwave_hummock_sdk::level::Levels;
28use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
29use risingwave_hummock_sdk::table_watermark::TableWatermarks;
30use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo};
31use risingwave_hummock_sdk::{
32 CompactionGroupId, HummockEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult,
33};
34use risingwave_meta_model::WorkerId;
35use risingwave_pb::common::worker_node::Property;
36use risingwave_pb::common::{HostAddress, WorkerType};
37use risingwave_pb::hummock::CompactionConfig;
38use risingwave_pb::hummock::compact_task::TaskStatus;
39use risingwave_rpc_client::HummockMetaClient;
40
41use crate::controller::catalog::CatalogController;
42use crate::controller::cluster::{ClusterController, ClusterControllerRef};
43use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
44use crate::hummock::compaction::selector::{LocalSelectorStatistic, default_compaction_selector};
45use crate::hummock::compaction::{CompactionDeveloperConfig, CompactionSelectorContext};
46use crate::hummock::level_handler::LevelHandler;
47pub use crate::hummock::manager::CommitEpochInfo;
48use crate::hummock::model::CompactionGroup;
49use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef};
50use crate::manager::MetaSrvEnv;
51use crate::rpc::metrics::MetaMetrics;
52
53pub fn to_local_sstable_info(ssts: &[SstableInfo]) -> Vec<LocalSstableInfo> {
54 ssts.iter()
55 .map(|sst| LocalSstableInfo::for_test(sst.clone()))
56 .collect_vec()
57}
58
59pub async fn add_test_tables(
65 hummock_manager: &HummockManager,
66 hummock_meta_client: Arc<dyn HummockMetaClient>,
67 compaction_group_id: CompactionGroupId,
68) -> Vec<Vec<SstableInfo>> {
69 use risingwave_common::util::epoch::EpochExt;
72
73 let mut epoch = test_epoch(1);
74 let sstable_ids = get_sst_ids(hummock_manager, 3).await;
75 let test_tables = generate_test_sstables_with_table_id(epoch, 1, sstable_ids);
76 register_sstable_infos_to_compaction_group(hummock_manager, &test_tables, compaction_group_id)
77 .await;
78 let test_local_tables = to_local_sstable_info(&test_tables);
79 hummock_meta_client
80 .commit_epoch(
81 epoch,
82 SyncResult {
83 uncommitted_ssts: test_local_tables,
84 ..Default::default()
85 },
86 )
87 .await
88 .unwrap();
89
90 let test_tables_2 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await);
92 register_sstable_infos_to_compaction_group(
93 hummock_manager,
94 &test_tables_2,
95 compaction_group_id,
96 )
97 .await;
98 let mut compact_task = hummock_manager
99 .get_compact_task(compaction_group_id, &mut default_compaction_selector())
100 .await
101 .unwrap()
102 .unwrap();
103 assert_eq!(
104 compact_task
105 .input_ssts
106 .iter()
107 .map(|i| i.table_infos.len())
108 .sum::<usize>(),
109 3
110 );
111
112 compact_task.target_level = 6;
113 hummock_manager
114 .report_compact_task_for_test(
115 compact_task.task_id,
116 Some(compact_task),
117 TaskStatus::Success,
118 test_tables_2.clone(),
119 None,
120 )
121 .await
122 .unwrap();
123 epoch.inc_epoch();
125 let test_tables_3 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await);
126 register_sstable_infos_to_compaction_group(
127 hummock_manager,
128 &test_tables_3,
129 compaction_group_id,
130 )
131 .await;
132 let test_local_tables_3 = to_local_sstable_info(&test_tables_3);
133 hummock_meta_client
134 .commit_epoch(
135 epoch,
136 SyncResult {
137 uncommitted_ssts: test_local_tables_3,
138 ..Default::default()
139 },
140 )
141 .await
142 .unwrap();
143 vec![test_tables, test_tables_2, test_tables_3]
144}
145
146pub fn generate_test_sstables_with_table_id(
147 epoch: u64,
148 table_id: impl Into<TableId>,
149 sst_ids: Vec<u64>,
150) -> Vec<SstableInfo> {
151 let table_id = table_id.into();
152 let mut sst_info = vec![];
153 for (i, sst_id) in sst_ids.into_iter().enumerate() {
154 let object_size = 2;
155 sst_info.push(
156 SstableInfoInner {
157 object_id: sst_id.into(),
158 sst_id: sst_id.into(),
159 key_range: KeyRange {
160 left: Bytes::from(key_with_epoch(
161 format!("{:03}\0\0_key_test_{:05}", table_id, i + 1)
162 .as_bytes()
163 .to_vec(),
164 epoch,
165 )),
166 right: Bytes::from(key_with_epoch(
167 format!("{:03}\0\0_key_test_{:05}", table_id, (i + 1) * 10)
168 .as_bytes()
169 .to_vec(),
170 epoch,
171 )),
172 right_exclusive: false,
173 },
174 file_size: object_size,
175 table_ids: vec![table_id],
176 uncompressed_file_size: object_size,
177 max_epoch: epoch,
178 sst_size: object_size,
179 ..Default::default()
180 }
181 .into(),
182 );
183 }
184 sst_info
185}
186
187pub fn generate_test_tables(epoch: u64, sst_ids: Vec<u64>) -> Vec<SstableInfo> {
188 let mut sst_info = vec![];
189 for (i, sst_id) in sst_ids.into_iter().enumerate() {
190 let object_size = 2;
191 sst_info.push(
192 SstableInfoInner {
193 object_id: sst_id.into(),
194 sst_id: sst_id.into(),
195 key_range: KeyRange {
196 left: Bytes::from(iterator_test_key_of_epoch(sst_id, i + 1, epoch)),
197 right: Bytes::from(iterator_test_key_of_epoch(sst_id, (i + 1) * 10, epoch)),
198 right_exclusive: false,
199 },
200 file_size: object_size,
201 table_ids: vec![(sst_id as u32).into(), (sst_id as u32 * 10000).into()],
202 uncompressed_file_size: object_size,
203 max_epoch: epoch,
204 sst_size: object_size,
205 ..Default::default()
206 }
207 .into(),
208 );
209 }
210 sst_info
211}
212
213pub async fn register_sstable_infos_to_compaction_group(
214 compaction_group_manager_ref: &HummockManager,
215 sstable_infos: &[SstableInfo],
216 compaction_group_id: CompactionGroupId,
217) {
218 let table_ids = sstable_infos
219 .iter()
220 .flat_map(|sstable_info| &sstable_info.table_ids)
221 .sorted()
222 .dedup()
223 .copied()
224 .collect_vec();
225 register_table_ids_to_compaction_group(
226 compaction_group_manager_ref,
227 &table_ids,
228 compaction_group_id,
229 )
230 .await;
231}
232
233pub async fn register_table_ids_to_compaction_group(
234 hummock_manager_ref: &HummockManager,
235 table_ids: &[impl Into<TableId> + Copy],
236 compaction_group_id: CompactionGroupId,
237) {
238 hummock_manager_ref
239 .register_table_ids_for_test(
240 &table_ids
241 .iter()
242 .map(|table_id| (*table_id, compaction_group_id))
243 .collect_vec(),
244 )
245 .await
246 .unwrap();
247}
248
249pub async fn unregister_table_ids_from_compaction_group(
250 hummock_manager_ref: &HummockManager,
251 table_ids: &[impl Into<TableId> + Copy],
252) {
253 hummock_manager_ref
254 .unregister_table_ids(table_ids.iter().map(|table_id| (*table_id).into()))
255 .await
256 .unwrap();
257}
258
259pub fn iterator_test_key_of_epoch(table: u64, idx: usize, ts: HummockEpoch) -> Vec<u8> {
261 key_with_epoch(
263 format!("{:03}\0\0_key_test_{:05}", table, idx)
264 .as_bytes()
265 .to_vec(),
266 ts,
267 )
268}
269
270pub fn get_sorted_object_ids(sstables: &[SstableInfo]) -> Vec<HummockSstableObjectId> {
271 sstables
272 .iter()
273 .map(|table| table.object_id)
274 .sorted()
275 .collect_vec()
276}
277
278pub fn get_sorted_committed_object_ids(
279 hummock_version: &HummockVersion,
280 compaction_group_id: CompactionGroupId,
281) -> Vec<HummockSstableObjectId> {
282 let levels = match hummock_version.levels.get(&compaction_group_id) {
283 Some(levels) => levels,
284 None => return vec![],
285 };
286 levels
287 .levels
288 .iter()
289 .chain(levels.l0.sub_levels.iter())
290 .flat_map(|levels| levels.table_infos.iter().map(|info| info.object_id))
291 .sorted()
292 .collect_vec()
293}
294
295pub async fn setup_compute_env_with_config(
296 port: i32,
297 config: CompactionConfig,
298) -> (
299 MetaSrvEnv,
300 HummockManagerRef,
301 ClusterControllerRef,
302 WorkerId,
303) {
304 setup_compute_env_with_metric(port, config, None).await
305}
306
307pub async fn setup_compute_env_with_metric(
308 port: i32,
309 config: CompactionConfig,
310 meta_metric: Option<MetaMetrics>,
311) -> (
312 MetaSrvEnv,
313 HummockManagerRef,
314 ClusterControllerRef,
315 WorkerId,
316) {
317 let env = MetaSrvEnv::for_test().await;
318 let cluster_ctl = Arc::new(
319 ClusterController::new(env.clone(), Duration::from_secs(1))
320 .await
321 .unwrap(),
322 );
323 let catalog_ctl = Arc::new(CatalogController::new(env.clone()).await.unwrap());
324
325 let compactor_manager = Arc::new(CompactorManager::for_test());
326
327 let (compactor_streams_change_tx, _compactor_streams_change_rx) =
328 tokio::sync::mpsc::unbounded_channel();
329
330 let hummock_manager = HummockManager::with_config(
331 env.clone(),
332 cluster_ctl.clone(),
333 catalog_ctl,
334 Arc::new(meta_metric.unwrap_or_default()),
335 compactor_manager,
336 config,
337 compactor_streams_change_tx,
338 )
339 .await;
340
341 let fake_host_address = HostAddress {
342 host: "127.0.0.1".to_owned(),
343 port,
344 };
345 let fake_parallelism = 4;
346 let worker_id = cluster_ctl
347 .add_worker(
348 WorkerType::ComputeNode,
349 fake_host_address,
350 Property {
351 is_streaming: true,
352 is_serving: true,
353 is_unschedulable: false,
354 parallelism: fake_parallelism as _,
355 ..Default::default()
356 },
357 Default::default(),
358 )
359 .await
360 .unwrap();
361 (env, hummock_manager, cluster_ctl, worker_id)
362}
363
364pub async fn setup_compute_env(
365 port: i32,
366) -> (
367 MetaSrvEnv,
368 HummockManagerRef,
369 ClusterControllerRef,
370 WorkerId,
371) {
372 let config = CompactionConfigBuilder::new()
373 .level0_tier_compact_file_number(1)
374 .level0_max_compact_file_number(130)
375 .level0_sub_level_compact_level_count(1)
376 .level0_overlapping_sub_level_compact_level_count(1)
377 .build();
378 setup_compute_env_with_config(port, config).await
379}
380
381pub async fn get_sst_ids(hummock_manager: &HummockManager, number: u32) -> Vec<u64> {
382 let range = hummock_manager.get_new_object_ids(number).await.unwrap();
383 (range.start_id.inner()..range.end_id.inner()).collect_vec()
384}
385
386pub async fn add_ssts(
387 epoch: HummockEpoch,
388 hummock_manager: &HummockManager,
389 hummock_meta_client: Arc<dyn HummockMetaClient>,
390) -> Vec<SstableInfo> {
391 let table_ids = get_sst_ids(hummock_manager, 3).await;
392 let test_tables = generate_test_sstables_with_table_id(test_epoch(epoch), 1, table_ids);
393 let ssts = to_local_sstable_info(&test_tables);
394 hummock_meta_client
395 .commit_epoch(
396 epoch,
397 SyncResult {
398 uncommitted_ssts: ssts,
399 ..Default::default()
400 },
401 )
402 .await
403 .unwrap();
404 test_tables
405}
406
407pub fn compaction_selector_context<'a>(
408 group: &'a CompactionGroup,
409 levels: &'a Levels,
410 member_table_ids: &'a BTreeSet<TableId>,
411 level_handlers: &'a mut [LevelHandler],
412 selector_stats: &'a mut LocalSelectorStatistic,
413 table_id_to_options: &'a HashMap<TableId, TableOption>,
414 developer_config: Arc<CompactionDeveloperConfig>,
415 table_watermarks: &'a HashMap<TableId, Arc<TableWatermarks>>,
416 state_table_info: &'a HummockVersionStateTableInfo,
417) -> CompactionSelectorContext<'a> {
418 CompactionSelectorContext {
419 group,
420 levels,
421 member_table_ids,
422 level_handlers,
423 selector_stats,
424 table_id_to_options,
425 developer_config,
426 table_watermarks,
427 state_table_info,
428 }
429}
430
431pub async fn get_compaction_group_id_by_table_id(
432 hummock_manager_ref: HummockManagerRef,
433 table_id: impl Into<TableId>,
434) -> u64 {
435 let version = hummock_manager_ref.get_current_version().await;
436 let mapping = version.state_table_info.build_table_compaction_group_id();
437 *mapping.get(&table_id.into()).unwrap()
438}