1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::Bound;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering::Relaxed;
19use std::sync::{Arc, LazyLock};
20
21use await_tree::InstrumentAwait;
22use bytes::Bytes;
23use foyer::Hint;
24use futures::future::try_join;
25use futures::{FutureExt, StreamExt, stream};
26use itertools::Itertools;
27use risingwave_common::catalog::TableId;
28use risingwave_hummock_sdk::key::{EPOCH_LEN, FullKey};
29use risingwave_hummock_sdk::key_range::KeyRange;
30use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, LocalSstableInfo};
31use risingwave_pb::hummock::{PbSstableFilterLayout, PbSstableFilterType};
32use thiserror_ext::AsReport;
33use tracing::error;
34
35use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
36use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
37use crate::hummock::compactor::compaction_utils::{
38 blocked_xor_filter_key_count_threshold, estimate_output_key_count_by_size,
39};
40use crate::hummock::compactor::context::{CompactorContext, await_tree_key};
41use crate::hummock::compactor::{CompactOutput, Compactor, check_flush_result};
42use crate::hummock::event_handler::uploader::UploadTaskOutput;
43use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
44use crate::hummock::{
45 CachePolicy, GetObjectId, HummockError, HummockResult, ObjectIdManagerRef,
46 SstableBuilderOptions,
47};
48use crate::mem_table::ImmutableMemtable;
49use crate::opts::StorageOpts;
50
/// Value used for `gc_delete_keys` in the `TaskConfig` that
/// `SharedBufferCompactRunner::new` builds for shared buffer flush tasks.
const GC_DELETE_KEYS_FOR_FLUSH: bool = false;
52
53pub async fn compact(
55 context: CompactorContext,
56 object_id_manager: ObjectIdManagerRef,
57 payload: Vec<ImmutableMemtable>,
58 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
59) -> HummockResult<UploadTaskOutput> {
60 let table_ids_with_old_value: HashSet<TableId> = payload
61 .iter()
62 .filter(|imm| imm.has_old_value())
63 .map(|imm| imm.table_id)
64 .collect();
65 let mut non_log_store_new_value_payload = Vec::with_capacity(payload.len());
66 let mut log_store_new_value_payload = Vec::with_capacity(payload.len());
67 let mut old_value_payload = Vec::with_capacity(payload.len());
68 for imm in payload {
69 if table_ids_with_old_value.contains(&imm.table_id) {
70 if imm.has_old_value() {
71 old_value_payload.push(imm.clone());
72 }
73 log_store_new_value_payload.push(imm);
74 } else {
75 assert!(!imm.has_old_value());
76 non_log_store_new_value_payload.push(imm);
77 }
78 }
79 let non_log_store_new_value_future = async {
80 if non_log_store_new_value_payload.is_empty() {
81 Ok(vec![])
82 } else {
83 compact_shared_buffer::<true>(
84 context.clone(),
85 object_id_manager.clone(),
86 compaction_catalog_manager_ref.clone(),
87 non_log_store_new_value_payload,
88 )
89 .instrument_await("shared_buffer_compact_non_log_store_new_value")
90 .await
91 }
92 };
93
94 let log_store_new_value_future = async {
95 if log_store_new_value_payload.is_empty() {
96 Ok(vec![])
97 } else {
98 compact_shared_buffer::<true>(
99 context.clone(),
100 object_id_manager.clone(),
101 compaction_catalog_manager_ref.clone(),
102 log_store_new_value_payload,
103 )
104 .instrument_await("shared_buffer_compact_log_store_new_value")
105 .await
106 }
107 };
108
109 let old_value_future = async {
110 if old_value_payload.is_empty() {
111 Ok(vec![])
112 } else {
113 compact_shared_buffer::<false>(
114 context.clone(),
115 object_id_manager.clone(),
116 compaction_catalog_manager_ref.clone(),
117 old_value_payload,
118 )
119 .instrument_await("shared_buffer_compact_log_store_old_value")
120 .await
121 }
122 };
123
124 let ((non_log_store_new_value_ssts, log_store_new_value_ssts), old_value_ssts) = try_join(
126 try_join(non_log_store_new_value_future, log_store_new_value_future),
127 old_value_future,
128 )
129 .await?;
130
131 let mut new_value_ssts = non_log_store_new_value_ssts;
132 new_value_ssts.extend(log_store_new_value_ssts);
133
134 Ok(UploadTaskOutput {
135 new_value_ssts,
136 old_value_ssts,
137 wait_poll_timer: None,
138 })
139}
140
141async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
146 context: CompactorContext,
147 object_id_manager: ObjectIdManagerRef,
148 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
149 mut payload: Vec<ImmutableMemtable>,
150) -> HummockResult<Vec<LocalSstableInfo>> {
151 if !IS_NEW_VALUE {
152 assert!(payload.iter().all(|imm| imm.has_old_value()));
153 }
154 let existing_table_ids: HashSet<TableId> =
156 payload.iter().map(|imm| imm.table_id).dedup().collect();
157 assert!(!existing_table_ids.is_empty());
158
159 let compaction_catalog_agent_ref = compaction_catalog_manager_ref
160 .acquire(existing_table_ids.iter().copied().collect())
161 .await?;
162 let existing_table_ids = compaction_catalog_agent_ref
163 .table_ids()
164 .collect::<HashSet<_>>();
165 payload.retain(|imm| {
166 let ret = existing_table_ids.contains(&imm.table_id);
167 if !ret {
168 error!(
169 "can not find table {:?}, it may be removed by meta-service",
170 imm.table_id
171 );
172 }
173 ret
174 });
175
176 let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
177 let (splits, sub_compaction_sstable_size, table_vnode_partition, compact_data_size) =
178 generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
179 let parallelism = splits.len();
180 let mut compact_success = true;
181 let mut output_ssts = Vec::with_capacity(parallelism);
182 let mut compaction_futures = vec![];
183 let estimated_output_key_count = estimate_output_key_count_by_size(
186 total_key_count as u64,
187 compact_data_size,
188 sub_compaction_sstable_size as usize,
189 );
190 let sstable_filter_layout =
191 if risingwave_hummock_sdk::filter_utils::should_use_blocked_xor_filter_by_kv_count(
192 estimated_output_key_count as u64,
193 None,
194 ) {
195 PbSstableFilterLayout::Blocked
196 } else {
197 PbSstableFilterLayout::Plain
198 };
199
200 for (split_index, key_range) in splits.into_iter().enumerate() {
201 let compactor = SharedBufferCompactRunner::new(
202 split_index,
203 key_range,
204 context.clone(),
205 sub_compaction_sstable_size as usize,
206 estimated_output_key_count,
207 table_vnode_partition.clone(),
208 sstable_filter_layout,
209 object_id_manager.clone(),
210 );
211 let mut forward_iters = Vec::with_capacity(payload.len());
212 for imm in &payload {
213 forward_iters.push(imm.clone().into_directed_iter::<Forward, IS_NEW_VALUE>());
214 }
215 let compaction_executor = context.compaction_executor.clone();
216 let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
217 let handle = compaction_executor.spawn({
218 static NEXT_SHARED_BUFFER_COMPACT_ID: LazyLock<AtomicUsize> =
219 LazyLock::new(|| AtomicUsize::new(0));
220 let tree_root = context.await_tree_reg.as_ref().map(|reg| {
221 let id = NEXT_SHARED_BUFFER_COMPACT_ID.fetch_add(1, Relaxed);
222 reg.register(
223 await_tree_key::CompactSharedBuffer { id },
224 format!(
225 "Compact Shared Buffer: {:?}",
226 payload
227 .iter()
228 .map(|imm| imm.epoch())
229 .collect::<BTreeSet<_>>()
230 ),
231 )
232 });
233 let future = compactor.run(
234 MergeIterator::new(forward_iters),
235 compaction_catalog_agent_ref,
236 );
237 if let Some(root) = tree_root {
238 root.instrument(future).left_future()
239 } else {
240 future.right_future()
241 }
242 });
243 compaction_futures.push(handle);
244 }
245
246 let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
247 let mut err = None;
248 while let Some(future_result) = buffered.next().await {
249 match future_result {
250 Ok(Ok((split_index, ssts, table_stats_map))) => {
251 output_ssts.push((split_index, ssts, table_stats_map));
252 }
253 Ok(Err(e)) => {
254 compact_success = false;
255 tracing::warn!(error = %e.as_report(), "Shared Buffer Compaction failed with error");
256 err = Some(e);
257 }
258 Err(e) => {
259 compact_success = false;
260 tracing::warn!(
261 error = %e.as_report(),
262 "Shared Buffer Compaction failed with future error",
263 );
264 err = Some(HummockError::compaction_executor(
265 "failed while execute in tokio",
266 ));
267 }
268 }
269 }
270
271 output_ssts.sort_by_key(|(split_index, ..)| *split_index);
273
274 if compact_success {
275 let mut level0 = Vec::with_capacity(parallelism);
276 let mut sst_infos = vec![];
277 for (_, ssts, _) in output_ssts {
278 for sst_info in &ssts {
279 context
280 .compactor_metrics
281 .write_build_l0_bytes
282 .inc_by(sst_info.file_size());
283
284 sst_infos.push(sst_info.sst_info.clone());
285 }
286 level0.extend(ssts);
287 }
288 if context.storage_opts.check_compaction_result {
289 let compaction_executor = context.compaction_executor.clone();
290 let mut forward_iters = Vec::with_capacity(payload.len());
291 for imm in &payload {
292 if !existing_table_ids.contains(&imm.table_id) {
293 continue;
294 }
295 forward_iters.push(imm.clone().into_forward_iter());
296 }
297 let iter = MergeIterator::new(forward_iters);
298 let left_iter = UserIterator::new(
299 iter,
300 (Bound::Unbounded, Bound::Unbounded),
301 u64::MAX,
302 0,
303 None,
304 );
305 compaction_executor.spawn(async move {
306 match check_flush_result(
307 left_iter,
308 sst_infos,
309 context,
310 )
311 .await
312 {
313 Err(e) => {
314 tracing::warn!(error = %e.as_report(), "Failed check flush result of memtable");
315 }
316 Ok(true) => (),
317 Ok(false) => {
318 panic!(
319 "failed to check flush result consistency of state-table {:?}",
320 existing_table_ids
321 );
322 }
323 }
324 });
325 }
326 Ok(level0)
327 } else {
328 Err(err.unwrap())
329 }
330}
331
332fn generate_splits(
334 payload: &Vec<ImmutableMemtable>,
335 existing_table_ids: &HashSet<TableId>,
336 storage_opts: &StorageOpts,
337) -> (Vec<KeyRange>, u64, BTreeMap<TableId, u32>, u64) {
338 let mut size_and_start_user_keys = vec![];
339 let mut compact_data_size = 0;
340 let mut table_size_infos: HashMap<TableId, u64> = HashMap::default();
341 let mut table_vnode_partition = BTreeMap::default();
342 for imm in payload {
343 let data_size = {
344 (imm.value_count() * EPOCH_LEN + imm.size()) as u64
346 };
347 compact_data_size += data_size;
348 size_and_start_user_keys.push((
349 data_size,
350 FullKey {
351 user_key: imm.start_user_key(),
352 epoch_with_gap: EpochWithGap::new_max_epoch(),
353 }
354 .encode(),
355 ));
356 let v = table_size_infos.entry(imm.table_id).or_insert(0);
357 *v += data_size;
358 }
359
360 size_and_start_user_keys
361 .sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
362 let mut splits = Vec::with_capacity(size_and_start_user_keys.len());
363 splits.push(KeyRange::new(Bytes::new(), Bytes::new()));
364 let sstable_size = (storage_opts.sstable_size_mb as u64) << 20;
365 let min_sstable_size = (storage_opts.min_sstable_size_mb as u64) << 20;
366 let parallel_compact_size = (storage_opts.parallel_compact_size_mb as u64) << 20;
367 let parallelism = std::cmp::min(
368 storage_opts.share_buffers_sync_parallelism as u64,
369 size_and_start_user_keys.len() as u64,
370 );
371 let sub_compaction_data_size = if compact_data_size > parallel_compact_size && parallelism > 1 {
372 compact_data_size / parallelism
373 } else {
374 compact_data_size
375 };
376
377 if parallelism > 1 && compact_data_size > sstable_size {
378 let mut last_buffer_size = 0;
379 let mut last_key: Vec<u8> = vec![];
380 for (data_size, key) in size_and_start_user_keys {
381 if last_buffer_size >= sub_compaction_data_size && !last_key.eq(&key) {
382 splits.last_mut().unwrap().right = Bytes::from(key.clone());
383 splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
384 last_buffer_size = data_size;
385 } else {
386 last_buffer_size += data_size;
387 }
388
389 last_key = key;
390 }
391 }
392
393 if compact_data_size > sstable_size {
394 for table_id in existing_table_ids {
397 if let Some(table_size) = table_size_infos.get(table_id)
398 && *table_size > min_sstable_size
399 {
400 table_vnode_partition.insert(*table_id, 1);
401 }
402 }
403 }
404
405 let sub_compaction_sstable_size = std::cmp::min(sstable_size, sub_compaction_data_size * 6 / 5);
408 (
409 splits,
410 sub_compaction_sstable_size,
411 table_vnode_partition,
412 compact_data_size,
413 )
414}
415
/// Runs the compaction of a single key-range split of a shared buffer flush.
pub struct SharedBufferCompactRunner {
    /// Compactor configured by `new` for this split's key range.
    compactor: Compactor,
    /// Index of the split; returned by `run` as the first element of its output.
    split_index: usize,
}
420
421impl SharedBufferCompactRunner {
422 pub fn new(
423 split_index: usize,
424 key_range: KeyRange,
425 context: CompactorContext,
426 sub_compaction_sstable_size: usize,
427 estimated_output_key_count: usize,
428 table_vnode_partition: BTreeMap<TableId, u32>,
429 sstable_filter_layout: PbSstableFilterLayout,
430 object_id_getter: Arc<dyn GetObjectId>,
431 ) -> Self {
432 let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
433 options.capacity = sub_compaction_sstable_size;
434 options.estimated_output_key_count = Some(estimated_output_key_count);
435 options.filter_hash_prealloc_key_count_cap = blocked_xor_filter_key_count_threshold(None);
436 let compactor = Compactor::new(
437 context,
438 options,
439 super::TaskConfig {
440 key_range,
441 cache_policy: CachePolicy::Fill(Hint::Normal),
442 gc_delete_keys: GC_DELETE_KEYS_FOR_FLUSH,
443 retain_multiple_version: true,
444 table_vnode_partition,
445 sstable_filter_layout,
446 sstable_filter_type: PbSstableFilterType::SstableFilterXor16,
448 table_schemas: Default::default(),
449 disable_drop_column_optimization: false,
450 },
451 object_id_getter,
452 );
453 Self {
454 compactor,
455 split_index,
456 }
457 }
458
459 pub async fn run(
460 self,
461 iter: impl HummockIterator<Direction = Forward>,
462 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
463 ) -> HummockResult<CompactOutput> {
464 let dummy_compaction_filter = DummyCompactionFilter {};
465 let (ssts, table_stats_map) = self
466 .compactor
467 .compact_key_range(
468 iter,
469 dummy_compaction_filter,
470 compaction_catalog_agent_ref,
471 None,
472 None,
473 None,
474 )
475 .await?;
476 Ok((self.split_index, ssts, table_stats_map))
477 }
478}
479
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Bytes;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::{TableKey, prefix_slice_with_vnode};

    use crate::hummock::compactor::shared_buffer_compact::generate_splits;
    use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
    use crate::mem_table::ImmutableMemtable;
    use crate::opts::StorageOpts;

    /// Builds a table key for `key`, prefixed with the virtual node of index 1.
    fn generate_key(key: &str) -> TableKey<Bytes> {
        let vnode = VirtualNode::from_index(1);
        TableKey(prefix_slice_with_vnode(vnode, key.as_bytes()))
    }

    /// Splits built from imms given in unsorted key order must be contiguous
    /// and strictly increasing.
    #[tokio::test]
    async fn test_generate_splits_in_order() {
        // One single-entry imm per row: (epoch, key, value, imm size, table id).
        let payload: Vec<ImmutableMemtable> = [
            (test_epoch(3), "dddd", "v3", 1024 * 1024, 1),
            (test_epoch(3), "abb", "v3", (1024 + 256) * 1024, 1),
            (test_epoch(2), "abc", "v2", (1024 + 512) * 1024, 1),
            (test_epoch(3), "aaa", "v3", (1024 + 512) * 1024, 1),
            (test_epoch(3), "aaa", "v3", (1024 + 256) * 1024, 2),
        ]
        .into_iter()
        .map(|(epoch, key, value, size, table_id)| {
            ImmutableMemtable::build_shared_buffer_batch_for_test(
                epoch,
                0,
                vec![(
                    generate_key(key),
                    SharedBufferValue::Insert(Bytes::from_static(value.as_bytes())),
                )],
                size,
                TableId::new(table_id),
            )
        })
        .collect();

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3,
            parallel_compact_size_mb: 1,
            sstable_size_mb: 1,
            ..Default::default()
        };
        let (splits, _sstable_capacity, vnodes, _) = generate_splits(
            &payload,
            &HashSet::from_iter([1.into(), 2.into()]),
            &storage_opts,
        );

        assert_eq!(
            splits.len(),
            storage_opts.share_buffers_sync_parallelism as usize
        );
        assert!(vnodes.is_empty());

        // Each split starts where the previous one ends and moves forward.
        for pair in splits.windows(2) {
            let (previous, current) = (&pair[0], &pair[1]);
            assert_eq!(current.left, previous.right);
            assert!(current.left > previous.left);
            assert!(current.right.is_empty() || current.left < current.right);
        }
    }

    /// Splits built from imms with distinct start keys must neither overlap
    /// nor be out of order.
    #[tokio::test]
    async fn test_generate_splits_no_duplicate_keys() {
        // Three 2 MiB imms of table 1, deliberately listed in unsorted key order.
        let payload: Vec<ImmutableMemtable> = ["zzz", "aaa", "mmm"]
            .into_iter()
            .map(|key| {
                ImmutableMemtable::build_shared_buffer_batch_for_test(
                    test_epoch(1),
                    0,
                    vec![(
                        generate_key(key),
                        SharedBufferValue::Insert(Bytes::from_static(b"v1")),
                    )],
                    2 * 1024 * 1024,
                    TableId::new(1),
                )
            })
            .collect();

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3,
            parallel_compact_size_mb: 2,
            sstable_size_mb: 1,
            ..Default::default()
        };

        let (splits, _sstable_capacity, _vnodes, _) =
            generate_splits(&payload, &HashSet::from_iter([1.into()]), &storage_opts);

        assert!(
            splits.len() > 1,
            "Expected multiple splits, got {}",
            splits.len()
        );

        // Pairwise overlap check; pairs touching an empty (unbounded) inner
        // bound are skipped.
        // NOTE(review): an empty `right` of the later split compares as the
        // smallest byte string, so the second clause is trivially true for the
        // unbounded last split.
        for (i, split_i) in splits.iter().enumerate() {
            for (j, split_j) in splits.iter().enumerate().skip(i + 1) {
                if split_i.right.is_empty() || split_j.left.is_empty() {
                    continue;
                }
                assert!(
                    split_i.right <= split_j.left || split_j.right <= split_i.left,
                    "Split {} and {} overlap: [{:?}, {:?}) vs [{:?}, {:?})",
                    i,
                    j,
                    split_i.left,
                    split_i.right,
                    split_j.left,
                    split_j.right
                );
            }
        }

        // Adjacent splits must be sorted by their boundaries.
        for (offset, pair) in splits.windows(2).enumerate() {
            let i = offset + 1;
            let (previous, current) = (&pair[0], &pair[1]);
            if previous.right.is_empty() || current.left.is_empty() {
                continue;
            }
            assert!(
                previous.right <= current.left,
                "Splits are not in sorted order at index {}: {:?} > {:?}",
                i,
                previous.right,
                current.left
            );
        }
    }
}