1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::Bound;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering::Relaxed;
19use std::sync::{Arc, LazyLock};
20
21use await_tree::InstrumentAwait;
22use bytes::Bytes;
23use foyer::CacheHint;
24use futures::future::try_join;
25use futures::{FutureExt, StreamExt, stream};
26use itertools::Itertools;
27use risingwave_common::catalog::TableId;
28use risingwave_hummock_sdk::key::{EPOCH_LEN, FullKey, FullKeyTracker, UserKey};
29use risingwave_hummock_sdk::key_range::KeyRange;
30use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo};
31use risingwave_pb::hummock::compact_task;
32use thiserror_ext::AsReport;
33use tracing::{error, warn};
34
35use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
36use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
37use crate::hummock::compactor::context::{CompactorContext, await_tree_key};
38use crate::hummock::compactor::{CompactOutput, Compactor, check_flush_result};
39use crate::hummock::event_handler::uploader::UploadTaskOutput;
40use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
41use crate::hummock::shared_buffer::shared_buffer_batch::{
42 SharedBufferBatch, SharedBufferBatchInner, SharedBufferBatchOldValues, SharedBufferKeyEntry,
43 VersionedSharedBufferValue,
44};
45use crate::hummock::utils::MemoryTracker;
46use crate::hummock::{
47 BlockedXor16FilterBuilder, CachePolicy, GetObjectId, HummockError, HummockResult,
48 SstableBuilderOptions, SstableObjectIdManagerRef,
49};
50use crate::mem_table::ImmutableMemtable;
51use crate::opts::StorageOpts;
52
53const GC_DELETE_KEYS_FOR_FLUSH: bool = false;
54
55pub async fn compact(
57 context: CompactorContext,
58 sstable_object_id_manager: SstableObjectIdManagerRef,
59 payload: Vec<ImmutableMemtable>,
60 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
61) -> HummockResult<UploadTaskOutput> {
62 let table_ids_with_old_value: HashSet<TableId> = payload
63 .iter()
64 .filter(|imm| imm.has_old_value())
65 .map(|imm| imm.table_id)
66 .collect();
67 let mut non_log_store_new_value_payload = Vec::with_capacity(payload.len());
68 let mut log_store_new_value_payload = Vec::with_capacity(payload.len());
69 let mut old_value_payload = Vec::with_capacity(payload.len());
70 for imm in payload {
71 if table_ids_with_old_value.contains(&imm.table_id) {
72 if imm.has_old_value() {
73 old_value_payload.push(imm.clone());
74 }
75 log_store_new_value_payload.push(imm);
76 } else {
77 assert!(!imm.has_old_value());
78 non_log_store_new_value_payload.push(imm);
79 }
80 }
81 let non_log_store_new_value_future = async {
82 if non_log_store_new_value_payload.is_empty() {
83 Ok(vec![])
84 } else {
85 compact_shared_buffer::<true>(
86 context.clone(),
87 sstable_object_id_manager.clone(),
88 compaction_catalog_manager_ref.clone(),
89 non_log_store_new_value_payload,
90 )
91 .instrument_await("shared_buffer_compact_non_log_store_new_value")
92 .await
93 }
94 };
95
96 let log_store_new_value_future = async {
97 if log_store_new_value_payload.is_empty() {
98 Ok(vec![])
99 } else {
100 compact_shared_buffer::<true>(
101 context.clone(),
102 sstable_object_id_manager.clone(),
103 compaction_catalog_manager_ref.clone(),
104 log_store_new_value_payload,
105 )
106 .instrument_await("shared_buffer_compact_log_store_new_value")
107 .await
108 }
109 };
110
111 let old_value_future = async {
112 if old_value_payload.is_empty() {
113 Ok(vec![])
114 } else {
115 compact_shared_buffer::<false>(
116 context.clone(),
117 sstable_object_id_manager.clone(),
118 compaction_catalog_manager_ref.clone(),
119 old_value_payload,
120 )
121 .instrument_await("shared_buffer_compact_log_store_old_value")
122 .await
123 }
124 };
125
126 let ((non_log_store_new_value_ssts, log_store_new_value_ssts), old_value_ssts) = try_join(
128 try_join(non_log_store_new_value_future, log_store_new_value_future),
129 old_value_future,
130 )
131 .await?;
132
133 let mut new_value_ssts = non_log_store_new_value_ssts;
134 new_value_ssts.extend(log_store_new_value_ssts);
135
136 Ok(UploadTaskOutput {
137 new_value_ssts,
138 old_value_ssts,
139 wait_poll_timer: None,
140 })
141}
142
143async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
148 context: CompactorContext,
149 sstable_object_id_manager: SstableObjectIdManagerRef,
150 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
151 mut payload: Vec<ImmutableMemtable>,
152) -> HummockResult<Vec<LocalSstableInfo>> {
153 if !IS_NEW_VALUE {
154 assert!(payload.iter().all(|imm| imm.has_old_value()));
155 }
156 let existing_table_ids: HashSet<u32> = payload
158 .iter()
159 .map(|imm| imm.table_id.table_id)
160 .dedup()
161 .collect();
162 assert!(!existing_table_ids.is_empty());
163
164 let compaction_catalog_agent_ref = compaction_catalog_manager_ref
165 .acquire(existing_table_ids.iter().copied().collect())
166 .await?;
167 let existing_table_ids = compaction_catalog_agent_ref
168 .table_ids()
169 .collect::<HashSet<_>>();
170 payload.retain(|imm| {
171 let ret = existing_table_ids.contains(&imm.table_id.table_id);
172 if !ret {
173 error!(
174 "can not find table {:?}, it may be removed by meta-service",
175 imm.table_id
176 );
177 }
178 ret
179 });
180
181 let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
182 let (splits, sub_compaction_sstable_size, table_vnode_partition) =
183 generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
184 let parallelism = splits.len();
185 let mut compact_success = true;
186 let mut output_ssts = Vec::with_capacity(parallelism);
187 let mut compaction_futures = vec![];
188 let use_block_based_filter = BlockedXor16FilterBuilder::is_kv_count_too_large(total_key_count);
189
190 for (split_index, key_range) in splits.into_iter().enumerate() {
191 let compactor = SharedBufferCompactRunner::new(
192 split_index,
193 key_range,
194 context.clone(),
195 sub_compaction_sstable_size as usize,
196 table_vnode_partition.clone(),
197 use_block_based_filter,
198 Box::new(sstable_object_id_manager.clone()),
199 );
200 let mut forward_iters = Vec::with_capacity(payload.len());
201 for imm in &payload {
202 forward_iters.push(imm.clone().into_directed_iter::<Forward, IS_NEW_VALUE>());
203 }
204 let compaction_executor = context.compaction_executor.clone();
205 let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
206 let handle = compaction_executor.spawn({
207 static NEXT_SHARED_BUFFER_COMPACT_ID: LazyLock<AtomicUsize> =
208 LazyLock::new(|| AtomicUsize::new(0));
209 let tree_root = context.await_tree_reg.as_ref().map(|reg| {
210 let id = NEXT_SHARED_BUFFER_COMPACT_ID.fetch_add(1, Relaxed);
211 reg.register(
212 await_tree_key::CompactSharedBuffer { id },
213 format!(
214 "Compact Shared Buffer: {:?}",
215 payload
216 .iter()
217 .flat_map(|imm| imm.epochs().iter())
218 .copied()
219 .collect::<BTreeSet<_>>()
220 ),
221 )
222 });
223 let future = compactor.run(
224 MergeIterator::new(forward_iters),
225 compaction_catalog_agent_ref,
226 );
227 if let Some(root) = tree_root {
228 root.instrument(future).left_future()
229 } else {
230 future.right_future()
231 }
232 });
233 compaction_futures.push(handle);
234 }
235
236 let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
237 let mut err = None;
238 while let Some(future_result) = buffered.next().await {
239 match future_result {
240 Ok(Ok((split_index, ssts, table_stats_map))) => {
241 output_ssts.push((split_index, ssts, table_stats_map));
242 }
243 Ok(Err(e)) => {
244 compact_success = false;
245 tracing::warn!(error = %e.as_report(), "Shared Buffer Compaction failed with error");
246 err = Some(e);
247 }
248 Err(e) => {
249 compact_success = false;
250 tracing::warn!(
251 error = %e.as_report(),
252 "Shared Buffer Compaction failed with future error",
253 );
254 err = Some(HummockError::compaction_executor(
255 "failed while execute in tokio",
256 ));
257 }
258 }
259 }
260
261 output_ssts.sort_by_key(|(split_index, ..)| *split_index);
263
264 if compact_success {
265 let mut level0 = Vec::with_capacity(parallelism);
266 let mut sst_infos = vec![];
267 for (_, ssts, _) in output_ssts {
268 for sst_info in &ssts {
269 context
270 .compactor_metrics
271 .write_build_l0_bytes
272 .inc_by(sst_info.file_size());
273
274 sst_infos.push(sst_info.sst_info.clone());
275 }
276 level0.extend(ssts);
277 }
278 if context.storage_opts.check_compaction_result {
279 let compaction_executor = context.compaction_executor.clone();
280 let mut forward_iters = Vec::with_capacity(payload.len());
281 for imm in &payload {
282 if !existing_table_ids.contains(&imm.table_id.table_id) {
283 continue;
284 }
285 forward_iters.push(imm.clone().into_forward_iter());
286 }
287 let iter = MergeIterator::new(forward_iters);
288 let left_iter = UserIterator::new(
289 iter,
290 (Bound::Unbounded, Bound::Unbounded),
291 u64::MAX,
292 0,
293 None,
294 );
295 compaction_executor.spawn(async move {
296 match check_flush_result(
297 left_iter,
298 Vec::from_iter(existing_table_ids.iter().cloned()),
299 sst_infos,
300 context,
301 )
302 .await
303 {
304 Err(e) => {
305 tracing::warn!(error = %e.as_report(), "Failed check flush result of memtable");
306 }
307 Ok(true) => (),
308 Ok(false) => {
309 panic!(
310 "failed to check flush result consistency of state-table {:?}",
311 existing_table_ids
312 );
313 }
314 }
315 });
316 }
317 Ok(level0)
318 } else {
319 Err(err.unwrap())
320 }
321}
322
/// Merge multiple immutable memtables of the same table into a single imm,
/// entirely in memory (no SSTs are written).
///
/// `imms` must be non-empty, all belong to `table_id`, agree on whether they
/// carry old values, and be ordered by descending batch id (asserted below).
/// The returned imm spans all epochs of the inputs and reuses the input
/// key/value `Bytes` without copying the payload data.
pub async fn merge_imms_in_memory(
    table_id: TableId,
    imms: Vec<ImmutableMemtable>,
    memory_tracker: Option<MemoryTracker>,
) -> ImmutableMemtable {
    let mut epochs = vec![];
    let mut merged_size = 0;
    // Inputs must be sorted by batch id descending, so imms[0] is the newest.
    assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
    let max_imm_id = imms[0].batch_id();

    // Either every imm carries old values or none does.
    let has_old_value = imms[0].has_old_value();
    assert!(imms.iter().all(|imm| imm.has_old_value() == has_old_value));

    // Total old-value byte size, and the shared global counter taken from the
    // newest imm (only meaningful when old values are present).
    let (old_value_size, global_old_value_size) = if has_old_value {
        (
            imms.iter()
                .map(|imm| imm.old_values().expect("has old value").size)
                .sum(),
            Some(
                imms[0]
                    .old_values()
                    .expect("has old value")
                    .global_old_value_size
                    .clone(),
            ),
        )
    } else {
        (0, None)
    };

    let mut imm_iters = Vec::with_capacity(imms.len());
    let key_count = imms.iter().map(|imm| imm.key_count()).sum();
    let value_count = imms.iter().map(|imm| imm.value_count()).sum();
    for imm in imms {
        assert!(imm.key_count() > 0, "imm should not be empty");
        assert_eq!(
            table_id,
            imm.table_id(),
            "should only merge data belonging to the same table"
        );

        epochs.push(imm.min_epoch());
        merged_size += imm.size();

        imm_iters.push(imm.into_forward_iter());
    }
    epochs.sort();

    // Merge-sort the per-imm iterators; rewind is non-blocking for imm iters.
    let mut mi = MergeIterator::new(imm_iters);
    mi.rewind_no_await();
    assert!(mi.is_valid());

    let first_item_key = mi.current_key_entry().key.clone();

    // Flattened output layout: `merged_entries[i].value_offset` indexes into
    // `values` (and, when present, `old_values` in lockstep).
    let mut merged_entries: Vec<SharedBufferKeyEntry> = Vec::with_capacity(key_count);
    let mut values: Vec<VersionedSharedBufferValue> = Vec::with_capacity(value_count);
    let mut old_values: Option<Vec<Bytes>> = if has_old_value {
        Some(Vec::with_capacity(value_count))
    } else {
        None
    };

    merged_entries.push(SharedBufferKeyEntry {
        key: first_item_key.clone(),
        value_offset: 0,
    });

    // Tracks the latest observed user key / epoch to detect key switches and
    // enforce ordering; seeded with max epoch so the first observation passes.
    let mut full_key_tracker = FullKeyTracker::<Bytes>::new(FullKey::new_with_gap_epoch(
        table_id,
        first_item_key,
        EpochWithGap::new_max_epoch(),
    ));

    while mi.is_valid() {
        let key_entry = mi.current_key_entry();
        let user_key = UserKey {
            table_id,
            table_key: key_entry.key.clone(),
        };
        // `observe_multi_version` returns true when the user key advanced.
        if full_key_tracker.observe_multi_version(
            user_key,
            key_entry
                .new_values
                .iter()
                .map(|(epoch_with_gap, _)| *epoch_with_gap),
        ) {
            let last_entry = merged_entries.last_mut().expect("non-empty");
            if last_entry.value_offset == values.len() {
                // The previous key contributed no values; reuse its slot
                // instead of leaving an empty entry behind.
                warn!(key = ?last_entry.key, "key has no value in imm compact. skipped");
                last_entry.key = full_key_tracker.latest_user_key().table_key.clone();
            } else {
                merged_entries.push(SharedBufferKeyEntry {
                    key: full_key_tracker.latest_user_key().table_key.clone(),
                    value_offset: values.len(),
                });
            }
        }
        values.extend(
            key_entry
                .new_values
                .iter()
                .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())),
        );
        if let Some(old_values) = &mut old_values {
            old_values.extend(key_entry.old_values.expect("should exist").iter().cloned())
        }
        mi.advance_peek_to_next_key();
        // Cooperative yield so a long merge doesn't starve the runtime.
        tokio::task::consume_budget().await;
    }

    let old_values = old_values.map(|old_values| {
        SharedBufferBatchOldValues::new(
            old_values,
            old_value_size,
            global_old_value_size.expect("should exist when has old value"),
        )
    });

    SharedBufferBatch {
        inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
            epochs,
            merged_entries,
            values,
            old_values,
            merged_size,
            max_imm_id,
            memory_tracker,
        )),
        table_id,
    }
}
462
/// Decide how to parallelize a shared-buffer compaction.
///
/// Returns `(splits, sub_compaction_sstable_size, table_vnode_partition)`:
/// - `splits`: contiguous key ranges (first left-unbounded, last
///   right-unbounded) — one sub-compaction per range,
/// - `sub_compaction_sstable_size`: target SST capacity in bytes,
/// - `table_vnode_partition`: tables whose data exceeds
///   `min_sstable_size_mb`, each mapped to partition count 1.
fn generate_splits(
    payload: &Vec<ImmutableMemtable>,
    existing_table_ids: &HashSet<u32>,
    storage_opts: &StorageOpts,
) -> (Vec<KeyRange>, u64, BTreeMap<u32, u32>) {
    let mut size_and_start_user_keys = vec![];
    let mut compact_data_size = 0;
    let mut table_size_infos: HashMap<u32, u64> = HashMap::default();
    let mut table_vnode_partition = BTreeMap::default();
    for imm in payload {
        // Estimated on-disk size: raw imm size plus one epoch suffix per value.
        let data_size = {
            (imm.value_count() * EPOCH_LEN + imm.size()) as u64
        };
        compact_data_size += data_size;
        size_and_start_user_keys.push((data_size, imm.start_user_key()));
        let v = table_size_infos.entry(imm.table_id.table_id).or_insert(0);
        *v += data_size;
    }
    // Order by start user key so ranges can be cut at imm boundaries.
    size_and_start_user_keys.sort_by(|a, b| a.1.cmp(&b.1));
    let mut splits = Vec::with_capacity(size_and_start_user_keys.len());
    splits.push(KeyRange::new(Bytes::new(), Bytes::new()));
    // Close the current (right-unbounded) range at `key_before_last` and open
    // a new range starting there.
    let mut key_split_append = |key_before_last: &Bytes| {
        splits.last_mut().unwrap().right = key_before_last.clone();
        splits.push(KeyRange::new(key_before_last.clone(), Bytes::new()));
    };
    let sstable_size = (storage_opts.sstable_size_mb as u64) << 20;
    let min_sstable_size = (storage_opts.min_sstable_size_mb as u64) << 20;
    let parallel_compact_size = (storage_opts.parallel_compact_size_mb as u64) << 20;
    let parallelism = std::cmp::min(
        storage_opts.share_buffers_sync_parallelism as u64,
        size_and_start_user_keys.len() as u64,
    );
    // Per-sub-compaction data budget; only shrink it when the total is big
    // enough to be worth parallelizing.
    let sub_compaction_data_size = if compact_data_size > parallel_compact_size && parallelism > 1 {
        compact_data_size / parallelism
    } else {
        compact_data_size
    };

    // Splitting is only attempted for multi-table payloads.
    if existing_table_ids.len() > 1 {
        if parallelism > 1 && compact_data_size > sstable_size {
            let mut last_buffer_size = 0;
            let mut last_user_key: UserKey<Vec<u8>> = UserKey::default();
            for (data_size, user_key) in size_and_start_user_keys {
                // Cut a new range once the budget is filled, but never in the
                // middle of a user key.
                if last_buffer_size >= sub_compaction_data_size
                    && last_user_key.as_ref() != user_key
                {
                    last_user_key.set(user_key);
                    key_split_append(
                        &FullKey {
                            user_key,
                            epoch_with_gap: EpochWithGap::new_max_epoch(),
                        }
                        .encode()
                        .into(),
                    );
                    last_buffer_size = data_size;
                } else {
                    last_user_key.set(user_key);
                    last_buffer_size += data_size;
                }
            }
        }

        // Large tables get a single vnode partition entry.
        for table_id in existing_table_ids {
            if let Some(table_size) = table_size_infos.get(table_id)
                && *table_size > min_sstable_size
            {
                table_vnode_partition.insert(*table_id, 1);
            }
        }
    }

    // Allow 20% headroom over the per-split budget, capped at the configured
    // SST size.
    let sub_compaction_sstable_size = std::cmp::min(sstable_size, sub_compaction_data_size * 6 / 5);
    (splits, sub_compaction_sstable_size, table_vnode_partition)
}
544
/// Executes one key-range split of a shared-buffer compaction task.
pub struct SharedBufferCompactRunner {
    // Compactor configured for this split's key range and task options.
    compactor: Compactor,
    // Position of this split among the parallel sub-compactions; returned
    // with the output so callers can restore key-range order.
    split_index: usize,
}
549
550impl SharedBufferCompactRunner {
551 pub fn new(
552 split_index: usize,
553 key_range: KeyRange,
554 context: CompactorContext,
555 sub_compaction_sstable_size: usize,
556 table_vnode_partition: BTreeMap<u32, u32>,
557 use_block_based_filter: bool,
558 object_id_getter: Box<dyn GetObjectId>,
559 ) -> Self {
560 let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
561 options.capacity = sub_compaction_sstable_size;
562 let compactor = Compactor::new(
563 context,
564 options,
565 super::TaskConfig {
566 key_range,
567 cache_policy: CachePolicy::Fill(CacheHint::Normal),
568 gc_delete_keys: GC_DELETE_KEYS_FOR_FLUSH,
569 retain_multiple_version: true,
570 stats_target_table_ids: None,
571 task_type: compact_task::TaskType::SharedBuffer,
572 table_vnode_partition,
573 use_block_based_filter,
574 table_schemas: Default::default(),
575 disable_drop_column_optimization: false,
576 },
577 object_id_getter,
578 );
579 Self {
580 compactor,
581 split_index,
582 }
583 }
584
585 pub async fn run(
586 self,
587 iter: impl HummockIterator<Direction = Forward>,
588 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
589 ) -> HummockResult<CompactOutput> {
590 let dummy_compaction_filter = DummyCompactionFilter {};
591 let (ssts, table_stats_map) = self
592 .compactor
593 .compact_key_range(
594 iter,
595 dummy_compaction_filter,
596 compaction_catalog_agent_ref,
597 None,
598 None,
599 None,
600 )
601 .await?;
602 Ok((self.split_index, ssts, table_stats_map))
603 }
604}
605
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Bytes;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::{TableKey, prefix_slice_with_vnode};

    use crate::hummock::compactor::shared_buffer_compact::generate_splits;
    use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
    use crate::mem_table::ImmutableMemtable;
    use crate::opts::StorageOpts;

    /// Prefix the raw key with vnode 1, matching the table-key layout.
    fn generate_key(key: &str) -> TableKey<Bytes> {
        TableKey(prefix_slice_with_vnode(
            VirtualNode::from_index(1),
            key.as_bytes(),
        ))
    }

    /// Build a single-entry imm of the given reported size for the split test.
    fn build_imm(
        epoch: u64,
        key: &str,
        value: &'static [u8],
        size: usize,
        table_id: u32,
    ) -> ImmutableMemtable {
        ImmutableMemtable::build_shared_buffer_batch_for_test(
            epoch,
            0,
            vec![(
                generate_key(key),
                SharedBufferValue::Insert(Bytes::from_static(value)),
            )],
            size,
            TableId::new(table_id),
        )
    }

    #[tokio::test]
    async fn test_generate_splits_in_order() {
        // Multi-table payload large enough to trigger parallel splitting.
        let payload = vec![
            build_imm(test_epoch(3), "dddd", b"v3", 1024 * 1024, 1),
            build_imm(test_epoch(3), "abb", b"v3", (1024 + 256) * 1024, 1),
            build_imm(test_epoch(2), "abc", b"v2", (1024 + 512) * 1024, 1),
            build_imm(test_epoch(3), "aaa", b"v3", (1024 + 512) * 1024, 1),
            build_imm(test_epoch(3), "aaa", b"v3", (1024 + 256) * 1024, 2),
        ];

        let storage_opts = StorageOpts {
            share_buffers_sync_parallelism: 3,
            parallel_compact_size_mb: 1,
            sstable_size_mb: 1,
            ..Default::default()
        };
        let (splits, _sstable_capacity, vnodes) =
            generate_splits(&payload, &HashSet::from_iter([1, 2]), &storage_opts);

        assert_eq!(
            splits.len(),
            storage_opts.share_buffers_sync_parallelism as usize
        );
        assert!(vnodes.is_empty());
        // Splits must be contiguous, strictly increasing, and well-formed.
        for pair in splits.windows(2) {
            let (prev, cur) = (&pair[0], &pair[1]);
            assert_eq!(cur.left, prev.right);
            assert!(cur.left > prev.left);
            assert!(cur.right.is_empty() || cur.left < cur.right);
        }
    }
}