1mod spiller;
16mod task_manager;
17pub(crate) mod test_utils;
18
19use std::cmp::Ordering;
20use std::collections::btree_map::Entry;
21use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
22use std::fmt::{Debug, Display, Formatter};
23use std::future::{Future, poll_fn};
24use std::mem::{replace, swap, take};
25use std::sync::Arc;
26use std::task::{Context, Poll, ready};
27
28use futures::FutureExt;
29use itertools::Itertools;
30use more_asserts::assert_gt;
31use prometheus::{HistogramTimer, IntGauge};
32use risingwave_common::bitmap::BitmapBuilder;
33use risingwave_common::catalog::TableId;
34use risingwave_common::metrics::UintGauge;
35use risingwave_common::must_match;
36use risingwave_hummock_sdk::table_watermark::{
37 TableWatermarks, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
38};
39use risingwave_hummock_sdk::{HummockEpoch, HummockRawObjectId, LocalSstableInfo};
40use task_manager::{TaskManager, UploadingTaskStatus};
41use thiserror_ext::AsReport;
42use tokio::sync::oneshot;
43use tokio::task::JoinHandle;
44use tracing::{debug, error, info, warn};
45
46use crate::hummock::event_handler::LocalInstanceId;
47use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, send_sync_result};
48use crate::hummock::event_handler::uploader::spiller::Spiller;
49use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm;
50use crate::hummock::local_version::pinned_version::PinnedVersion;
51use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchId;
52use crate::hummock::store::version::StagingSstableInfo;
53use crate::hummock::{HummockError, HummockResult, ImmutableMemtable};
54use crate::mem_table::ImmId;
55use crate::monitor::HummockStateStoreMetrics;
56use crate::store::SealCurrentEpochOptions;
57
58fn take_before_epoch<T>(
60 data: &mut BTreeMap<HummockEpoch, T>,
61 epoch: HummockEpoch,
62) -> BTreeMap<HummockEpoch, T> {
63 let mut before_epoch_data = data.split_off(&(epoch + 1));
64 swap(&mut before_epoch_data, data);
65 before_epoch_data
66}
67
/// Input of an upload task, keyed by local instance id; imms are wrapped in
/// [`UploaderImm`] so their size stays accounted in the uploader metrics.
type UploadTaskInput = HashMap<LocalInstanceId, Vec<UploaderImm>>;
/// Same shape as [`UploadTaskInput`] but carrying the plain imms that the
/// spawned upload task actually consumes.
pub type UploadTaskPayload = HashMap<LocalInstanceId, Vec<ImmutableMemtable>>;
70
/// Result produced by a finished upload task.
#[derive(Debug)]
pub struct UploadTaskOutput {
    // SSTs holding the new values written by the flushed imms.
    pub new_value_ssts: Vec<LocalSstableInfo>,
    // SSTs holding old values (e.g. for log-store/old-value use) — may be empty.
    pub old_value_ssts: Vec<LocalSstableInfo>,
    // Timer started while the result waited to be polled; observed on drop.
    pub wait_poll_timer: Option<HistogramTimer>,
}
/// Factory closure that spawns an upload task for a payload and returns the
/// join handle of the spawned tokio task.
pub type SpawnUploadTask = Arc<
    dyn Fn(UploadTaskPayload, UploadTaskInfo) -> JoinHandle<HummockResult<UploadTaskOutput>>
        + Send
        + Sync
        + 'static,
>;
83
/// Metadata describing one upload task; cloned into the spawned task and
/// retained for logging/retry.
#[derive(Clone)]
pub struct UploadTaskInfo {
    // Total byte size of all imms in the task.
    pub task_size: usize,
    // Epochs covered by the task's imms, in descending order.
    pub epochs: Vec<HummockEpoch>,
    // Per-instance batch ids of the imms being uploaded.
    pub imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
}
90
91impl Display for UploadTaskInfo {
92 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
93 f.debug_struct("UploadTaskInfo")
94 .field("task_size", &self.task_size)
95 .field("epochs", &self.epochs)
96 .field("len(imm_ids)", &self.imm_ids.len())
97 .finish()
98 }
99}
100
101impl Debug for UploadTaskInfo {
102 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
103 f.debug_struct("UploadTaskInfo")
104 .field("task_size", &self.task_size)
105 .field("epochs", &self.epochs)
106 .field("imm_ids", &self.imm_ids)
107 .finish()
108 }
109}
110
/// Wrapper module for [`uploader_imm::UploaderImm`], an RAII guard that keeps
/// the uploader's imm-size gauge in sync with the imms it owns.
mod uploader_imm {
    use std::fmt::Formatter;
    use std::ops::Deref;

    use risingwave_common::metrics::UintGauge;

    use crate::hummock::event_handler::uploader::UploaderContext;
    use crate::mem_table::ImmutableMemtable;

    /// An imm tracked by the uploader: its size is added to the
    /// `uploader_imm_size` gauge on creation and subtracted again on drop.
    pub(super) struct UploaderImm {
        // The wrapped imm, exposed through `Deref`.
        inner: ImmutableMemtable,
        // Gauge accounting for this imm's size while it is alive.
        size_guard: UintGauge,
    }

    impl UploaderImm {
        /// Wraps `imm` and records its size in the uploader metrics.
        pub(super) fn new(imm: ImmutableMemtable, context: &UploaderContext) -> Self {
            let size = imm.size();
            let size_guard = context.stats.uploader_imm_size.clone();
            size_guard.add(size as _);
            Self {
                inner: imm,
                size_guard,
            }
        }

        /// Test-only constructor backed by a throwaway gauge, so tests need
        /// no `UploaderContext`.
        #[cfg(test)]
        pub(super) fn for_test(imm: ImmutableMemtable) -> Self {
            Self {
                inner: imm,
                size_guard: UintGauge::new("test", "test").unwrap(),
            }
        }
    }

    impl std::fmt::Debug for UploaderImm {
        // Delegate to the inner imm's Debug output.
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            self.inner.fmt(f)
        }
    }

    impl Deref for UploaderImm {
        type Target = ImmutableMemtable;

        fn deref(&self) -> &Self::Target {
            &self.inner
        }
    }

    impl Drop for UploaderImm {
        // Release this imm's contribution to the size gauge.
        fn drop(&mut self) {
            self.size_guard.sub(self.inner.size() as _);
        }
    }
}
165
/// Identifier of an [`UploadingTask`], unique within one uploader.
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct UploadingTaskId(usize);
168
/// A running upload task plus the state needed to retry it on failure and to
/// release its metric contributions on drop.
struct UploadingTask {
    task_id: UploadingTaskId,
    // Kept so the task can be respawned with the same imms if it fails.
    input: UploadTaskInput,
    join_handle: JoinHandle<HummockResult<UploadTaskOutput>>,
    task_info: UploadTaskInfo,
    // Factory used for retries.
    spawn_upload_task: SpawnUploadTask,
    // Guards decremented/subtracted in `Drop`.
    task_size_guard: UintGauge,
    task_count_guard: IntGauge,
}
181
impl Drop for UploadingTask {
    /// Releases this task's contribution to the global upload size and the
    /// uploading-task count, whether the task finished or was aborted.
    fn drop(&mut self) {
        self.task_size_guard.sub(self.task_info.task_size as u64);
        self.task_count_guard.dec();
    }
}
188
189impl Debug for UploadingTask {
190 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
191 f.debug_struct("UploadingTask")
192 .field("input", &self.input)
193 .field("task_info", &self.task_info)
194 .finish()
195 }
196}
197
198fn get_payload_imm_ids(
199 payload: &UploadTaskPayload,
200) -> HashMap<LocalInstanceId, Vec<SharedBufferBatchId>> {
201 payload
202 .iter()
203 .map(|(instance_id, imms)| {
204 (
205 *instance_id,
206 imms.iter().map(|imm| imm.batch_id()).collect_vec(),
207 )
208 })
209 .collect()
210}
211
impl UploadingTask {
    /// Tasks larger than this (50 MiB) log their start/finish at `info!`
    /// instead of `debug!`.
    const LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE: usize = 50 * (1 << 20);

    /// Converts the metric-tracked input into the plain payload handed to the
    /// spawned task, cloning each imm.
    fn input_to_payload(input: &UploadTaskInput) -> UploadTaskPayload {
        input
            .iter()
            .map(|(instance_id, imms)| {
                (
                    *instance_id,
                    imms.iter().map(|imm| (**imm).clone()).collect(),
                )
            })
            .collect()
    }

    /// Spawns an upload task over `input`, registering its size with the
    /// buffer tracker and metrics.
    ///
    /// # Panics
    /// Panics if `input` is empty.
    fn new(task_id: UploadingTaskId, input: UploadTaskInput, context: &UploaderContext) -> Self {
        assert!(!input.is_empty());
        // Distinct epochs covered by the input imms, in descending order.
        let mut epochs = input
            .iter()
            .flat_map(|(_, imms)| imms.iter().flat_map(|imm| imm.epochs().iter().cloned()))
            .sorted()
            .dedup()
            .collect_vec();

        epochs.reverse();
        let payload = Self::input_to_payload(&input);
        let imm_ids = get_payload_imm_ids(&payload);
        let task_size = input
            .values()
            .map(|imms| imms.iter().map(|imm| imm.size()).sum::<usize>())
            .sum();
        let task_info = UploadTaskInfo {
            task_size,
            epochs,
            imm_ids,
        };
        // Account the task size globally; released by `Drop` via `task_size_guard`.
        context
            .buffer_tracker
            .global_upload_task_size()
            .add(task_size as u64);
        if task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
            info!("start upload task: {:?}", task_info);
        } else {
            debug!("start upload task: {:?}", task_info);
        }
        let join_handle = (context.spawn_upload_task)(payload, task_info.clone());
        context.stats.uploader_uploading_task_count.inc();
        Self {
            task_id,
            input,
            join_handle,
            task_info,
            spawn_upload_task: context.spawn_upload_task.clone(),
            task_size_guard: context.buffer_tracker.global_upload_task_size().clone(),
            task_count_guard: context.stats.uploader_uploading_task_count.clone(),
        }
    }

    /// Polls the task's join handle once. On completion, wraps the upload
    /// output into a [`StagingSstableInfo`]; a join (panic/cancel) error is
    /// converted into a [`HummockError`].
    fn poll_result(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<HummockResult<Arc<StagingSstableInfo>>> {
        Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) {
            Ok(task_result) => task_result
                .inspect(|_| {
                    if self.task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
                        info!(task_info = ?self.task_info, "upload task finish");
                    } else {
                        debug!(task_info = ?self.task_info, "upload task finish");
                    }
                })
                .inspect_err(|e| error!(task_info = ?self.task_info, err = ?e.as_report(), "upload task failed"))
                .map(|output| {
                    Arc::new(StagingSstableInfo::new(
                        output.new_value_ssts,
                        output.old_value_ssts,
                        self.task_info.epochs.clone(),
                        self.task_info.imm_ids.clone(),
                        self.task_info.task_size,
                    ))
                }),

            Err(err) => Err(HummockError::other(format!(
                "fail to join upload join handle: {}",
                err.as_report()
            ))),
        })
    }

    /// Polls until the task succeeds; on every failure the task is respawned
    /// with the same input and info, retrying indefinitely.
    fn poll_ok_with_retry(&mut self, cx: &mut Context<'_>) -> Poll<Arc<StagingSstableInfo>> {
        loop {
            let result = ready!(self.poll_result(cx));
            match result {
                Ok(sstables) => return Poll::Ready(sstables),
                Err(e) => {
                    error!(
                        error = %e.as_report(),
                        task_info = ?self.task_info,
                        "a flush task failed, start retry",
                    );
                    // Respawn with the retained input; the loop polls the new
                    // handle on the next iteration.
                    self.join_handle = (self.spawn_upload_task)(
                        Self::input_to_payload(&self.input),
                        self.task_info.clone(),
                    );
                }
            }
        }
    }

    /// Metadata of this task.
    pub fn get_task_info(&self) -> &UploadTaskInfo {
        &self.task_info
    }
}
332
impl TableUnsyncData {
    /// Records per-vnode watermarks written at `epoch`.
    ///
    /// Asserts that the direction and serde type match any previously recorded
    /// watermarks of this table, and that no vnode receives more than one
    /// watermark within the same epoch (tracked via a per-epoch bitmap).
    /// No-op when `table_watermarks` is empty.
    fn add_table_watermarks(
        &mut self,
        epoch: HummockEpoch,
        table_watermarks: Vec<VnodeWatermark>,
        direction: WatermarkDirection,
        watermark_type: WatermarkSerdeType,
    ) {
        if table_watermarks.is_empty() {
            return;
        }
        // All watermarks in one call must agree on the vnode count.
        let vnode_count = table_watermarks[0].vnode_count();
        for watermark in &table_watermarks {
            assert_eq!(vnode_count, watermark.vnode_count());
        }

        // Marks each vnode covered by the new watermarks in the epoch's
        // bitmap, panicking if a vnode is written twice in the same epoch.
        fn apply_new_vnodes(
            vnode_bitmap: &mut BitmapBuilder,
            vnode_watermarks: &Vec<VnodeWatermark>,
        ) {
            for vnode_watermark in vnode_watermarks {
                for vnode in vnode_watermark.vnode_bitmap().iter_ones() {
                    assert!(
                        !vnode_bitmap.is_set(vnode),
                        "vnode {} write multiple table watermarks",
                        vnode
                    );
                    vnode_bitmap.set(vnode, true);
                }
            }
        }
        match &mut self.table_watermarks {
            Some((prev_direction, prev_watermarks, prev_watermark_type)) => {
                assert_eq!(
                    *prev_direction, direction,
                    "table id {} new watermark direction not match with previous",
                    self.table_id
                );
                assert_eq!(
                    *prev_watermark_type, watermark_type,
                    "table id {} new watermark watermark_type not match with previous",
                    self.table_id
                );
                match prev_watermarks.entry(epoch) {
                    // Extend the watermarks already recorded for this epoch.
                    Entry::Occupied(mut entry) => {
                        let (prev_watermarks, vnode_bitmap) = entry.get_mut();
                        apply_new_vnodes(vnode_bitmap, &table_watermarks);
                        prev_watermarks.extend(table_watermarks);
                    }
                    // First watermarks for this epoch.
                    Entry::Vacant(entry) => {
                        let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                        apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                        entry.insert((table_watermarks, vnode_bitmap));
                    }
                }
            }
            // First watermarks ever recorded for this table.
            None => {
                let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                self.table_watermarks = Some((
                    direction,
                    BTreeMap::from_iter([(epoch, (table_watermarks, vnode_bitmap))]),
                    watermark_type,
                ));
            }
        }
    }
}
401
impl UploaderData {
    /// Folds the per-epoch watermarks of one table into a single
    /// [`TableWatermarks`] and inserts it into `all_table_watermarks`.
    ///
    /// `watermarks` is expected to yield epochs in ascending order (the
    /// caller iterates a `BTreeMap`). Panics if the table already has an
    /// entry in `all_table_watermarks`.
    fn add_table_watermarks(
        all_table_watermarks: &mut HashMap<TableId, TableWatermarks>,
        table_id: TableId,
        direction: WatermarkDirection,
        watermarks: impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)>,
        watermark_type: WatermarkSerdeType,
    ) {
        // Accumulator: created on the first epoch, extended afterwards.
        let mut table_watermarks: Option<TableWatermarks> = None;
        for (epoch, watermarks) in watermarks {
            match &mut table_watermarks {
                Some(prev_watermarks) => {
                    assert_eq!(prev_watermarks.direction, direction);
                    assert_eq!(prev_watermarks.watermark_type, watermark_type);
                    prev_watermarks.add_new_epoch_watermarks(
                        epoch,
                        Arc::from(watermarks),
                        direction,
                        watermark_type,
                    );
                }
                None => {
                    table_watermarks = Some(TableWatermarks::single_epoch(
                        epoch,
                        watermarks,
                        direction,
                        watermark_type,
                    ));
                }
            }
        }
        if let Some(table_watermarks) = table_watermarks {
            assert!(
                all_table_watermarks
                    .insert(table_id, table_watermarks)
                    .is_none()
            );
        }
    }
}
442
/// Imms buffered by one local instance for a single epoch.
struct LocalInstanceEpochData {
    epoch: HummockEpoch,
    // Newest imm at the front (see `add_imm`).
    imms: VecDeque<UploaderImm>,
    // Whether some imms of this epoch have already been spilled.
    has_spilled: bool,
}
449
450impl LocalInstanceEpochData {
451 fn new(epoch: HummockEpoch) -> Self {
452 Self {
453 epoch,
454 imms: VecDeque::new(),
455 has_spilled: false,
456 }
457 }
458
459 fn epoch(&self) -> HummockEpoch {
460 self.epoch
461 }
462
463 fn add_imm(&mut self, imm: UploaderImm) {
464 assert_eq!(imm.max_epoch(), imm.min_epoch());
465 assert_eq!(self.epoch, imm.min_epoch());
466 if let Some(prev_imm) = self.imms.front() {
467 assert_gt!(imm.batch_id(), prev_imm.batch_id());
468 }
469 self.imms.push_front(imm);
470 }
471
472 fn is_empty(&self) -> bool {
473 self.imms.is_empty()
474 }
475}
476
/// Unsynced state of one local read-version instance of a table.
struct LocalInstanceUnsyncData {
    table_id: TableId,
    instance_id: LocalInstanceId,
    // Buffer of the epoch currently being written; `None` once archived.
    current_epoch_data: Option<LocalInstanceEpochData>,
    // Sealed-but-unsynced epochs, newest at the front.
    sealed_data: VecDeque<LocalInstanceEpochData>,
    // Batch ids currently being flushed, newest at the front.
    flushing_imms: VecDeque<SharedBufferBatchId>,
    // Set when the owning read version instance is dropped.
    is_destroyed: bool,
}
488
impl LocalInstanceUnsyncData {
    /// Creates the unsync state of a newly initialized instance at `init_epoch`.
    fn new(table_id: TableId, instance_id: LocalInstanceId, init_epoch: HummockEpoch) -> Self {
        Self {
            table_id,
            instance_id,
            current_epoch_data: Some(LocalInstanceEpochData::new(init_epoch)),
            sealed_data: VecDeque::new(),
            flushing_imms: Default::default(),
            is_destroyed: false,
        }
    }

    /// Buffers `imm` into the current epoch. Panics if the instance has been
    /// destroyed or the current epoch has been archived.
    fn add_imm(&mut self, imm: UploaderImm) {
        assert!(!self.is_destroyed);
        assert_eq!(self.table_id, imm.table_id);
        self.current_epoch_data
            .as_mut()
            .expect("should be Some when adding new imm")
            .add_imm(imm);
    }

    /// Seals the current epoch, starts `next_epoch`, and returns the epoch
    /// that was sealed. Epochs with no imms are not kept in `sealed_data`.
    fn local_seal_epoch(&mut self, next_epoch: HummockEpoch) -> HummockEpoch {
        assert!(!self.is_destroyed);
        let data = self
            .current_epoch_data
            .as_mut()
            .expect("should be Some when seal new epoch");
        let current_epoch = data.epoch;
        debug!(
            instance_id = self.instance_id,
            next_epoch, current_epoch, "local seal epoch"
        );
        assert_gt!(next_epoch, current_epoch);
        let epoch_data = replace(data, LocalInstanceEpochData::new(next_epoch));
        if !epoch_data.is_empty() {
            self.sealed_data.push_front(epoch_data);
        }
        current_epoch
    }

    /// Acknowledges flushed imms; `imm_ids` must arrive oldest-first and match
    /// the tail of `flushing_imms` exactly.
    fn ack_flushed(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            assert_eq!(self.flushing_imms.pop_back().expect("should exist"), imm_id);
        }
    }

    /// Takes the imms buffered for exactly `epoch` for spilling (returning an
    /// empty vec when the instance has no data at that epoch) and records
    /// their ids as flushing.
    ///
    /// Panics if an older epoch still holds unspilled data, since spilling
    /// must proceed in epoch order.
    fn spill(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        let imms = if let Some(oldest_sealed_epoch) = self.sealed_data.back() {
            match oldest_sealed_epoch.epoch.cmp(&epoch) {
                Ordering::Less => {
                    unreachable!(
                        "should not spill at this epoch because there \
                    is unspilled data in previous epoch: prev epoch {}, spill epoch {}",
                        oldest_sealed_epoch.epoch, epoch
                    );
                }
                Ordering::Equal => {
                    let epoch_data = self.sealed_data.pop_back().unwrap();
                    assert_eq!(epoch, epoch_data.epoch);
                    epoch_data.imms
                }
                Ordering::Greater => VecDeque::new(),
            }
        } else {
            // No sealed data: the spill may hit the current epoch.
            let Some(current_epoch_data) = &mut self.current_epoch_data else {
                return Vec::new();
            };
            match current_epoch_data.epoch.cmp(&epoch) {
                Ordering::Less => {
                    assert!(
                        current_epoch_data.imms.is_empty(),
                        "should not spill at this epoch because there \
                        is unspilled data in current epoch epoch {}, spill epoch {}",
                        current_epoch_data.epoch,
                        epoch
                    );
                    VecDeque::new()
                }
                Ordering::Equal => {
                    if !current_epoch_data.imms.is_empty() {
                        // Mark so a later sync can assert consistency.
                        current_epoch_data.has_spilled = true;
                        take(&mut current_epoch_data.imms)
                    } else {
                        VecDeque::new()
                    }
                }
                Ordering::Greater => VecDeque::new(),
            }
        };
        // Record oldest-first (imms store newest at the front).
        self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
        imms.into_iter().collect()
    }

    /// Registers imm ids as flushing; ids must be strictly increasing.
    fn add_flushing_imm(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            if let Some(prev_imm_id) = self.flushing_imms.front() {
                assert_gt!(imm_id, *prev_imm_id);
            }
            self.flushing_imms.push_front(imm_id);
        }
    }

    /// Drains all sealed imms at epochs `<= epoch` (returned newest-first)
    /// and marks them flushing. If the current epoch is also covered, it must
    /// be empty and is archived (debug builds panic instead — the instance
    /// should already have been archived by then).
    fn sync(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        let mut ret = Vec::new();
        while let Some(epoch_data) = self.sealed_data.back()
            && epoch_data.epoch() <= epoch
        {
            let imms = self.sealed_data.pop_back().expect("checked exist").imms;
            self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
            ret.extend(imms.into_iter().rev());
        }
        // Collected oldest-first above; flip to newest-first.
        ret.reverse();
        if let Some(latest_epoch_data) = &self.current_epoch_data {
            if latest_epoch_data.epoch <= epoch {
                assert!(self.sealed_data.is_empty());
                assert!(latest_epoch_data.is_empty());
                assert!(!latest_epoch_data.has_spilled);
                if cfg!(debug_assertions) {
                    panic!(
                        "sync epoch exceeds latest epoch, and the current instance should have been archived"
                    );
                }
                warn!(
                    instance_id = self.instance_id,
                    table_id = self.table_id.table_id,
                    "sync epoch exceeds latest epoch, and the current instance should have be archived"
                );
                self.current_epoch_data = None;
            }
        }
        ret
    }

    /// Debug check: no unsynced data at or before `epoch` remains.
    fn assert_after_epoch(&self, epoch: HummockEpoch) {
        if let Some(oldest_sealed_data) = self.sealed_data.back() {
            assert!(!oldest_sealed_data.imms.is_empty());
            assert_gt!(oldest_sealed_data.epoch, epoch);
        } else if let Some(current_data) = &self.current_epoch_data {
            if current_data.epoch <= epoch {
                assert!(current_data.imms.is_empty() && !current_data.has_spilled);
            }
        }
    }

    /// Destroyed and fully drained — safe to remove from the table data.
    fn is_finished(&self) -> bool {
        self.is_destroyed && self.sealed_data.is_empty()
    }
}
642
/// Unsynced state of one table: its instances, watermarks, spill tasks and
/// epoch bookkeeping.
struct TableUnsyncData {
    table_id: TableId,
    instance_data: HashMap<LocalInstanceId, LocalInstanceUnsyncData>,
    // Direction + per-epoch (watermarks, vnode coverage bitmap) + serde type.
    #[expect(clippy::type_complexity)]
    table_watermarks: Option<(
        WatermarkDirection,
        BTreeMap<HummockEpoch, (Vec<VnodeWatermark>, BitmapBuilder)>,
        WatermarkSerdeType,
    )>,
    // Spill tasks issued per epoch, claimed at sync time.
    spill_tasks: BTreeMap<HummockEpoch, VecDeque<UploadingTaskId>>,
    // Epochs started but not yet synced (value unused; acts as ordered set).
    unsync_epochs: BTreeMap<HummockEpoch, ()>,
    // The `next_epoch` at which the table stopped, recorded when sealing past
    // the known epochs; used to detect inconsistent stop epochs.
    stopped_next_epoch: Option<HummockEpoch>,
    // In-flight sync epochs, newest at the front.
    syncing_epochs: VecDeque<HummockEpoch>,
    max_synced_epoch: Option<HummockEpoch>,
}
662
impl TableUnsyncData {
    /// Fresh state; `committed_epoch` seeds `max_synced_epoch` when known.
    fn new(table_id: TableId, committed_epoch: Option<HummockEpoch>) -> Self {
        Self {
            table_id,
            instance_data: Default::default(),
            table_watermarks: None,
            spill_tasks: Default::default(),
            unsync_epochs: Default::default(),
            stopped_next_epoch: None,
            syncing_epochs: Default::default(),
            max_synced_epoch: committed_epoch,
        }
    }

    /// Registers a new epoch; must be strictly greater than any known epoch.
    fn new_epoch(&mut self, epoch: HummockEpoch) {
        debug!(table_id = ?self.table_id, epoch, "table new epoch");
        if let Some(latest_epoch) = self.max_epoch() {
            assert_gt!(epoch, latest_epoch);
        }
        self.unsync_epochs.insert(epoch, ());
    }

    /// Starts syncing everything up to and including `epoch`.
    ///
    /// Returns, in order:
    /// 1. per-instance imms drained for the sync,
    /// 2. the table watermarks up to `epoch` (with direction/serde type),
    /// 3. the spill task ids issued for the covered epochs,
    /// 4. the unsync epochs consumed by this sync.
    #[expect(clippy::type_complexity)]
    fn sync(
        &mut self,
        epoch: HummockEpoch,
    ) -> (
        impl Iterator<Item = (LocalInstanceId, Vec<UploaderImm>)> + '_,
        Option<(
            WatermarkDirection,
            impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)> + use<>,
            WatermarkSerdeType,
        )>,
        impl Iterator<Item = UploadingTaskId> + use<>,
        BTreeMap<HummockEpoch, ()>,
    ) {
        if let Some(prev_epoch) = self.max_sync_epoch() {
            assert_gt!(epoch, prev_epoch)
        }
        let epochs = take_before_epoch(&mut self.unsync_epochs, epoch);
        // The sync epoch itself must be a known unsync epoch.
        assert_eq!(
            *epochs.last_key_value().expect("non-empty").0,
            epoch,
            "{epochs:?} {epoch} {:?}",
            self.table_id
        );
        self.syncing_epochs.push_front(epoch);
        (
            self.instance_data
                .iter_mut()
                .map(move |(instance_id, data)| (*instance_id, data.sync(epoch))),
            self.table_watermarks
                .as_mut()
                .map(|(direction, watermarks, watermark_type)| {
                    // Drop the vnode bitmaps: only the watermarks are synced.
                    let watermarks = take_before_epoch(watermarks, epoch)
                        .into_iter()
                        .map(|(epoch, (watermarks, _))| (epoch, watermarks));
                    (*direction, watermarks, *watermark_type)
                }),
            take_before_epoch(&mut self.spill_tasks, epoch)
                .into_values()
                .flat_map(|tasks| tasks.into_iter()),
            epochs,
        )
    }

    /// Completes the oldest in-flight sync; must match `sync_epoch`.
    fn ack_synced(&mut self, sync_epoch: HummockEpoch) {
        let min_sync_epoch = self.syncing_epochs.pop_back().expect("should exist");
        assert_eq!(sync_epoch, min_sync_epoch);
        self.max_synced_epoch = Some(sync_epoch);
    }

    /// Advances `max_synced_epoch` to `committed_epoch` when the commit is
    /// ahead of what was synced (possible for tables synced elsewhere),
    /// asserting no pending state at or before the committed epoch.
    fn ack_committed(&mut self, committed_epoch: HummockEpoch) {
        let synced_epoch_advanced = {
            if let Some(max_synced_epoch) = self.max_synced_epoch
                && max_synced_epoch >= committed_epoch
            {
                false
            } else {
                true
            }
        };
        if synced_epoch_advanced {
            self.max_synced_epoch = Some(committed_epoch);
            if let Some(min_syncing_epoch) = self.syncing_epochs.back() {
                assert_gt!(*min_syncing_epoch, committed_epoch);
            }
            self.assert_after_epoch(committed_epoch);
        }
    }

    /// Debug check: no unsynced imms or watermarks remain at or before `epoch`.
    fn assert_after_epoch(&self, epoch: HummockEpoch) {
        self.instance_data
            .values()
            .for_each(|instance_data| instance_data.assert_after_epoch(epoch));
        if let Some((_, watermarks, _)) = &self.table_watermarks
            && let Some((oldest_epoch, _)) = watermarks.first_key_value()
        {
            assert_gt!(*oldest_epoch, epoch);
        }
    }

    /// Latest epoch that is syncing or synced.
    fn max_sync_epoch(&self) -> Option<HummockEpoch> {
        self.syncing_epochs
            .front()
            .cloned()
            .or(self.max_synced_epoch)
    }

    /// Latest epoch known to this table (unsynced, syncing, or synced).
    fn max_epoch(&self) -> Option<HummockEpoch> {
        self.unsync_epochs
            .last_key_value()
            .map(|(epoch, _)| *epoch)
            .or_else(|| self.max_sync_epoch())
    }

    /// No instances and no epochs in flight.
    fn is_empty(&self) -> bool {
        self.instance_data.is_empty()
            && self.syncing_epochs.is_empty()
            && self.unsync_epochs.is_empty()
    }
}
785
/// Identifies one (epoch, table group) entry in `UnsyncData::unsync_epochs`;
/// the `TableId` is the minimum table id of the group (see
/// [`get_unsync_epoch_id`]).
#[derive(Eq, Hash, PartialEq, Copy, Clone)]
struct UnsyncEpochId(HummockEpoch, TableId);

impl UnsyncEpochId {
    /// The epoch component of the id.
    fn epoch(&self) -> HummockEpoch {
        self.0
    }
}
794
795fn get_unsync_epoch_id(epoch: HummockEpoch, table_ids: &HashSet<TableId>) -> Option<UnsyncEpochId> {
796 table_ids
797 .iter()
798 .min()
799 .map(|table_id| UnsyncEpochId(epoch, *table_id))
800}
801
/// All not-yet-synced state of the uploader.
#[derive(Default)]
struct UnsyncData {
    table_data: HashMap<TableId, TableUnsyncData>,
    // Reverse index: instance -> owning table.
    instance_table_id: HashMap<LocalInstanceId, TableId>,
    // Epoch groups awaiting sync, keyed by (epoch, min table id).
    unsync_epochs: HashMap<UnsyncEpochId, HashSet<TableId>>,
    // Finished spill results not yet claimed by a sync.
    spilled_data: HashMap<UploadingTaskId, (Arc<StagingSstableInfo>, HashSet<TableId>)>,
}
814
impl UnsyncData {
    /// Registers a new instance of `table_id` starting at `init_epoch`.
    ///
    /// The table must already exist and `init_epoch` must be a registered
    /// unsync epoch of the table.
    fn init_instance(
        &mut self,
        table_id: TableId,
        instance_id: LocalInstanceId,
        init_epoch: HummockEpoch,
    ) {
        debug!(
            table_id = table_id.table_id,
            instance_id, init_epoch, "init epoch"
        );
        let table_data = self
            .table_data
            .get_mut(&table_id)
            .unwrap_or_else(|| panic!("should exist. {table_id:?}"));
        assert!(
            table_data
                .instance_data
                .insert(
                    instance_id,
                    LocalInstanceUnsyncData::new(table_id, instance_id, init_epoch)
                )
                .is_none()
        );
        assert!(
            self.instance_table_id
                .insert(instance_id, table_id)
                .is_none()
        );
        assert!(table_data.unsync_epochs.contains_key(&init_epoch));
    }

    /// Looks up the unsync data of an instance via the reverse index;
    /// `None` when the instance has already been removed.
    fn instance_data(
        &mut self,
        instance_id: LocalInstanceId,
    ) -> Option<&mut LocalInstanceUnsyncData> {
        self.instance_table_id
            .get_mut(&instance_id)
            .cloned()
            .map(move |table_id| {
                self.table_data
                    .get_mut(&table_id)
                    .expect("should exist")
                    .instance_data
                    .get_mut(&instance_id)
                    .expect("should exist")
            })
    }

    /// Buffers `imm` into the given instance; the instance must exist.
    fn add_imm(&mut self, instance_id: LocalInstanceId, imm: UploaderImm) {
        self.instance_data(instance_id)
            .expect("should exist")
            .add_imm(imm);
    }

    /// Seals the instance's current epoch and advances to `next_epoch`.
    ///
    /// When `next_epoch` is not a registered unsync epoch, the table is
    /// considered stopped at `next_epoch`; a differing stop epoch across
    /// instances panics in debug builds and warns in release. Watermarks in
    /// `opts` are recorded against the sealed epoch.
    fn local_seal_epoch(
        &mut self,
        instance_id: LocalInstanceId,
        next_epoch: HummockEpoch,
        opts: SealCurrentEpochOptions,
    ) {
        let table_id = self.instance_table_id[&instance_id];
        let table_data = self.table_data.get_mut(&table_id).expect("should exist");
        let instance_data = table_data
            .instance_data
            .get_mut(&instance_id)
            .expect("should exist");
        let epoch = instance_data.local_seal_epoch(next_epoch);
        if !table_data.unsync_epochs.contains_key(&next_epoch) {
            if let Some(stopped_next_epoch) = table_data.stopped_next_epoch {
                if stopped_next_epoch != next_epoch {
                    // Inconsistent stop epoch among instances of this table.
                    let table_id = table_data.table_id.table_id;
                    let unsync_epochs = table_data.unsync_epochs.keys().collect_vec();
                    if cfg!(debug_assertions) {
                        panic!(
                            "table_id {} stop epoch {} different to prev stop epoch {}. unsync epochs: {:?}, syncing epochs {:?}, max_synced_epoch {:?}",
                            table_id,
                            next_epoch,
                            stopped_next_epoch,
                            unsync_epochs,
                            table_data.syncing_epochs,
                            table_data.max_synced_epoch
                        );
                    } else {
                        warn!(
                            table_id,
                            stopped_next_epoch,
                            next_epoch,
                            ?unsync_epochs,
                            syncing_epochs = ?table_data.syncing_epochs,
                            max_synced_epoch = ?table_data.max_synced_epoch,
                            "different stop epoch"
                        );
                    }
                }
            } else {
                // First instance to stop: record the stop epoch.
                if let Some(max_epoch) = table_data.max_epoch() {
                    assert_gt!(next_epoch, max_epoch);
                }
                debug!(?table_id, epoch, next_epoch, "table data has stopped");
                table_data.stopped_next_epoch = Some(next_epoch);
            }
        }
        if let Some((direction, table_watermarks, watermark_type)) = opts.table_watermarks {
            table_data.add_table_watermarks(epoch, table_watermarks, direction, watermark_type);
        }
    }

    /// Marks an instance destroyed (its remaining sealed data is drained by
    /// later syncs). No-op for unknown instances.
    fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
        if let Some(table_id) = self.instance_table_id.get(&instance_id) {
            debug!(instance_id, "destroy instance");
            let table_data = self.table_data.get_mut(table_id).expect("should exist");
            let instance_data = table_data
                .instance_data
                .get_mut(&instance_id)
                .expect("should exist");
            assert!(
                !instance_data.is_destroyed,
                "cannot destroy an instance for twice"
            );
            instance_data.is_destroyed = true;
        }
    }

    /// Drops all unsync state of `table_ids`, aborting their spill tasks.
    ///
    /// Asserts that the dropped tables form a closed group: any spill task or
    /// unsync epoch touching one of them must cover only tables in the group,
    /// and all their instances must already be destroyed.
    fn clear_tables(&mut self, table_ids: &HashSet<TableId>, task_manager: &mut TaskManager) {
        for table_id in table_ids {
            if let Some(table_unsync_data) = self.table_data.remove(table_id) {
                for task_id in table_unsync_data.spill_tasks.into_values().flatten() {
                    if let Some(task_status) = task_manager.abort_task(task_id) {
                        must_match!(task_status, UploadingTaskStatus::Spilling(spill_table_ids) => {
                            assert!(spill_table_ids.is_subset(table_ids));
                        });
                    }
                    if let Some((_, spill_table_ids)) = self.spilled_data.remove(&task_id) {
                        assert!(spill_table_ids.is_subset(table_ids));
                    }
                }
                assert!(
                    table_unsync_data
                        .instance_data
                        .values()
                        .all(|instance| instance.is_destroyed),
                    "should be clear when dropping the read version instance"
                );
                // Remove the reverse-index entries of the dropped instances.
                for instance_id in table_unsync_data.instance_data.keys() {
                    assert_eq!(
                        *table_id,
                        self.instance_table_id
                            .remove(instance_id)
                            .expect("should exist")
                    );
                }
            }
        }
        debug_assert!(
            self.spilled_data
                .values()
                .all(|(_, spill_table_ids)| spill_table_ids.is_disjoint(table_ids))
        );
        self.unsync_epochs.retain(|_, unsync_epoch_table_ids| {
            if !unsync_epoch_table_ids.is_disjoint(table_ids) {
                assert!(unsync_epoch_table_ids.is_subset(table_ids));
                false
            } else {
                true
            }
        });
        assert!(
            self.instance_table_id
                .values()
                .all(|table_id| !table_ids.contains(table_id))
        );
    }
}
992
impl UploaderData {
    /// Starts syncing the given `(epoch, table_ids)` groups.
    ///
    /// Collects the unflushed imms of every covered table, splits previously
    /// issued spill tasks into already-finished (`spilled_tasks`) and
    /// still-running (`uploading_tasks`) ones, gathers watermarks, spawns an
    /// extra flush task for the remaining imms, and registers a
    /// [`SyncingData`] that completes once all remaining tasks finish.
    fn sync(
        &mut self,
        context: &UploaderContext,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) {
        let mut all_table_watermarks = HashMap::new();
        let mut uploading_tasks = HashSet::new();
        let mut spilled_tasks = BTreeSet::new();
        let mut all_table_ids = HashSet::new();

        let mut flush_payload = HashMap::new();

        for (epoch, table_ids) in &sync_table_epochs {
            let epoch = *epoch;
            // A table may appear in at most one sync group.
            for table_id in table_ids {
                assert!(
                    all_table_ids.insert(*table_id),
                    "duplicate sync table epoch: {:?} {:?}",
                    all_table_ids,
                    sync_table_epochs
                );
            }
            if let Some(UnsyncEpochId(_, min_table_id)) = get_unsync_epoch_id(epoch, table_ids) {
                let min_table_id_data = self
                    .unsync_data
                    .table_data
                    .get_mut(&min_table_id)
                    .expect("should exist");
                // NOTE(review): operates on a clone of `unsync_epochs` —
                // apparently only to learn which epochs are covered; the real
                // removal happens in each table's `sync` below. Confirm.
                let epochs = take_before_epoch(&mut min_table_id_data.unsync_epochs.clone(), epoch);
                for epoch in epochs.keys() {
                    assert_eq!(
                        &self
                            .unsync_data
                            .unsync_epochs
                            .remove(&UnsyncEpochId(*epoch, min_table_id))
                            .expect("should exist"),
                        table_ids
                    );
                }
                for table_id in table_ids {
                    let table_data = self
                        .unsync_data
                        .table_data
                        .get_mut(table_id)
                        .expect("should exist");
                    let (unflushed_payload, table_watermarks, task_ids, table_unsync_epochs) =
                        table_data.sync(epoch);
                    // Every table of the group must cover the same epochs.
                    assert_eq!(table_unsync_epochs, epochs);
                    for (instance_id, payload) in unflushed_payload {
                        if !payload.is_empty() {
                            flush_payload.insert(instance_id, payload);
                        }
                    }
                    // Drop instances that are destroyed and fully drained.
                    table_data.instance_data.retain(|instance_id, data| {
                        if data.is_finished() {
                            assert_eq!(
                                self.unsync_data.instance_table_id.remove(instance_id),
                                Some(*table_id)
                            );
                            false
                        } else {
                            true
                        }
                    });
                    if let Some((direction, watermarks, watermark_type)) = table_watermarks {
                        Self::add_table_watermarks(
                            &mut all_table_watermarks,
                            *table_id,
                            direction,
                            watermarks,
                            watermark_type,
                        );
                    }
                    // Spill tasks already finished sit in `spilled_data`;
                    // the rest are still uploading.
                    for task_id in task_ids {
                        if self.unsync_data.spilled_data.contains_key(&task_id) {
                            spilled_tasks.insert(task_id);
                        } else {
                            uploading_tasks.insert(task_id);
                        }
                    }
                }
            }
        }

        let sync_id = {
            let sync_id = self.next_sync_id;
            self.next_sync_id += 1;
            SyncId(sync_id)
        };

        // Flush the remaining unflushed imms as one extra task of this sync.
        if let Some(extra_flush_task_id) = self.task_manager.sync(
            context,
            sync_id,
            flush_payload,
            uploading_tasks.iter().cloned(),
            &all_table_ids,
        ) {
            uploading_tasks.insert(extra_flush_task_id);
        }

        // Claim finished spill results, newest task first.
        let uploaded = spilled_tasks
            .iter()
            .rev()
            .map(|task_id| {
                let (sst, spill_table_ids) = self
                    .unsync_data
                    .spilled_data
                    .remove(task_id)
                    .expect("should exist");
                assert!(
                    spill_table_ids.is_subset(&all_table_ids),
                    "spilled tabled ids {:?} not a subset of sync table id {:?}",
                    spill_table_ids,
                    all_table_ids
                );
                sst
            })
            .collect();

        self.syncing_data.insert(
            sync_id,
            SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: uploading_tasks,
                uploaded,
                table_watermarks: all_table_watermarks,
                sync_result_sender,
            },
        );

        self.check_upload_task_consistency();
    }
}
1130
1131impl UnsyncData {
1132 fn ack_flushed(&mut self, sstable_info: &StagingSstableInfo) {
1133 for (instance_id, imm_ids) in sstable_info.imm_ids() {
1134 if let Some(instance_data) = self.instance_data(*instance_id) {
1135 instance_data.ack_flushed(imm_ids.iter().rev().cloned());
1137 }
1138 }
1139 }
1140}
1141
/// State of one in-flight sync request; completes when
/// `remaining_uploading_tasks` is empty.
struct SyncingData {
    sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    remaining_uploading_tasks: HashSet<UploadingTaskId>,
    // Earlier data at the front.
    uploaded: VecDeque<Arc<StagingSstableInfo>>,
    table_watermarks: HashMap<TableId, TableWatermarks>,
    sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
}
1150
/// Result delivered to the sync requester once all tasks of a sync finish.
#[derive(Debug)]
pub struct SyncedData {
    pub uploaded_ssts: VecDeque<Arc<StagingSstableInfo>>,
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
}
1156
/// Shared dependencies of the uploader and its tasks.
struct UploaderContext {
    pinned_version: PinnedVersion,
    // Factory for spawning upload tasks.
    spawn_upload_task: SpawnUploadTask,
    buffer_tracker: BufferTracker,

    stats: Arc<HummockStateStoreMetrics>,
}
1165
1166impl UploaderContext {
1167 fn new(
1168 pinned_version: PinnedVersion,
1169 spawn_upload_task: SpawnUploadTask,
1170 buffer_tracker: BufferTracker,
1171 stats: Arc<HummockStateStoreMetrics>,
1172 ) -> Self {
1173 UploaderContext {
1174 pinned_version,
1175 spawn_upload_task,
1176 buffer_tracker,
1177 stats,
1178 }
1179 }
1180}
1181
/// Identifier of a sync request; also the ordering key of `UploaderData::syncing_data`.
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct SyncId(usize);
1184
/// All mutable state of the uploader while in the `Working` state.
#[derive(Default)]
struct UploaderData {
    unsync_data: UnsyncData,

    /// In-flight sync requests, ordered by `SyncId` so they can be notified in order.
    syncing_data: BTreeMap<SyncId, SyncingData>,

    task_manager: TaskManager,
    // Counter used to allocate the next SyncId. TODO confirm: incremented in `sync` (not visible here).
    next_sync_id: usize,
}
1194
impl UploaderData {
    /// Consume the uploader data: abort every ongoing upload task and fail all
    /// pending sync requests with the error produced by `err()`.
    fn abort(self, err: impl Fn() -> HummockError) {
        self.task_manager.abort_all_tasks();
        for syncing_data in self.syncing_data.into_values() {
            send_sync_result(syncing_data.sync_result_sender, Err(err()));
        }
    }

    /// Drop all uploader state belonging to `table_ids`.
    ///
    /// Any in-flight sync request overlapping the cleared tables must be entirely
    /// contained in them (asserted); its remaining upload tasks are aborted and
    /// the request is dropped (dropping its result sender).
    fn clear_tables(&mut self, table_ids: HashSet<TableId>) {
        if table_ids.is_empty() {
            return;
        }
        self.unsync_data
            .clear_tables(&table_ids, &mut self.task_manager);
        self.syncing_data.retain(|sync_id, syncing_data| {
            // Keep the sync request only if it is fully disjoint from the cleared tables.
            if syncing_data
                .sync_table_epochs
                .iter()
                .any(|(_, sync_table_ids)| !sync_table_ids.is_disjoint(&table_ids))
            {
                // Partial overlap is not supported: every epoch of the request must
                // sync only tables that are being cleared.
                assert!(
                    syncing_data
                        .sync_table_epochs
                        .iter()
                        .all(|(_, sync_table_ids)| sync_table_ids.is_subset(&table_ids))
                );
                for task_id in &syncing_data.remaining_uploading_tasks {
                    match self
                        .task_manager
                        .abort_task(*task_id)
                        .expect("should exist")
                    {
                        UploadingTaskStatus::Spilling(spill_table_ids) => {
                            assert!(spill_table_ids.is_subset(&table_ids));
                        }
                        UploadingTaskStatus::Sync(task_sync_id) => {
                            // A sync task must belong to the request being cleared.
                            assert_eq!(sync_id, &task_sync_id);
                        }
                    }
                }
                false
            } else {
                true
            }
        });

        self.check_upload_task_consistency();
    }

    /// Minimum object id among all spilled-but-unsynced ssts and all ssts already
    /// uploaded for in-flight sync requests, or `None` if there is none.
    fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        self.unsync_data
            .spilled_data
            .values()
            .map(|(s, _)| s)
            .chain(self.syncing_data.values().flat_map(|s| s.uploaded.iter()))
            .filter_map(|s| {
                // Minimum over both new-value and old-value ssts of one staging sst.
                s.sstable_infos()
                    .iter()
                    .chain(s.old_value_sstable_infos())
                    .map(|s| s.sst_info.object_id)
                    .min()
            })
            .min()
            .map(|object_id| object_id.as_raw())
    }
}
1261
/// Records why the uploader entered the error state, so that later sync requests
/// can be failed with a descriptive message.
struct ErrState {
    failed_sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    reason: String,
}
1266
/// Overall state of the uploader: either operating normally, or stuck in an
/// error state after a failed sync (recovered by a full `clear`).
enum UploaderState {
    Working(UploaderData),
    Err(ErrState),
}
1271
/// The entry point that accumulates imms, spills them under memory pressure, and
/// drives upload/sync of shared-buffer data.
pub struct HummockUploader {
    state: UploaderState,

    context: UploaderContext,
}
1289
1290impl HummockUploader {
1291 pub(super) fn new(
1292 state_store_metrics: Arc<HummockStateStoreMetrics>,
1293 pinned_version: PinnedVersion,
1294 spawn_upload_task: SpawnUploadTask,
1295 buffer_tracker: BufferTracker,
1296 ) -> Self {
1297 Self {
1298 state: UploaderState::Working(UploaderData::default()),
1299 context: UploaderContext::new(
1300 pinned_version,
1301 spawn_upload_task,
1302 buffer_tracker,
1303 state_store_metrics,
1304 ),
1305 }
1306 }
1307
1308 pub(super) fn buffer_tracker(&self) -> &BufferTracker {
1309 &self.context.buffer_tracker
1310 }
1311
1312 pub(super) fn hummock_version(&self) -> &PinnedVersion {
1313 &self.context.pinned_version
1314 }
1315
1316 pub(super) fn add_imm(&mut self, instance_id: LocalInstanceId, imm: ImmutableMemtable) {
1317 let UploaderState::Working(data) = &mut self.state else {
1318 return;
1319 };
1320 let imm = UploaderImm::new(imm, &self.context);
1321 data.unsync_data.add_imm(instance_id, imm);
1322 }
1323
1324 pub(super) fn init_instance(
1325 &mut self,
1326 instance_id: LocalInstanceId,
1327 table_id: TableId,
1328 init_epoch: HummockEpoch,
1329 ) {
1330 let UploaderState::Working(data) = &mut self.state else {
1331 return;
1332 };
1333 data.unsync_data
1334 .init_instance(table_id, instance_id, init_epoch);
1335 }
1336
1337 pub(super) fn local_seal_epoch(
1338 &mut self,
1339 instance_id: LocalInstanceId,
1340 next_epoch: HummockEpoch,
1341 opts: SealCurrentEpochOptions,
1342 ) {
1343 let UploaderState::Working(data) = &mut self.state else {
1344 return;
1345 };
1346 data.unsync_data
1347 .local_seal_epoch(instance_id, next_epoch, opts);
1348 }
1349
1350 pub(super) fn start_epoch(&mut self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
1351 let UploaderState::Working(data) = &mut self.state else {
1352 return;
1353 };
1354 debug!(epoch, ?table_ids, "start epoch");
1355 for table_id in &table_ids {
1356 let table_data = data
1357 .unsync_data
1358 .table_data
1359 .entry(*table_id)
1360 .or_insert_with(|| {
1361 TableUnsyncData::new(
1362 *table_id,
1363 self.context.pinned_version.table_committed_epoch(*table_id),
1364 )
1365 });
1366 table_data.new_epoch(epoch);
1367 }
1368 if let Some(unsync_epoch_id) = get_unsync_epoch_id(epoch, &table_ids) {
1369 assert!(
1370 data.unsync_data
1371 .unsync_epochs
1372 .insert(unsync_epoch_id, table_ids)
1373 .is_none()
1374 );
1375 }
1376 }
1377
1378 pub(super) fn start_sync_epoch(
1379 &mut self,
1380 sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
1381 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
1382 ) {
1383 let data = match &mut self.state {
1384 UploaderState::Working(data) => data,
1385 UploaderState::Err(ErrState {
1386 failed_sync_table_epochs,
1387 reason,
1388 }) => {
1389 let result = Err(HummockError::other(format!(
1390 "previous sync epoch {:?} failed due to [{}]",
1391 failed_sync_table_epochs, reason
1392 )));
1393 send_sync_result(sync_result_sender, result);
1394 return;
1395 }
1396 };
1397 debug!(?sync_table_epochs, "start sync epoch");
1398
1399 data.sync(&self.context, sync_result_sender, sync_table_epochs);
1400
1401 data.may_notify_sync_task(&self.context);
1402
1403 self.context
1404 .stats
1405 .uploader_syncing_epoch_count
1406 .set(data.syncing_data.len() as _);
1407 }
1408
1409 pub(crate) fn update_pinned_version(&mut self, pinned_version: PinnedVersion) {
1410 if let UploaderState::Working(data) = &mut self.state {
1411 for (table_id, info) in pinned_version.state_table_info.info() {
1413 if let Some(table_data) = data.unsync_data.table_data.get_mut(table_id) {
1414 table_data.ack_committed(info.committed_epoch);
1415 }
1416 }
1417 }
1418 self.context.pinned_version = pinned_version;
1419 }
1420
1421 pub(crate) fn may_flush(&mut self) -> bool {
1422 let UploaderState::Working(data) = &mut self.state else {
1423 return false;
1424 };
1425 if self.context.buffer_tracker.need_flush() {
1426 let mut spiller = Spiller::new(&mut data.unsync_data);
1427 let mut curr_batch_flush_size = 0;
1428 while self
1430 .context
1431 .buffer_tracker
1432 .need_more_flush(curr_batch_flush_size)
1433 && let Some((epoch, payload, spilled_table_ids)) = spiller.next_spilled_payload()
1434 {
1435 assert!(!payload.is_empty());
1436 {
1437 let (task_id, task_size, spilled_table_ids) =
1438 data.task_manager
1439 .spill(&self.context, spilled_table_ids, payload);
1440 for table_id in spilled_table_ids {
1441 spiller
1442 .unsync_data()
1443 .table_data
1444 .get_mut(table_id)
1445 .expect("should exist")
1446 .spill_tasks
1447 .entry(epoch)
1448 .or_default()
1449 .push_front(task_id);
1450 }
1451 curr_batch_flush_size += task_size;
1452 }
1453 }
1454 data.check_upload_task_consistency();
1455 curr_batch_flush_size > 0
1456 } else {
1457 false
1458 }
1459 }
1460
1461 pub(crate) fn clear(&mut self, table_ids: Option<HashSet<TableId>>) {
1462 if let Some(table_ids) = table_ids {
1463 if let UploaderState::Working(data) = &mut self.state {
1464 data.clear_tables(table_ids);
1465 }
1466 } else {
1467 if let UploaderState::Working(data) = replace(
1468 &mut self.state,
1469 UploaderState::Working(UploaderData::default()),
1470 ) {
1471 data.abort(|| HummockError::other("uploader is reset"));
1472 }
1473
1474 self.context.stats.uploader_syncing_epoch_count.set(0);
1475 }
1476 }
1477
1478 pub(crate) fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
1479 let UploaderState::Working(data) = &mut self.state else {
1480 return;
1481 };
1482 data.unsync_data.may_destroy_instance(instance_id);
1483 }
1484
1485 pub(crate) fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
1486 if let UploaderState::Working(ref u) = self.state {
1487 u.min_uncommitted_object_id()
1488 } else {
1489 None
1490 }
1491 }
1492}
1493
impl UploaderData {
    /// Notify finished sync requests in `SyncId` order. Starting from the
    /// smallest sync id, pop and complete every request whose remaining upload
    /// tasks are all done; stop at the first one still pending, so completion is
    /// always delivered in order.
    fn may_notify_sync_task(&mut self, context: &UploaderContext) {
        while let Some((_, syncing_data)) = self.syncing_data.first_key_value()
            && syncing_data.remaining_uploading_tasks.is_empty()
        {
            let (_, syncing_data) = self.syncing_data.pop_first().expect("non-empty");
            let SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: _,
                uploaded,
                table_watermarks,
                sync_result_sender,
            } = syncing_data;
            context
                .stats
                .uploader_syncing_epoch_count
                .set(self.syncing_data.len() as _);

            // Acknowledge the synced epoch on each table, dropping table data
            // that becomes empty.
            for (sync_epoch, table_ids) in sync_table_epochs {
                for table_id in table_ids {
                    if let Some(table_data) = self.unsync_data.table_data.get_mut(&table_id) {
                        table_data.ack_synced(sync_epoch);
                        if table_data.is_empty() {
                            self.unsync_data.table_data.remove(&table_id);
                        }
                    }
                }
            }

            send_sync_result(
                sync_result_sender,
                Ok(SyncedData {
                    uploaded_ssts: uploaded,
                    table_watermarks,
                }),
            )
        }
    }

    /// Debug-build-only cross check: the spill/sync task bookkeeping derived
    /// from `unsync_data`/`syncing_data` must agree with what the task manager
    /// and `spilled_data` report.
    fn check_upload_task_consistency(&self) {
        #[cfg(debug_assertions)]
        {
            // task id -> table ids, derived from the per-table spill task queues.
            let mut spill_task_table_id_from_data: HashMap<_, HashSet<_>> = HashMap::new();
            for table_data in self.unsync_data.table_data.values() {
                for task_id in table_data
                    .spill_tasks
                    .iter()
                    .flat_map(|(_, tasks)| tasks.iter())
                {
                    assert!(
                        spill_task_table_id_from_data
                            .entry(*task_id)
                            .or_default()
                            .insert(table_data.table_id)
                    );
                }
            }
            // sync id -> pending task ids, derived from the sync requests
            // (requests with no pending task are excluded).
            let syncing_task_id_from_data: HashMap<_, HashSet<_>> = self
                .syncing_data
                .iter()
                .filter_map(|(sync_id, data)| {
                    if data.remaining_uploading_tasks.is_empty() {
                        None
                    } else {
                        Some((*sync_id, data.remaining_uploading_tasks.clone()))
                    }
                })
                .collect();

            // The manager-side views: finished spills plus in-flight tasks.
            let mut spill_task_table_id_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
            for (task_id, (_, table_ids)) in &self.unsync_data.spilled_data {
                spill_task_table_id_from_manager.insert(*task_id, table_ids.clone());
            }
            let mut syncing_task_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
            for (task_id, status) in self.task_manager.tasks() {
                match status {
                    UploadingTaskStatus::Spilling(table_ids) => {
                        assert!(
                            spill_task_table_id_from_manager
                                .insert(task_id, table_ids.clone())
                                .is_none()
                        );
                    }
                    UploadingTaskStatus::Sync(sync_id) => {
                        assert!(
                            syncing_task_from_manager
                                .entry(*sync_id)
                                .or_default()
                                .insert(task_id)
                        );
                    }
                }
            }
            assert_eq!(
                spill_task_table_id_from_data,
                spill_task_table_id_from_manager
            );
            assert_eq!(syncing_task_id_from_data, syncing_task_from_manager);
        }
    }
}
1595
impl HummockUploader {
    /// Return a future resolving to the next uploaded sst.
    ///
    /// On task success: acknowledge the flushed imms, then record the sst either
    /// under its sync request (possibly completing it) or as spilled data.
    /// On sync-task failure: fail the owning sync request, switch the uploader
    /// into the `Err` state and abort all remaining work; the future then stays
    /// pending forever (as it does in the `Err` state generally).
    pub(super) fn next_uploaded_sst(
        &mut self,
    ) -> impl Future<Output = Arc<StagingSstableInfo>> + '_ {
        poll_fn(|cx| {
            let UploaderState::Working(data) = &mut self.state else {
                return Poll::Pending;
            };

            if let Some((task_id, status, result)) = ready!(data.task_manager.poll_task_result(cx))
            {
                match result {
                    Ok(sst) => {
                        data.unsync_data.ack_flushed(&sst);
                        match status {
                            UploadingTaskStatus::Sync(sync_id) => {
                                let syncing_data =
                                    data.syncing_data.get_mut(&sync_id).expect("should exist");
                                // Newer ssts go to the front of the uploaded queue.
                                syncing_data.uploaded.push_front(sst.clone());
                                assert!(syncing_data.remaining_uploading_tasks.remove(&task_id));
                                data.may_notify_sync_task(&self.context);
                            }
                            UploadingTaskStatus::Spilling(table_ids) => {
                                data.unsync_data
                                    .spilled_data
                                    .insert(task_id, (sst.clone(), table_ids));
                            }
                        }
                        data.check_upload_task_consistency();
                        Poll::Ready(sst)
                    }
                    Err((sync_id, e)) => {
                        let syncing_data =
                            data.syncing_data.remove(&sync_id).expect("should exist");
                        let failed_epochs = syncing_data.sync_table_epochs.clone();
                        // Transition to the Err state, taking ownership of the
                        // remaining working data so it can be aborted below.
                        let data = must_match!(replace(
                            &mut self.state,
                            UploaderState::Err(ErrState {
                                failed_sync_table_epochs: syncing_data.sync_table_epochs,
                                reason: e.as_report().to_string(),
                            }),
                        ), UploaderState::Working(data) => data);

                        let _ = syncing_data
                            .sync_result_sender
                            .send(Err(HummockError::other(format!(
                                "failed to sync: {:?}",
                                e.as_report()
                            ))));

                        data.abort(|| {
                            HummockError::other(format!(
                                "previous epoch {:?} failed to sync",
                                failed_epochs
                            ))
                        });
                        Poll::Pending
                    }
                }
            } else {
                Poll::Pending
            }
        })
    }
}
1661
1662#[cfg(test)]
1663pub(crate) mod tests {
1664 use std::collections::{HashMap, HashSet};
1665 use std::future::{Future, poll_fn};
1666 use std::ops::Deref;
1667 use std::pin::pin;
1668 use std::sync::Arc;
1669 use std::sync::atomic::AtomicUsize;
1670 use std::sync::atomic::Ordering::SeqCst;
1671 use std::task::Poll;
1672
1673 use futures::FutureExt;
1674 use risingwave_common::catalog::TableId;
1675 use risingwave_common::util::epoch::EpochExt;
1676 use risingwave_hummock_sdk::HummockEpoch;
1677 use tokio::sync::oneshot;
1678
1679 use super::test_utils::*;
1680 use crate::hummock::event_handler::TEST_LOCAL_INSTANCE_ID;
1681 use crate::hummock::event_handler::uploader::{
1682 HummockUploader, SyncedData, UploadingTask, get_payload_imm_ids,
1683 };
1684 use crate::hummock::{HummockError, HummockResult};
1685 use crate::opts::StorageOpts;
1686
    impl HummockUploader {
        /// Test convenience: sync a single epoch for the given tables.
        pub(super) fn start_single_epoch_sync(
            &mut self,
            epoch: HummockEpoch,
            sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
            table_ids: HashSet<TableId>,
        ) {
            self.start_sync_epoch(sync_result_sender, vec![(epoch, table_ids)]);
        }
    }
1697
    /// An uploading task carries the expected imm metadata, and its future
    /// resolves to the upload output on success and to an error on failure.
    #[tokio::test]
    pub async fn test_uploading_task_future() {
        let uploader_context = test_uploader_context(dummy_success_upload_future);

        let imm = gen_imm(INITIAL_EPOCH).await;
        let imm_size = imm.size();
        let imm_ids = get_imm_ids(vec![&imm]);
        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
        assert_eq!(imm_size, task.task_info.task_size);
        assert_eq!(imm_ids, task.task_info.imm_ids);
        assert_eq!(vec![INITIAL_EPOCH], task.task_info.epochs);
        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
        assert_eq!(
            output.sstable_infos(),
            &dummy_success_upload_output().new_value_ssts
        );
        assert_eq!(imm_size, output.imm_size());
        assert_eq!(&imm_ids, output.imm_ids());
        assert_eq!(&vec![INITIAL_EPOCH], output.epochs());

        // A failing upload surfaces as an Err from the task future.
        let uploader_context = test_uploader_context(dummy_fail_upload_future);
        let imm = gen_imm(INITIAL_EPOCH).await;
        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
    }
1723
    /// `poll_result` yields the upload output on success and the error on failure.
    #[tokio::test]
    pub async fn test_uploading_task_poll_result() {
        let uploader_context = test_uploader_context(dummy_success_upload_future);
        let mut task =
            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
        assert_eq!(
            output.sstable_infos(),
            &dummy_success_upload_output().new_value_ssts
        );

        let uploader_context = test_uploader_context(dummy_fail_upload_future);
        let mut task =
            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
    }
1740
    /// `poll_ok_with_retry` keeps retrying a failing upload until it succeeds;
    /// the upload closure runs `fail_num + 1` times in total.
    #[tokio::test]
    async fn test_uploading_task_poll_ok_with_retry() {
        let run_count = Arc::new(AtomicUsize::new(0));
        let fail_num = 10;
        let run_count_clone = run_count.clone();
        let uploader_context = test_uploader_context(move |payload, info| {
            let run_count = run_count.clone();
            async move {
                // Fail the first `fail_num` attempts, then succeed.
                let ret = if run_count.load(SeqCst) < fail_num {
                    Err(HummockError::other("fail"))
                } else {
                    dummy_success_upload_future(payload, info).await
                };
                run_count.fetch_add(1, SeqCst);
                ret
            }
        });
        let mut task =
            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
        let output = poll_fn(|cx| task.poll_ok_with_retry(cx)).await;
        assert_eq!(fail_num + 1, run_count_clone.load(SeqCst));
        assert_eq!(
            output.sstable_infos(),
            &dummy_success_upload_output().new_value_ssts
        );
    }
1768
    /// End-to-end happy path: add an imm, seal and sync the epoch, observe the
    /// uploaded sst both from `next_uploaded_sst` and from the sync result, then
    /// ack the committed epoch via a new pinned version.
    #[tokio::test]
    async fn test_uploader_basic() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        uploader.start_epochs_for_test([epoch1]);
        let imm = gen_imm(epoch1).await;
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
        assert_eq!(1, uploader.data().syncing_data.len());
        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
        assert!(syncing_data.uploaded.is_empty());
        assert!(!syncing_data.remaining_uploading_tasks.is_empty());

        let staging_sst = uploader.next_uploaded_sst().await;
        assert_eq!(&vec![epoch1], staging_sst.epochs());
        assert_eq!(
            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
            staging_sst.imm_ids()
        );
        assert_eq!(
            &dummy_success_upload_output().new_value_ssts,
            staging_sst.sstable_infos()
        );

        match sync_rx.await {
            Ok(Ok(data)) => {
                let SyncedData {
                    uploaded_ssts,
                    table_watermarks,
                } = data;
                assert_eq!(1, uploaded_ssts.len());
                let staging_sst = &uploaded_ssts[0];
                assert_eq!(&vec![epoch1], staging_sst.epochs());
                assert_eq!(
                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
                    staging_sst.imm_ids()
                );
                assert_eq!(
                    &dummy_success_upload_output().new_value_ssts,
                    staging_sst.sstable_infos()
                );
                assert!(table_watermarks.is_empty());
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());

        // Committing epoch1 via a new pinned version should be reflected.
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
1837
    /// Destroying the only instance before syncing still lets the sync complete,
    /// and the table's unsync data is removed afterwards.
    #[tokio::test]
    async fn test_uploader_destroy_instance_before_sync() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        uploader.start_epochs_for_test([epoch1]);
        let imm = gen_imm(epoch1).await;
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.may_destroy_instance(TEST_LOCAL_INSTANCE_ID);

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
        assert_eq!(1, uploader.data().syncing_data.len());
        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
        assert!(syncing_data.uploaded.is_empty());
        assert!(!syncing_data.remaining_uploading_tasks.is_empty());

        let staging_sst = uploader.next_uploaded_sst().await;
        assert_eq!(&vec![epoch1], staging_sst.epochs());
        assert_eq!(
            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
            staging_sst.imm_ids()
        );
        assert_eq!(
            &dummy_success_upload_output().new_value_ssts,
            staging_sst.sstable_infos()
        );

        match sync_rx.await {
            Ok(Ok(data)) => {
                let SyncedData {
                    uploaded_ssts,
                    table_watermarks,
                } = data;
                assert_eq!(1, uploaded_ssts.len());
                let staging_sst = &uploaded_ssts[0];
                assert_eq!(&vec![epoch1], staging_sst.epochs());
                assert_eq!(
                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
                    staging_sst.imm_ids()
                );
                assert_eq!(
                    &dummy_success_upload_output().new_value_ssts,
                    staging_sst.sstable_infos()
                );
                assert!(table_watermarks.is_empty());
            }
            _ => unreachable!(),
        };
        // With the instance destroyed and the epoch synced, the table data is gone.
        assert!(
            !uploader
                .data()
                .unsync_data
                .table_data
                .contains_key(&TEST_TABLE_ID)
        );
    }
1898
    /// Syncing an epoch with no data completes with an empty sst list and no
    /// uploaded sst is ever produced.
    #[tokio::test]
    async fn test_empty_uploader_sync() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_epochs_for_test([epoch1]);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        assert_uploader_pending(&mut uploader).await;

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.uploaded_ssts.is_empty());
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert!(uploader.data().syncing_data.is_empty());
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
1936
    /// Syncing epoch1, which has no data (the only imm belongs to epoch2),
    /// completes with an empty sst list.
    #[tokio::test]
    async fn test_uploader_empty_epoch() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        uploader.start_epochs_for_test([epoch1, epoch2]);
        let imm = gen_imm(epoch2).await;
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        // The imm is added after sealing epoch1, so it belongs to epoch2.
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm);

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        assert_uploader_pending(&mut uploader).await;

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.uploaded_ssts.is_empty());
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert!(uploader.data().syncing_data.is_empty());
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
1978
    /// `next_uploaded_sst` stays pending when there is nothing to upload.
    #[tokio::test]
    async fn test_uploader_poll_empty() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let fut = uploader.next_uploaded_sst();
        let mut fut = pin!(fut);
        assert!(poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx).is_pending())).await);
    }
1986
    /// Advancing the pinned version moves the max synced/syncing epochs forward
    /// for epochs without local data, while an explicit sync of epoch6 proceeds
    /// independently and eventually catches up.
    #[tokio::test]
    async fn test_uploader_empty_advance_mce() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let initial_pinned_version = uploader.context.pinned_version.clone();
        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        let epoch3 = epoch2.next_epoch();
        let epoch4 = epoch3.next_epoch();
        let epoch5 = epoch4.next_epoch();
        let epoch6 = epoch5.next_epoch();
        let version1 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        let version2 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch2))
            .unwrap();
        let version3 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch3))
            .unwrap();
        let version4 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch4))
            .unwrap();
        let version5 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch5))
            .unwrap();

        uploader.start_epochs_for_test([epoch6]);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch6);

        uploader.update_pinned_version(version1);
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        let imm = gen_imm(epoch6).await;
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
        uploader.update_pinned_version(version2);
        assert_eq!(epoch2, uploader.test_max_synced_epoch());
        assert_eq!(epoch2, uploader.test_max_syncing_epoch());

        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch6);
        uploader.update_pinned_version(version3);
        assert_eq!(epoch3, uploader.test_max_synced_epoch());
        assert_eq!(epoch3, uploader.test_max_syncing_epoch());

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch6, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
        uploader.update_pinned_version(version4);
        // epoch6 is still syncing, so only synced advances with the version.
        assert_eq!(epoch4, uploader.test_max_synced_epoch());
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());

        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_imm_ids([&imm]), sst.imm_ids());

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.table_watermarks.is_empty());
                assert_eq!(1, data.uploaded_ssts.len());
                assert_eq!(&get_imm_ids([&imm]), data.uploaded_ssts[0].imm_ids());
            }
            _ => unreachable!(),
        }

        uploader.update_pinned_version(version5);
        assert_eq!(epoch6, uploader.test_max_synced_epoch());
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
    }
2054
2055 #[tokio::test]
2056 async fn test_uploader_finish_in_order() {
2057 let config = StorageOpts {
2058 shared_buffer_capacity_mb: 1024 * 1024,
2059 shared_buffer_flush_ratio: 0.0,
2060 ..Default::default()
2061 };
2062 let (buffer_tracker, mut uploader, new_task_notifier) =
2063 prepare_uploader_order_test(&config, false);
2064
2065 let epoch1 = INITIAL_EPOCH.next_epoch();
2066 let epoch2 = epoch1.next_epoch();
2067 let epoch3 = epoch2.next_epoch();
2068 let epoch4 = epoch3.next_epoch();
2069 uploader.start_epochs_for_test([epoch1, epoch2, epoch3, epoch4]);
2070 let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2071 let memory_limiter = Some(memory_limiter.deref());
2072
2073 let instance_id1 = 1;
2074 let instance_id2 = 2;
2075
2076 uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2077 uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2078
2079 let imm2 = gen_imm_with_limiter(epoch2, memory_limiter).await;
2081 uploader.add_imm(instance_id2, imm2.clone());
2082 let imm1_1 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2083 uploader.add_imm(instance_id1, imm1_1.clone());
2084 let imm1_2 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2085 uploader.add_imm(instance_id1, imm1_2.clone());
2086
2087 let epoch1_spill_payload12 =
2089 HashMap::from_iter([(instance_id1, vec![imm1_2.clone(), imm1_1.clone()])]);
2090 let epoch2_spill_payload = HashMap::from_iter([(instance_id2, vec![imm2.clone()])]);
2091 let (await_start1, finish_tx1) =
2092 new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload12));
2093 let (await_start2, finish_tx2) =
2094 new_task_notifier(get_payload_imm_ids(&epoch2_spill_payload));
2095 uploader.may_flush();
2096 await_start1.await;
2097 await_start2.await;
2098
2099 assert_uploader_pending(&mut uploader).await;
2100
2101 finish_tx2.send(()).unwrap();
2102 assert_uploader_pending(&mut uploader).await;
2103
2104 finish_tx1.send(()).unwrap();
2105 let sst = uploader.next_uploaded_sst().await;
2106 assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload12), sst.imm_ids());
2107 assert_eq!(&vec![epoch1], sst.epochs());
2108
2109 let sst = uploader.next_uploaded_sst().await;
2110 assert_eq!(&get_payload_imm_ids(&epoch2_spill_payload), sst.imm_ids());
2111 assert_eq!(&vec![epoch2], sst.epochs());
2112
2113 let imm1_3 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2114 uploader.add_imm(instance_id1, imm1_3.clone());
2115 let epoch1_spill_payload3 = HashMap::from_iter([(instance_id1, vec![imm1_3.clone()])]);
2116 let (await_start1_3, finish_tx1_3) =
2117 new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload3));
2118 uploader.may_flush();
2119 await_start1_3.await;
2120 let imm1_4 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2121 uploader.add_imm(instance_id1, imm1_4.clone());
2122 let epoch1_sync_payload = HashMap::from_iter([(instance_id1, vec![imm1_4.clone()])]);
2123 let (await_start1_4, finish_tx1_4) =
2124 new_task_notifier(get_payload_imm_ids(&epoch1_sync_payload));
2125 uploader.local_seal_epoch_for_test(instance_id1, epoch1);
2126 let (sync_tx1, mut sync_rx1) = oneshot::channel();
2127 uploader.start_single_epoch_sync(epoch1, sync_tx1, HashSet::from_iter([TEST_TABLE_ID]));
2128 await_start1_4.await;
2129
2130 uploader.local_seal_epoch_for_test(instance_id1, epoch2);
2131 uploader.local_seal_epoch_for_test(instance_id2, epoch2);
2132
2133 let imm3_1 = gen_imm_with_limiter(epoch3, memory_limiter).await;
2139 let epoch3_spill_payload1 = HashMap::from_iter([(instance_id1, vec![imm3_1.clone()])]);
2140 uploader.add_imm(instance_id1, imm3_1.clone());
2141 let (await_start3_1, finish_tx3_1) =
2142 new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload1));
2143 uploader.may_flush();
2144 await_start3_1.await;
2145 let imm3_2 = gen_imm_with_limiter(epoch3, memory_limiter).await;
2146 let epoch3_spill_payload2 = HashMap::from_iter([(instance_id2, vec![imm3_2.clone()])]);
2147 uploader.add_imm(instance_id2, imm3_2.clone());
2148 let (await_start3_2, finish_tx3_2) =
2149 new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload2));
2150 uploader.may_flush();
2151 await_start3_2.await;
2152 let imm3_3 = gen_imm_with_limiter(epoch3, memory_limiter).await;
2153 uploader.add_imm(instance_id1, imm3_3.clone());
2154
2155 uploader.local_seal_epoch_for_test(instance_id1, epoch3);
2161 let imm4 = gen_imm_with_limiter(epoch4, memory_limiter).await;
2162 uploader.add_imm(instance_id1, imm4.clone());
2163 assert_uploader_pending(&mut uploader).await;
2164
2165 finish_tx3_1.send(()).unwrap();
2172 assert_uploader_pending(&mut uploader).await;
2173 finish_tx1_4.send(()).unwrap();
2174 assert_uploader_pending(&mut uploader).await;
2175 finish_tx1_3.send(()).unwrap();
2176
2177 let sst = uploader.next_uploaded_sst().await;
2178 assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload3), sst.imm_ids());
2179
2180 assert!(poll_fn(|cx| Poll::Ready(sync_rx1.poll_unpin(cx).is_pending())).await);
2181
2182 let sst = uploader.next_uploaded_sst().await;
2183 assert_eq!(&get_payload_imm_ids(&epoch1_sync_payload), sst.imm_ids());
2184
2185 match sync_rx1.await {
2186 Ok(Ok(data)) => {
2187 assert_eq!(3, data.uploaded_ssts.len());
2188 assert_eq!(
2189 &get_payload_imm_ids(&epoch1_sync_payload),
2190 data.uploaded_ssts[0].imm_ids()
2191 );
2192 assert_eq!(
2193 &get_payload_imm_ids(&epoch1_spill_payload3),
2194 data.uploaded_ssts[1].imm_ids()
2195 );
2196 assert_eq!(
2197 &get_payload_imm_ids(&epoch1_spill_payload12),
2198 data.uploaded_ssts[2].imm_ids()
2199 );
2200 }
2201 _ => {
2202 unreachable!()
2203 }
2204 }
2205
2206 let (sync_tx2, sync_rx2) = oneshot::channel();
2214 uploader.start_single_epoch_sync(epoch2, sync_tx2, HashSet::from_iter([TEST_TABLE_ID]));
2215 uploader.local_seal_epoch_for_test(instance_id2, epoch3);
2216 let sst = uploader.next_uploaded_sst().await;
2217 assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids());
2218
2219 match sync_rx2.await {
2220 Ok(Ok(data)) => {
2221 assert_eq!(data.uploaded_ssts.len(), 1);
2222 assert_eq!(
2223 &get_payload_imm_ids(&epoch2_spill_payload),
2224 data.uploaded_ssts[0].imm_ids()
2225 );
2226 }
2227 _ => {
2228 unreachable!("should be sync finish");
2229 }
2230 }
2231 assert_eq!(epoch2, uploader.test_max_synced_epoch());
2232
2233 uploader.local_seal_epoch_for_test(instance_id1, epoch4);
2241 uploader.local_seal_epoch_for_test(instance_id2, epoch4);
2242 let epoch4_sync_payload = HashMap::from_iter([(instance_id1, vec![imm4, imm3_3])]);
2243 let (await_start4_with_3_3, finish_tx4_with_3_3) =
2244 new_task_notifier(get_payload_imm_ids(&epoch4_sync_payload));
2245 let (sync_tx4, mut sync_rx4) = oneshot::channel();
2246 uploader.start_single_epoch_sync(epoch4, sync_tx4, HashSet::from_iter([TEST_TABLE_ID]));
2247 await_start4_with_3_3.await;
2248
2249 assert_uploader_pending(&mut uploader).await;
2257
2258 finish_tx3_2.send(()).unwrap();
2259 let sst = uploader.next_uploaded_sst().await;
2260 assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload2), sst.imm_ids());
2261
2262 finish_tx4_with_3_3.send(()).unwrap();
2263 assert!(poll_fn(|cx| Poll::Ready(sync_rx4.poll_unpin(cx).is_pending())).await);
2264
2265 let sst = uploader.next_uploaded_sst().await;
2266 assert_eq!(&get_payload_imm_ids(&epoch4_sync_payload), sst.imm_ids());
2267
2268 match sync_rx4.await {
2269 Ok(Ok(data)) => {
2270 assert_eq!(3, data.uploaded_ssts.len());
2271 assert_eq!(
2272 &get_payload_imm_ids(&epoch4_sync_payload),
2273 data.uploaded_ssts[0].imm_ids()
2274 );
2275 assert_eq!(
2276 &get_payload_imm_ids(&epoch3_spill_payload2),
2277 data.uploaded_ssts[1].imm_ids()
2278 );
2279 assert_eq!(
2280 &get_payload_imm_ids(&epoch3_spill_payload1),
2281 data.uploaded_ssts[2].imm_ids(),
2282 )
2283 }
2284 _ => {
2285 unreachable!("should be sync finish");
2286 }
2287 }
2288 assert_eq!(epoch4, uploader.test_max_synced_epoch());
2289
2290 }
2298
2299 #[tokio::test]
2300 async fn test_uploader_frequently_flush() {
2301 let config = StorageOpts {
2302 shared_buffer_capacity_mb: 10,
2303 shared_buffer_flush_ratio: 0.8,
2304 shared_buffer_min_batch_flush_size_mb: 1,
2306 ..Default::default()
2307 };
2308 let (buffer_tracker, mut uploader, _new_task_notifier) =
2309 prepare_uploader_order_test(&config, true);
2310
2311 let epoch1 = INITIAL_EPOCH.next_epoch();
2312 let epoch2 = epoch1.next_epoch();
2313 uploader.start_epochs_for_test([epoch1, epoch2]);
2314 let instance_id1 = 1;
2315 let instance_id2 = 2;
2316 let flush_threshold = buffer_tracker.flush_threshold();
2317 let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2318
2319 uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2320 uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2321
2322 let mut total_memory = 0;
2324 while total_memory < flush_threshold {
2325 let imm = gen_imm_with_limiter(epoch2, Some(memory_limiter.as_ref())).await;
2326 total_memory += imm.size();
2327 if total_memory > flush_threshold {
2328 break;
2329 }
2330 uploader.add_imm(instance_id2, imm);
2331 }
2332 let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await;
2333 uploader.add_imm(instance_id1, imm);
2334 assert!(uploader.may_flush());
2335
2336 for _ in 0..10 {
2337 let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await;
2338 uploader.add_imm(instance_id1, imm);
2339 assert!(!uploader.may_flush());
2340 }
2341 }
2342}