1mod spiller;
16mod task_manager;
17pub(crate) mod test_utils;
18
19use std::cmp::Ordering;
20use std::collections::btree_map::Entry;
21use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque, hash_map};
22use std::fmt::{Debug, Display, Formatter};
23use std::future::{Future, poll_fn};
24use std::mem::{replace, swap, take};
25use std::sync::Arc;
26use std::task::{Context, Poll, ready};
27
28use futures::FutureExt;
29use itertools::Itertools;
30use more_asserts::assert_gt;
31use prometheus::{HistogramTimer, IntGauge};
32use risingwave_common::bitmap::BitmapBuilder;
33use risingwave_common::catalog::TableId;
34use risingwave_common::metrics::UintGauge;
35use risingwave_common::must_match;
36use risingwave_hummock_sdk::table_watermark::{
37 TableWatermarks, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
38};
39use risingwave_hummock_sdk::vector_index::VectorIndexAdd;
40use risingwave_hummock_sdk::{HummockEpoch, HummockRawObjectId, LocalSstableInfo};
41use task_manager::{TaskManager, UploadingTaskStatus};
42use thiserror_ext::AsReport;
43use tokio::sync::oneshot;
44use tokio::task::JoinHandle;
45use tracing::{debug, error, info, warn};
46
47use crate::hummock::event_handler::LocalInstanceId;
48use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, send_sync_result};
49use crate::hummock::event_handler::uploader::spiller::Spiller;
50use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm;
51use crate::hummock::local_version::pinned_version::PinnedVersion;
52use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchId;
53use crate::hummock::store::version::StagingSstableInfo;
54use crate::hummock::{HummockError, HummockResult, ImmutableMemtable};
55use crate::mem_table::ImmId;
56use crate::monitor::HummockStateStoreMetrics;
57use crate::store::SealCurrentEpochOptions;
58
59fn take_before_epoch<T>(
61 data: &mut BTreeMap<HummockEpoch, T>,
62 epoch: HummockEpoch,
63) -> BTreeMap<HummockEpoch, T> {
64 let mut before_epoch_data = data.split_off(&(epoch + 1));
65 swap(&mut before_epoch_data, data);
66 before_epoch_data
67}
68
// Upload task input keyed by local instance id; imms are wrapped in
// `UploaderImm` so their byte size is tracked in the uploader metrics.
type UploadTaskInput = HashMap<LocalInstanceId, Vec<UploaderImm>>;
/// Payload handed to a spawned upload task: plain imms per local instance.
pub type UploadTaskPayload = HashMap<LocalInstanceId, Vec<ImmutableMemtable>>;
71
/// Result of a finished upload task.
#[derive(Debug)]
pub struct UploadTaskOutput {
    /// SSTs built from the new-value data of the uploaded imms.
    pub new_value_ssts: Vec<LocalSstableInfo>,
    /// SSTs built from the old-value data, if any.
    pub old_value_ssts: Vec<LocalSstableInfo>,
    // NOTE(review): presumably times how long the finished output waits before
    // being polled by the uploader — confirm against the metric definition.
    pub wait_poll_timer: Option<HistogramTimer>,
}
/// Closure that spawns an upload task for the given payload and returns a
/// join handle used to await the task's result.
pub type SpawnUploadTask = Arc<
    dyn Fn(UploadTaskPayload, UploadTaskInfo) -> JoinHandle<HummockResult<UploadTaskOutput>>
        + Send
        + Sync
        + 'static,
>;
84
/// Metadata describing an upload task.
#[derive(Clone)]
pub struct UploadTaskInfo {
    /// Total byte size of all imms in the task payload.
    pub task_size: usize,
    /// Epochs covered by the payload, deduplicated, in descending order
    /// (see `UploadingTask::new`).
    pub epochs: Vec<HummockEpoch>,
    /// Ids of the imms in the payload, keyed by local instance id.
    pub imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
}
91
92impl Display for UploadTaskInfo {
93 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
94 f.debug_struct("UploadTaskInfo")
95 .field("task_size", &self.task_size)
96 .field("epochs", &self.epochs)
97 .field("len(imm_ids)", &self.imm_ids.len())
98 .finish()
99 }
100}
101
102impl Debug for UploadTaskInfo {
103 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
104 f.debug_struct("UploadTaskInfo")
105 .field("task_size", &self.task_size)
106 .field("epochs", &self.epochs)
107 .field("imm_ids", &self.imm_ids)
108 .finish()
109 }
110}
111
112mod uploader_imm {
113 use std::fmt::Formatter;
114 use std::ops::Deref;
115
116 use risingwave_common::metrics::UintGauge;
117
118 use crate::hummock::event_handler::uploader::UploaderContext;
119 use crate::mem_table::ImmutableMemtable;
120
121 pub(super) struct UploaderImm {
122 inner: ImmutableMemtable,
123 size_guard: UintGauge,
124 }
125
126 impl UploaderImm {
127 pub(super) fn new(imm: ImmutableMemtable, context: &UploaderContext) -> Self {
128 let size = imm.size();
129 let size_guard = context.stats.uploader_imm_size.clone();
130 size_guard.add(size as _);
131 Self {
132 inner: imm,
133 size_guard,
134 }
135 }
136
137 #[cfg(test)]
138 pub(super) fn for_test(imm: ImmutableMemtable) -> Self {
139 Self {
140 inner: imm,
141 size_guard: UintGauge::new("test", "test").unwrap(),
142 }
143 }
144 }
145
146 impl std::fmt::Debug for UploaderImm {
147 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
148 self.inner.fmt(f)
149 }
150 }
151
152 impl Deref for UploaderImm {
153 type Target = ImmutableMemtable;
154
155 fn deref(&self) -> &Self::Target {
156 &self.inner
157 }
158 }
159
160 impl Drop for UploaderImm {
161 fn drop(&mut self) {
162 self.size_guard.sub(self.inner.size() as _);
163 }
164 }
165}
166
/// Identifier of an uploading (spill or sync) task.
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct UploadingTaskId(usize);
169
/// An in-flight upload task spawned via `SpawnUploadTask`.
struct UploadingTask {
    task_id: UploadingTaskId,
    /// The imms being uploaded; retained so the task can be re-spawned on
    /// failure (see `poll_ok_with_retry`).
    input: UploadTaskInput,
    join_handle: JoinHandle<HummockResult<UploadTaskOutput>>,
    task_info: UploadTaskInfo,
    /// Spawner used to retry the upload when a task fails.
    spawn_upload_task: SpawnUploadTask,
    /// Global upload-task byte gauge; decremented on drop.
    task_size_guard: UintGauge,
    /// Uploading-task count gauge; decremented on drop.
    task_count_guard: IntGauge,
}
182
183impl Drop for UploadingTask {
184 fn drop(&mut self) {
185 self.task_size_guard.sub(self.task_info.task_size as u64);
186 self.task_count_guard.dec();
187 }
188}
189
190impl Debug for UploadingTask {
191 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
192 f.debug_struct("UploadingTask")
193 .field("input", &self.input)
194 .field("task_info", &self.task_info)
195 .finish()
196 }
197}
198
199fn get_payload_imm_ids(
200 payload: &UploadTaskPayload,
201) -> HashMap<LocalInstanceId, Vec<SharedBufferBatchId>> {
202 payload
203 .iter()
204 .map(|(instance_id, imms)| {
205 (
206 *instance_id,
207 imms.iter().map(|imm| imm.batch_id()).collect_vec(),
208 )
209 })
210 .collect()
211}
212
impl UploadingTask {
    /// Tasks whose payload exceeds this size (50 MiB) are logged at `info`
    /// level on start/finish instead of `debug`.
    const LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE: usize = 50 * (1 << 20);

    /// Clones the wrapped imms of `input` into a plain payload that can be
    /// handed to the spawned upload task.
    fn input_to_payload(input: &UploadTaskInput) -> UploadTaskPayload {
        input
            .iter()
            .map(|(instance_id, imms)| {
                (
                    *instance_id,
                    imms.iter().map(|imm| (**imm).clone()).collect(),
                )
            })
            .collect()
    }

    /// Spawns a new upload task over `input`, registering its size in the
    /// buffer tracker and its existence in the metrics.
    ///
    /// Panics if `input` is empty.
    fn new(task_id: UploadingTaskId, input: UploadTaskInput, context: &UploaderContext) -> Self {
        assert!(!input.is_empty());
        // Collect all epochs covered by the imms, deduplicated, then reverse
        // to descending order.
        let mut epochs = input
            .iter()
            .flat_map(|(_, imms)| imms.iter().flat_map(|imm| imm.epochs().iter().cloned()))
            .sorted()
            .dedup()
            .collect_vec();

        epochs.reverse();
        let payload = Self::input_to_payload(&input);
        let imm_ids = get_payload_imm_ids(&payload);
        let task_size = input
            .values()
            .map(|imms| imms.iter().map(|imm| imm.size()).sum::<usize>())
            .sum();
        let task_info = UploadTaskInfo {
            task_size,
            epochs,
            imm_ids,
        };
        // Account the task size globally; the matching `sub` happens in `Drop`.
        context
            .buffer_tracker
            .global_upload_task_size()
            .add(task_size as u64);
        if task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
            info!("start upload task: {:?}", task_info);
        } else {
            debug!("start upload task: {:?}", task_info);
        }
        let join_handle = (context.spawn_upload_task)(payload, task_info.clone());
        context.stats.uploader_uploading_task_count.inc();
        Self {
            task_id,
            input,
            join_handle,
            task_info,
            spawn_upload_task: context.spawn_upload_task.clone(),
            task_size_guard: context.buffer_tracker.global_upload_task_size().clone(),
            task_count_guard: context.stats.uploader_uploading_task_count.clone(),
        }
    }

    /// Polls the underlying join handle once. On completion, yields either the
    /// uploaded sstable info or the task/join error, without retrying.
    fn poll_result(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<HummockResult<Arc<StagingSstableInfo>>> {
        Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) {
            Ok(task_result) => task_result
                .inspect(|_| {
                    if self.task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
                        info!(task_info = ?self.task_info, "upload task finish");
                    } else {
                        debug!(task_info = ?self.task_info, "upload task finish");
                    }
                })
                .inspect_err(|e| error!(task_info = ?self.task_info, err = ?e.as_report(), "upload task failed"))
                .map(|output| {
                    Arc::new(StagingSstableInfo::new(
                        output.new_value_ssts,
                        output.old_value_ssts,
                        self.task_info.epochs.clone(),
                        self.task_info.imm_ids.clone(),
                        self.task_info.task_size,
                    ))
                }),

            // The spawned task itself panicked or was cancelled.
            Err(err) => Err(HummockError::other(format!(
                "fail to join upload join handle: {}",
                err.as_report()
            ))),
        })
    }

    /// Like `poll_result`, but on failure re-spawns the task from the retained
    /// `input` and keeps polling until an upload eventually succeeds.
    fn poll_ok_with_retry(&mut self, cx: &mut Context<'_>) -> Poll<Arc<StagingSstableInfo>> {
        loop {
            let result = ready!(self.poll_result(cx));
            match result {
                Ok(sstables) => return Poll::Ready(sstables),
                Err(e) => {
                    error!(
                        error = %e.as_report(),
                        task_info = ?self.task_info,
                        "a flush task failed, start retry",
                    );
                    // Re-spawn with the same payload; the loop polls the new
                    // join handle on the next iteration.
                    self.join_handle = (self.spawn_upload_task)(
                        Self::input_to_payload(&self.input),
                        self.task_info.clone(),
                    );
                }
            }
        }
    }

    pub fn get_task_info(&self) -> &UploadTaskInfo {
        &self.task_info
    }
}
333
impl TableUnsyncData {
    /// Records the watermarks reported when sealing `epoch`.
    ///
    /// All watermarks of one table must share the same direction and serde
    /// type, and within a single epoch each vnode may be covered by at most
    /// one watermark (both enforced by assertions).
    fn add_table_watermarks(
        &mut self,
        epoch: HummockEpoch,
        table_watermarks: Vec<VnodeWatermark>,
        direction: WatermarkDirection,
        watermark_type: WatermarkSerdeType,
    ) {
        if table_watermarks.is_empty() {
            return;
        }
        // All watermarks must agree on the vnode count.
        let vnode_count = table_watermarks[0].vnode_count();
        for watermark in &table_watermarks {
            assert_eq!(vnode_count, watermark.vnode_count());
        }

        // Marks the vnodes covered by `vnode_watermarks` in `vnode_bitmap`,
        // asserting that no vnode is written twice within the same epoch.
        fn apply_new_vnodes(
            vnode_bitmap: &mut BitmapBuilder,
            vnode_watermarks: &Vec<VnodeWatermark>,
        ) {
            for vnode_watermark in vnode_watermarks {
                for vnode in vnode_watermark.vnode_bitmap().iter_ones() {
                    assert!(
                        !vnode_bitmap.is_set(vnode),
                        "vnode {} write multiple table watermarks",
                        vnode
                    );
                    vnode_bitmap.set(vnode, true);
                }
            }
        }
        match &mut self.table_watermarks {
            Some((prev_direction, prev_watermarks, prev_watermark_type)) => {
                assert_eq!(
                    *prev_direction, direction,
                    "table id {} new watermark direction not match with previous",
                    self.table_id
                );
                assert_eq!(
                    *prev_watermark_type, watermark_type,
                    "table id {} new watermark watermark_type not match with previous",
                    self.table_id
                );
                match prev_watermarks.entry(epoch) {
                    // Merge into the existing epoch entry.
                    Entry::Occupied(mut entry) => {
                        let (prev_watermarks, vnode_bitmap) = entry.get_mut();
                        apply_new_vnodes(vnode_bitmap, &table_watermarks);
                        prev_watermarks.extend(table_watermarks);
                    }
                    // First watermarks for this epoch.
                    Entry::Vacant(entry) => {
                        let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                        apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                        entry.insert((table_watermarks, vnode_bitmap));
                    }
                }
            }
            // First watermarks ever recorded for this table.
            None => {
                let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                self.table_watermarks = Some((
                    direction,
                    BTreeMap::from_iter([(epoch, (table_watermarks, vnode_bitmap))]),
                    watermark_type,
                ));
            }
        }
    }
}
402
403impl UploaderData {
404 fn add_table_watermarks(
405 all_table_watermarks: &mut HashMap<TableId, TableWatermarks>,
406 table_id: TableId,
407 direction: WatermarkDirection,
408 watermarks: impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)>,
409 watermark_type: WatermarkSerdeType,
410 ) {
411 let mut table_watermarks: Option<TableWatermarks> = None;
412 for (epoch, watermarks) in watermarks {
413 match &mut table_watermarks {
414 Some(prev_watermarks) => {
415 assert_eq!(prev_watermarks.direction, direction);
416 assert_eq!(prev_watermarks.watermark_type, watermark_type);
417 prev_watermarks.add_new_epoch_watermarks(
418 epoch,
419 Arc::from(watermarks),
420 direction,
421 watermark_type,
422 );
423 }
424 None => {
425 table_watermarks = Some(TableWatermarks::single_epoch(
426 epoch,
427 watermarks,
428 direction,
429 watermark_type,
430 ));
431 }
432 }
433 }
434 if let Some(table_watermarks) = table_watermarks {
435 assert!(
436 all_table_watermarks
437 .insert(table_id, table_watermarks)
438 .is_none()
439 );
440 }
441 }
442}
443
/// Imms of a single epoch within one local instance.
struct LocalInstanceEpochData {
    epoch: HummockEpoch,
    /// Newest imm at the front (pushed via `push_front` in `add_imm`).
    imms: VecDeque<UploaderImm>,
    /// Whether imms of this epoch were spilled while it was still current.
    has_spilled: bool,
}
450
451impl LocalInstanceEpochData {
452 fn new(epoch: HummockEpoch) -> Self {
453 Self {
454 epoch,
455 imms: VecDeque::new(),
456 has_spilled: false,
457 }
458 }
459
460 fn epoch(&self) -> HummockEpoch {
461 self.epoch
462 }
463
464 fn add_imm(&mut self, imm: UploaderImm) {
465 assert_eq!(imm.max_epoch(), imm.min_epoch());
466 assert_eq!(self.epoch, imm.min_epoch());
467 if let Some(prev_imm) = self.imms.front() {
468 assert_gt!(imm.batch_id(), prev_imm.batch_id());
469 }
470 self.imms.push_front(imm);
471 }
472
473 fn is_empty(&self) -> bool {
474 self.imms.is_empty()
475 }
476}
477
/// Unsynced data of a single local read-version instance.
struct LocalInstanceUnsyncData {
    table_id: TableId,
    instance_id: LocalInstanceId,
    /// Data of the epoch the instance is currently writing; `None` once the
    /// instance has been archived by `sync`.
    current_epoch_data: Option<LocalInstanceEpochData>,
    /// Sealed but not yet synced epochs; newest epoch at the front.
    sealed_data: VecDeque<LocalInstanceEpochData>,
    /// Ids of imms currently being flushed; newest at the front, acked from
    /// the back in `ack_flushed`.
    flushing_imms: VecDeque<SharedBufferBatchId>,
    /// Set by `may_destroy_instance`; sealed data may still await sync.
    is_destroyed: bool,
    /// Set when the instance sealed its final epoch.
    is_stopped: bool,
}
490
impl LocalInstanceUnsyncData {
    fn new(table_id: TableId, instance_id: LocalInstanceId, init_epoch: HummockEpoch) -> Self {
        Self {
            table_id,
            instance_id,
            current_epoch_data: Some(LocalInstanceEpochData::new(init_epoch)),
            sealed_data: VecDeque::new(),
            flushing_imms: Default::default(),
            is_destroyed: false,
            is_stopped: false,
        }
    }

    /// Adds an imm to the current epoch. The instance must still be alive and
    /// the imm must belong to this instance's table.
    fn add_imm(&mut self, imm: UploaderImm) {
        assert!(!self.is_destroyed);
        assert!(!self.is_stopped);
        assert_eq!(self.table_id, imm.table_id);
        self.current_epoch_data
            .as_mut()
            .expect("should be Some when adding new imm")
            .add_imm(imm);
    }

    /// Seals the current epoch and switches to `next_epoch`, returning the
    /// sealed epoch. Epochs with no imms are not kept in `sealed_data`.
    fn local_seal_epoch(&mut self, next_epoch: HummockEpoch, stopped: bool) -> HummockEpoch {
        let data = self
            .current_epoch_data
            .as_mut()
            .expect("should be Some when seal new epoch");
        let current_epoch = data.epoch;
        debug!(
            instance_id = self.instance_id,
            next_epoch, current_epoch, stopped, "local seal epoch"
        );
        assert_gt!(next_epoch, current_epoch);
        assert!(!self.is_destroyed);
        assert!(!self.is_stopped);
        self.is_stopped = stopped;
        let epoch_data = replace(data, LocalInstanceEpochData::new(next_epoch));
        if !epoch_data.is_empty() {
            self.sealed_data.push_front(epoch_data);
        }
        current_epoch
    }

    /// Acknowledges that the given imms finished flushing. Ack order must
    /// match the back of `flushing_imms` (oldest first).
    fn ack_flushed(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            assert_eq!(self.flushing_imms.pop_back().expect("should exist"), imm_id);
        }
    }

    /// Takes the imms of exactly `epoch` for spilling (newest first) and marks
    /// them as flushing. Panics if an older epoch still holds unspilled data.
    fn spill(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        let imms = if let Some(oldest_sealed_epoch) = self.sealed_data.back() {
            match oldest_sealed_epoch.epoch.cmp(&epoch) {
                Ordering::Less => {
                    unreachable!(
                        "should not spill at this epoch because there \
                    is unspilled data in previous epoch: prev epoch {}, spill epoch {}",
                        oldest_sealed_epoch.epoch, epoch
                    );
                }
                Ordering::Equal => {
                    // The oldest sealed epoch is exactly the spill epoch.
                    let epoch_data = self.sealed_data.pop_back().unwrap();
                    assert_eq!(epoch, epoch_data.epoch);
                    epoch_data.imms
                }
                // Nothing sealed at or before the spill epoch.
                Ordering::Greater => VecDeque::new(),
            }
        } else {
            let Some(current_epoch_data) = &mut self.current_epoch_data else {
                return Vec::new();
            };
            match current_epoch_data.epoch.cmp(&epoch) {
                Ordering::Less => {
                    assert!(
                        current_epoch_data.imms.is_empty(),
                        "should not spill at this epoch because there \
                        is unspilled data in current epoch epoch {}, spill epoch {}",
                        current_epoch_data.epoch,
                        epoch
                    );
                    VecDeque::new()
                }
                Ordering::Equal => {
                    // Spilling the still-current epoch: take its imms and
                    // remember that it has spilled.
                    if !current_epoch_data.imms.is_empty() {
                        current_epoch_data.has_spilled = true;
                        take(&mut current_epoch_data.imms)
                    } else {
                        VecDeque::new()
                    }
                }
                Ordering::Greater => VecDeque::new(),
            }
        };
        // Register oldest-to-newest so the newest id ends at the front.
        self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
        imms.into_iter().collect()
    }

    /// Records imm ids that started flushing; ids must arrive in increasing
    /// order so `flushing_imms` stays newest-at-front.
    fn add_flushing_imm(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            if let Some(prev_imm_id) = self.flushing_imms.front() {
                assert_gt!(imm_id, *prev_imm_id);
            }
            self.flushing_imms.push_front(imm_id);
        }
    }

    /// Takes all imms of epochs `<= epoch` (newest first) and marks them as
    /// flushing. If the current epoch is also covered, the instance is
    /// archived (`current_epoch_data` becomes `None`) — that situation is a
    /// bug in debug builds and only warned about in release builds.
    fn sync(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        let mut ret = Vec::new();
        while let Some(epoch_data) = self.sealed_data.back()
            && epoch_data.epoch() <= epoch
        {
            let imms = self.sealed_data.pop_back().expect("checked exist").imms;
            self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
            ret.extend(imms.into_iter().rev());
        }
        ret.reverse();
        if let Some(latest_epoch_data) = &self.current_epoch_data
            && latest_epoch_data.epoch <= epoch
        {
            assert!(self.sealed_data.is_empty());
            assert!(latest_epoch_data.is_empty());
            assert!(!latest_epoch_data.has_spilled);
            if cfg!(debug_assertions) {
                panic!(
                    "sync epoch exceeds latest epoch, and the current instance should have been archived, table_id = {}, latest_epoch_data = {}, epoch = {}",
                    self.table_id,
                    latest_epoch_data.epoch(),
                    epoch
                );
            }
            warn!(
                instance_id = self.instance_id,
                table_id = %self.table_id,
                "sync epoch exceeds latest epoch, and the current instance should have be archived"
            );
            self.current_epoch_data = None;
        }
        ret
    }

    /// Asserts no unsynced data remains at or before `epoch`.
    fn assert_after_epoch(&self, epoch: HummockEpoch) {
        if let Some(oldest_sealed_data) = self.sealed_data.back() {
            assert!(!oldest_sealed_data.imms.is_empty());
            assert_gt!(oldest_sealed_data.epoch, epoch);
        } else if let Some(current_data) = &self.current_epoch_data
            && current_data.epoch <= epoch
        {
            assert!(current_data.imms.is_empty() && !current_data.has_spilled);
        }
    }

    /// True when the instance is destroyed and all sealed data is drained.
    fn is_finished(&self) -> bool {
        self.is_destroyed && self.sealed_data.is_empty()
    }
}
651
/// Unsynced state of a single table, aggregating all its local instances.
struct TableUnsyncData {
    table_id: TableId,
    instance_data: HashMap<LocalInstanceId, LocalInstanceUnsyncData>,
    // Per-epoch watermarks plus a bitmap of vnodes already covered in that
    // epoch; direction and serde type are shared by the whole table.
    #[expect(clippy::type_complexity)]
    table_watermarks: Option<(
        WatermarkDirection,
        BTreeMap<HummockEpoch, (Vec<VnodeWatermark>, BitmapBuilder)>,
        WatermarkSerdeType,
    )>,
    /// Spill task ids per epoch.
    spill_tasks: BTreeMap<HummockEpoch, VecDeque<UploadingTaskId>>,
    /// Epochs not yet passed to `sync`; used as an ordered set.
    unsync_epochs: BTreeMap<HummockEpoch, ()>,
    /// Epochs currently being synced; newest at the front, acked from the back.
    syncing_epochs: VecDeque<HummockEpoch>,
    max_synced_epoch: Option<HummockEpoch>,
}
667
668impl TableUnsyncData {
669 fn new(table_id: TableId, committed_epoch: Option<HummockEpoch>) -> Self {
670 Self {
671 table_id,
672 instance_data: Default::default(),
673 table_watermarks: None,
674 spill_tasks: Default::default(),
675 unsync_epochs: Default::default(),
676 syncing_epochs: Default::default(),
677 max_synced_epoch: committed_epoch,
678 }
679 }
680
681 fn new_epoch(&mut self, epoch: HummockEpoch) {
682 debug!(table_id = ?self.table_id, epoch, "table new epoch");
683 if let Some(latest_epoch) = self.max_epoch() {
684 assert_gt!(epoch, latest_epoch);
685 }
686 self.unsync_epochs.insert(epoch, ());
687 }
688
689 #[expect(clippy::type_complexity)]
690 fn sync(
691 &mut self,
692 epoch: HummockEpoch,
693 ) -> (
694 impl Iterator<Item = (LocalInstanceId, Vec<UploaderImm>)> + '_,
695 Option<(
696 WatermarkDirection,
697 impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)> + use<>,
698 WatermarkSerdeType,
699 )>,
700 impl Iterator<Item = UploadingTaskId> + use<>,
701 BTreeMap<HummockEpoch, ()>,
702 ) {
703 if let Some(prev_epoch) = self.max_sync_epoch() {
704 assert_gt!(epoch, prev_epoch)
705 }
706 let epochs = take_before_epoch(&mut self.unsync_epochs, epoch);
707 assert_eq!(
708 *epochs.last_key_value().expect("non-empty").0,
709 epoch,
710 "{epochs:?} {epoch} {:?}",
711 self.table_id
712 );
713 self.syncing_epochs.push_front(epoch);
714 (
715 self.instance_data
716 .iter_mut()
717 .map(move |(instance_id, data)| (*instance_id, data.sync(epoch))),
718 self.table_watermarks
719 .as_mut()
720 .map(|(direction, watermarks, watermark_type)| {
721 let watermarks = take_before_epoch(watermarks, epoch)
722 .into_iter()
723 .map(|(epoch, (watermarks, _))| (epoch, watermarks));
724 (*direction, watermarks, *watermark_type)
725 }),
726 take_before_epoch(&mut self.spill_tasks, epoch)
727 .into_values()
728 .flat_map(|tasks| tasks.into_iter()),
729 epochs,
730 )
731 }
732
733 fn ack_synced(&mut self, sync_epoch: HummockEpoch) {
734 let min_sync_epoch = self.syncing_epochs.pop_back().expect("should exist");
735 assert_eq!(sync_epoch, min_sync_epoch);
736 self.max_synced_epoch = Some(sync_epoch);
737 }
738
739 fn ack_committed(&mut self, committed_epoch: HummockEpoch) {
740 let synced_epoch_advanced = {
741 if let Some(max_synced_epoch) = self.max_synced_epoch
742 && max_synced_epoch >= committed_epoch
743 {
744 false
745 } else {
746 true
747 }
748 };
749 if synced_epoch_advanced {
750 self.max_synced_epoch = Some(committed_epoch);
751 if let Some(min_syncing_epoch) = self.syncing_epochs.back() {
752 assert_gt!(*min_syncing_epoch, committed_epoch);
753 }
754 self.assert_after_epoch(committed_epoch);
755 }
756 }
757
758 fn assert_after_epoch(&self, epoch: HummockEpoch) {
759 self.instance_data
760 .values()
761 .for_each(|instance_data| instance_data.assert_after_epoch(epoch));
762 if let Some((_, watermarks, _)) = &self.table_watermarks
763 && let Some((oldest_epoch, _)) = watermarks.first_key_value()
764 {
765 assert_gt!(*oldest_epoch, epoch);
766 }
767 }
768
769 fn max_sync_epoch(&self) -> Option<HummockEpoch> {
770 self.syncing_epochs
771 .front()
772 .cloned()
773 .or(self.max_synced_epoch)
774 }
775
776 fn max_epoch(&self) -> Option<HummockEpoch> {
777 self.unsync_epochs
778 .last_key_value()
779 .map(|(epoch, _)| *epoch)
780 .or_else(|| self.max_sync_epoch())
781 }
782
783 fn is_empty(&self) -> bool {
784 self.instance_data.is_empty()
785 && self.syncing_epochs.is_empty()
786 && self.unsync_epochs.is_empty()
787 }
788}
789
/// Identifies an epoch synced across a set of tables: the epoch paired with
/// the minimal table id of the set (see `get_unsync_epoch_id`).
#[derive(Eq, Hash, PartialEq, Copy, Clone)]
struct UnsyncEpochId(HummockEpoch, TableId);
792
impl UnsyncEpochId {
    /// The epoch component of the id.
    fn epoch(&self) -> HummockEpoch {
        self.0
    }
}
798
799fn get_unsync_epoch_id(epoch: HummockEpoch, table_ids: &HashSet<TableId>) -> Option<UnsyncEpochId> {
800 table_ids
801 .iter()
802 .min()
803 .map(|table_id| UnsyncEpochId(epoch, *table_id))
804}
805
/// All unsynced data held by the uploader, across tables and instances.
#[derive(Default)]
struct UnsyncData {
    table_data: HashMap<TableId, TableUnsyncData>,
    /// Reverse index from instance id to its table.
    instance_table_id: HashMap<LocalInstanceId, TableId>,
    /// For each unsync epoch id, the full set of tables sharing that epoch.
    unsync_epochs: HashMap<UnsyncEpochId, HashSet<TableId>>,
    /// Results of finished spill tasks, kept until a covering sync consumes
    /// them, together with the tables the spilled data belongs to.
    spilled_data: HashMap<UploadingTaskId, (Arc<StagingSstableInfo>, HashSet<TableId>)>,
}
818
impl UnsyncData {
    /// Registers a new local instance of `table_id` starting at `init_epoch`.
    /// The table must already exist and `init_epoch` must be one of its
    /// registered unsync epochs.
    fn init_instance(
        &mut self,
        table_id: TableId,
        instance_id: LocalInstanceId,
        init_epoch: HummockEpoch,
    ) {
        debug!(
            table_id = %table_id,
            instance_id, init_epoch, "init epoch"
        );
        let table_data = self
            .table_data
            .get_mut(&table_id)
            .unwrap_or_else(|| panic!("should exist. {table_id:?}"));
        assert!(
            table_data
                .instance_data
                .insert(
                    instance_id,
                    LocalInstanceUnsyncData::new(table_id, instance_id, init_epoch)
                )
                .is_none()
        );
        assert!(
            self.instance_table_id
                .insert(instance_id, table_id)
                .is_none()
        );
        assert!(table_data.unsync_epochs.contains_key(&init_epoch));
    }

    /// Resolves the unsync data of an instance via the `instance_table_id`
    /// index. `None` if the instance is unknown (e.g. already removed).
    fn instance_data(
        &mut self,
        instance_id: LocalInstanceId,
    ) -> Option<&mut LocalInstanceUnsyncData> {
        self.instance_table_id
            .get_mut(&instance_id)
            .cloned()
            .map(move |table_id| {
                self.table_data
                    .get_mut(&table_id)
                    .expect("should exist")
                    .instance_data
                    .get_mut(&instance_id)
                    .expect("should exist")
            })
    }

    /// Adds an imm to the given instance, which must exist.
    fn add_imm(&mut self, instance_id: LocalInstanceId, imm: UploaderImm) {
        self.instance_data(instance_id)
            .expect("should exist")
            .add_imm(imm);
    }

    /// Seals the current epoch of an instance and records any watermarks
    /// carried in `opts`. The instance counts as stopped when `next_epoch` is
    /// not a registered unsync epoch of its table.
    fn local_seal_epoch(
        &mut self,
        instance_id: LocalInstanceId,
        next_epoch: HummockEpoch,
        opts: SealCurrentEpochOptions,
    ) {
        let table_id = self.instance_table_id[&instance_id];
        let table_data = self.table_data.get_mut(&table_id).expect("should exist");
        let instance_data = table_data
            .instance_data
            .get_mut(&instance_id)
            .expect("should exist");
        let stopped = !table_data.unsync_epochs.contains_key(&next_epoch);
        let epoch = instance_data.local_seal_epoch(next_epoch, stopped);
        if let Some((direction, table_watermarks, watermark_type)) = opts.table_watermarks {
            table_data.add_table_watermarks(epoch, table_watermarks, direction, watermark_type);
        }
    }

    /// Marks an instance as destroyed; its remaining data stays until drained
    /// by sync (see `LocalInstanceUnsyncData::is_finished`).
    fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
        if let Some(table_id) = self.instance_table_id.get(&instance_id) {
            debug!(instance_id, "destroy instance");
            let table_data = self.table_data.get_mut(table_id).expect("should exist");
            let instance_data = table_data
                .instance_data
                .get_mut(&instance_id)
                .expect("should exist");
            assert!(
                !instance_data.is_destroyed,
                "cannot destroy an instance for twice"
            );
            instance_data.is_destroyed = true;
        }
    }

    /// Removes all state of `table_ids`, aborting their spill tasks and
    /// asserting that nothing referencing these tables is left behind. All
    /// instances of the dropped tables must already be destroyed.
    fn clear_tables(&mut self, table_ids: &HashSet<TableId>, task_manager: &mut TaskManager) {
        for table_id in table_ids {
            if let Some(table_unsync_data) = self.table_data.remove(table_id) {
                for task_id in table_unsync_data.spill_tasks.into_values().flatten() {
                    // A spill task of a cleared table may only involve tables
                    // from the same cleared set.
                    if let Some(task_status) = task_manager.abort_task(task_id) {
                        must_match!(task_status, UploadingTaskStatus::Spilling(spill_table_ids) => {
                            assert!(spill_table_ids.is_subset(table_ids));
                        });
                    }
                    if let Some((_, spill_table_ids)) = self.spilled_data.remove(&task_id) {
                        assert!(spill_table_ids.is_subset(table_ids));
                    }
                }
                assert!(
                    table_unsync_data
                        .instance_data
                        .values()
                        .all(|instance| instance.is_destroyed),
                    "should be clear when dropping the read version instance"
                );
                for instance_id in table_unsync_data.instance_data.keys() {
                    assert_eq!(
                        *table_id,
                        self.instance_table_id
                            .remove(instance_id)
                            .expect("should exist")
                    );
                }
            }
        }
        // No finished spill result may still reference the cleared tables.
        debug_assert!(
            self.spilled_data
                .values()
                .all(|(_, spill_table_ids)| spill_table_ids.is_disjoint(table_ids))
        );
        // Drop unsync epochs fully contained in the cleared set.
        self.unsync_epochs.retain(|_, unsync_epoch_table_ids| {
            if !unsync_epoch_table_ids.is_disjoint(table_ids) {
                assert!(unsync_epoch_table_ids.is_subset(table_ids));
                false
            } else {
                true
            }
        });
        assert!(
            self.instance_table_id
                .values()
                .all(|table_id| !table_ids.contains(table_id))
        );
    }
}
962
impl UploaderData {
    /// Starts a sync over the given `(epoch, table_ids)` pairs: drains the
    /// matching unsync state, flushes the remaining imms, collects finished
    /// spill results, watermarks and vector index adds, and registers a
    /// `SyncingData` entry whose result is eventually delivered through
    /// `sync_result_sender`.
    fn sync(
        &mut self,
        context: &UploaderContext,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) {
        let mut all_table_watermarks = HashMap::new();
        let mut uploading_tasks = HashSet::new();
        let mut spilled_tasks = BTreeSet::new();
        let mut all_table_ids = HashSet::new();
        let mut vector_index_adds = HashMap::new();

        let mut flush_payload = HashMap::new();

        for (epoch, table_ids) in &sync_table_epochs {
            let epoch = *epoch;
            // A table may appear in at most one (epoch, table_ids) pair.
            for table_id in table_ids {
                assert!(
                    all_table_ids.insert(*table_id),
                    "duplicate sync table epoch: {:?} {:?}",
                    all_table_ids,
                    sync_table_epochs
                );
            }
            if let Some(UnsyncEpochId(_, min_table_id)) = get_unsync_epoch_id(epoch, table_ids) {
                let min_table_id_data = self
                    .unsync_data
                    .table_data
                    .get_mut(&min_table_id)
                    .expect("should exist");
                // NOTE(review): operates on a clone of `unsync_epochs` so the
                // real per-table maps are only drained by each table's own
                // `sync` call below — confirm this is intended.
                let epochs = take_before_epoch(&mut min_table_id_data.unsync_epochs.clone(), epoch);
                // Every synced epoch of the representative table must map to
                // exactly this table set in the global registry.
                for epoch in epochs.keys() {
                    assert_eq!(
                        &self
                            .unsync_data
                            .unsync_epochs
                            .remove(&UnsyncEpochId(*epoch, min_table_id))
                            .expect("should exist"),
                        table_ids
                    );
                }
                for table_id in table_ids {
                    let table_data = self
                        .unsync_data
                        .table_data
                        .get_mut(table_id)
                        .expect("should exist");
                    let (unflushed_payload, table_watermarks, task_ids, table_unsync_epochs) =
                        table_data.sync(epoch);
                    assert_eq!(table_unsync_epochs, epochs);
                    for (instance_id, payload) in unflushed_payload {
                        if !payload.is_empty() {
                            flush_payload.insert(instance_id, payload);
                        }
                    }
                    // Drop instances that are destroyed and fully drained.
                    table_data.instance_data.retain(|instance_id, data| {
                        if data.is_finished() {
                            assert_eq!(
                                self.unsync_data.instance_table_id.remove(instance_id),
                                Some(*table_id)
                            );
                            false
                        } else {
                            true
                        }
                    });
                    if let Some((direction, watermarks, watermark_type)) = table_watermarks {
                        Self::add_table_watermarks(
                            &mut all_table_watermarks,
                            *table_id,
                            direction,
                            watermarks,
                            watermark_type,
                        );
                    }
                    // Split the sync's tasks into already-finished spills and
                    // still-running uploads.
                    for task_id in task_ids {
                        if self.unsync_data.spilled_data.contains_key(&task_id) {
                            spilled_tasks.insert(task_id);
                        } else {
                            uploading_tasks.insert(task_id);
                        }
                    }

                    // Drain the sealed vector index adds of the synced epochs.
                    if let hash_map::Entry::Occupied(mut entry) =
                        self.unsync_vector_index_data.entry(*table_id)
                    {
                        let data = entry.get_mut();
                        let adds = take_before_epoch(&mut data.sealed_epoch_data, epoch)
                            .into_values()
                            .flatten()
                            .collect_vec();
                        if data.is_dropped && data.sealed_epoch_data.is_empty() {
                            entry.remove();
                        }
                        if !adds.is_empty() {
                            vector_index_adds
                                .try_insert(*table_id, adds)
                                .expect("non-duplicate");
                        }
                    }
                }
            }
        }

        let sync_id = {
            let sync_id = self.next_sync_id;
            self.next_sync_id += 1;
            SyncId(sync_id)
        };

        // Flush the remaining imms; the extra task (if any) also gates this sync.
        if let Some(extra_flush_task_id) = self.task_manager.sync(
            context,
            sync_id,
            flush_payload,
            uploading_tasks.iter().cloned(),
            &all_table_ids,
        ) {
            uploading_tasks.insert(extra_flush_task_id);
        }

        // Consume finished spill results, in descending task-id order.
        let uploaded = spilled_tasks
            .iter()
            .rev()
            .map(|task_id| {
                let (sst, spill_table_ids) = self
                    .unsync_data
                    .spilled_data
                    .remove(task_id)
                    .expect("should exist");
                assert!(
                    spill_table_ids.is_subset(&all_table_ids),
                    "spilled tabled ids {:?} not a subset of sync table id {:?}",
                    spill_table_ids,
                    all_table_ids
                );
                sst
            })
            .collect();

        self.syncing_data.insert(
            sync_id,
            SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: uploading_tasks,
                uploaded,
                table_watermarks: all_table_watermarks,
                vector_index_adds,
                sync_result_sender,
            },
        );

        self.check_upload_task_consistency();
    }
}
1120
1121impl UnsyncData {
1122 fn ack_flushed(&mut self, sstable_info: &StagingSstableInfo) {
1123 for (instance_id, imm_ids) in sstable_info.imm_ids() {
1124 if let Some(instance_data) = self.instance_data(*instance_id) {
1125 instance_data.ack_flushed(imm_ids.iter().rev().cloned());
1127 }
1128 }
1129 }
1130}
1131
/// State of an in-progress sync request.
struct SyncingData {
    sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    /// Tasks that must finish before the sync can complete.
    remaining_uploading_tasks: HashSet<UploadingTaskId>,
    /// Ssts already produced for this sync.
    uploaded: VecDeque<Arc<StagingSstableInfo>>,
    table_watermarks: HashMap<TableId, TableWatermarks>,
    vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
    /// Channel on which the final `SyncedData` (or error) is delivered.
    sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
}
1141
/// Final result of a successful sync, handed to the sync requester.
#[derive(Debug)]
pub struct SyncedData {
    pub uploaded_ssts: VecDeque<Arc<StagingSstableInfo>>,
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
    pub vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
}
1148
/// Shared dependencies of the uploader.
struct UploaderContext {
    pinned_version: PinnedVersion,
    /// Closure used to spawn upload tasks.
    spawn_upload_task: SpawnUploadTask,
    buffer_tracker: BufferTracker,

    stats: Arc<HummockStateStoreMetrics>,
}
1157
1158impl UploaderContext {
1159 fn new(
1160 pinned_version: PinnedVersion,
1161 spawn_upload_task: SpawnUploadTask,
1162 buffer_tracker: BufferTracker,
1163 stats: Arc<HummockStateStoreMetrics>,
1164 ) -> Self {
1165 UploaderContext {
1166 pinned_version,
1167 spawn_upload_task,
1168 buffer_tracker,
1169 stats,
1170 }
1171 }
1172}
1173
/// Identifier of a sync request; allocated from `UploaderData::next_sync_id`,
/// so later syncs get larger ids and `BTreeMap<SyncId, _>` iterates in
/// sync-request order.
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct SyncId(usize);
1176
/// Per-table state of a vector index writer that has not been synced yet.
struct UnsyncVectorIndexData {
    /// Deltas of sealed epochs, keyed by the epoch that was sealed; `None`
    /// means the epoch was sealed without any new vector index delta.
    sealed_epoch_data: BTreeMap<HummockEpoch, Option<VectorIndexAdd>>,
    /// The current (not yet sealed) epoch of the writer.
    curr_epoch: u64,
    /// Set when the writer is dropped; the entry is kept around until
    /// `sealed_epoch_data` is drained.
    is_dropped: bool,
}
1182
/// Mutable state of the uploader while it is in the `Working` state.
#[derive(Default)]
struct UploaderData {
    /// Data that has been written but not yet included in a sync request.
    unsync_data: UnsyncData,
    /// Per-table unsynced vector index writer state.
    unsync_vector_index_data: HashMap<TableId, UnsyncVectorIndexData>,

    /// In-flight sync requests, ordered by their `SyncId`.
    syncing_data: BTreeMap<SyncId, SyncingData>,

    /// Manages spawned spill/sync upload tasks.
    task_manager: TaskManager,
    /// Source of the next `SyncId` to allocate.
    next_sync_id: usize,
}
1193
impl UploaderData {
    /// Abort all uploading tasks and fail every pending sync request with the
    /// error produced by `err`.
    fn abort(self, err: impl Fn() -> HummockError) {
        self.task_manager.abort_all_tasks();
        for syncing_data in self.syncing_data.into_values() {
            send_sync_result(syncing_data.sync_result_sender, Err(err()));
        }
    }

    /// Drop all uploader state related to `table_ids`: unsync data, any sync
    /// request touching those tables, and the upload tasks backing them.
    fn clear_tables(&mut self, table_ids: HashSet<TableId>) {
        if table_ids.is_empty() {
            return;
        }
        self.unsync_data
            .clear_tables(&table_ids, &mut self.task_manager);
        self.syncing_data.retain(|sync_id, syncing_data| {
            if syncing_data
                .sync_table_epochs
                .iter()
                .any(|(_, sync_table_ids)| !sync_table_ids.is_disjoint(&table_ids))
            {
                // A sync request must be cleared as a whole: if any of its
                // epochs touches the cleared tables, all of them must.
                assert!(
                    syncing_data
                        .sync_table_epochs
                        .iter()
                        .all(|(_, sync_table_ids)| sync_table_ids.is_subset(&table_ids))
                );
                // Abort the remaining upload tasks of the dropped sync and
                // sanity-check that each task indeed belongs to it.
                for task_id in &syncing_data.remaining_uploading_tasks {
                    match self
                        .task_manager
                        .abort_task(*task_id)
                        .expect("should exist")
                    {
                        UploadingTaskStatus::Spilling(spill_table_ids) => {
                            assert!(spill_table_ids.is_subset(&table_ids));
                        }
                        UploadingTaskStatus::Sync(task_sync_id) => {
                            assert_eq!(sync_id, &task_sync_id);
                        }
                    }
                }
                // Dropping the entry also drops its `sync_result_sender`.
                false
            } else {
                true
            }
        });

        self.check_upload_task_consistency();
    }

    /// The minimum object id among all uploaded-but-uncommitted ssts
    /// (spilled data plus ssts held by in-flight syncs), if any.
    fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        self.unsync_data
            .spilled_data
            .values()
            .map(|(s, _)| s)
            .chain(self.syncing_data.values().flat_map(|s| s.uploaded.iter()))
            .filter_map(|s| {
                s.sstable_infos()
                    .iter()
                    .chain(s.old_value_sstable_infos())
                    .map(|s| s.sst_info.object_id)
                    .min()
            })
            .min()
            .map(|object_id| object_id.as_raw())
    }
}
1260
/// Recorded information about the sync failure that moved the uploader into
/// the error state; used to build error messages for later sync attempts.
struct ErrState {
    /// The `(epoch, table_ids)` pairs of the sync that failed.
    failed_sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    /// Human-readable description of the failure.
    reason: String,
}
1265
/// Overall state of the uploader. Once a sync fails the uploader transitions
/// to `Err` and stays there until it is reset via `clear(None)`.
enum UploaderState {
    Working(UploaderData),
    Err(ErrState),
}
1270
/// Drives uploading of shared-buffer data to object storage: accepts imms,
/// spills them under memory pressure, and serves epoch sync requests.
pub struct HummockUploader {
    /// Current state; most operations are no-ops unless `Working`.
    state: UploaderState,

    /// Dependencies shared with spawned upload tasks.
    context: UploaderContext,
}
1288
impl HummockUploader {
    /// Create an uploader in the `Working` state with empty data.
    pub(super) fn new(
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        pinned_version: PinnedVersion,
        spawn_upload_task: SpawnUploadTask,
        buffer_tracker: BufferTracker,
    ) -> Self {
        Self {
            state: UploaderState::Working(UploaderData::default()),
            context: UploaderContext::new(
                pinned_version,
                spawn_upload_task,
                buffer_tracker,
                state_store_metrics,
            ),
        }
    }

    pub(super) fn buffer_tracker(&self) -> &BufferTracker {
        &self.context.buffer_tracker
    }

    pub(super) fn hummock_version(&self) -> &PinnedVersion {
        &self.context.pinned_version
    }

    /// Add imms of a local instance to the unsync data.
    /// No-op when the uploader is in the error state.
    pub(super) fn add_imms(&mut self, instance_id: LocalInstanceId, imms: Vec<ImmutableMemtable>) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        for imm in imms {
            let imm = UploaderImm::new(imm, &self.context);
            data.unsync_data.add_imm(instance_id, imm);
        }
    }

    /// Register a new local instance of `table_id` starting at `init_epoch`.
    /// No-op when the uploader is in the error state.
    pub(super) fn init_instance(
        &mut self,
        instance_id: LocalInstanceId,
        table_id: TableId,
        init_epoch: HummockEpoch,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data
            .init_instance(table_id, instance_id, init_epoch);
    }

    /// Seal the current epoch of a local instance and advance it to
    /// `next_epoch`. No-op when the uploader is in the error state.
    pub(super) fn local_seal_epoch(
        &mut self,
        instance_id: LocalInstanceId,
        next_epoch: HummockEpoch,
        opts: SealCurrentEpochOptions,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data
            .local_seal_epoch(instance_id, next_epoch, opts);
    }

    /// Register a vector index writer for `table_id`.
    /// Panics if a writer for the table is already registered.
    pub(super) fn register_vector_writer(&mut self, table_id: TableId, init_epoch: HummockEpoch) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        assert!(
            data.unsync_vector_index_data
                .try_insert(
                    table_id,
                    UnsyncVectorIndexData {
                        sealed_epoch_data: Default::default(),
                        curr_epoch: init_epoch,
                        is_dropped: false,
                    }
                )
                .is_ok(),
            "duplicate vector writer on {}",
            table_id
        )
    }

    /// Seal the current epoch of the vector writer of `table_id`, recording
    /// an optional index delta for the sealed epoch, and advance the writer
    /// to `next_epoch`. Panics if the writer does not exist, was dropped, or
    /// `next_epoch` does not advance.
    pub(super) fn vector_writer_seal_epoch(
        &mut self,
        table_id: TableId,
        next_epoch: HummockEpoch,
        add: Option<VectorIndexAdd>,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        let data = data
            .unsync_vector_index_data
            .get_mut(&table_id)
            .expect("should exist");
        assert!(!data.is_dropped);
        assert!(
            data.curr_epoch < next_epoch,
            "next epoch {} should be greater than current epoch {}",
            next_epoch,
            data.curr_epoch
        );
        data.sealed_epoch_data
            .try_insert(data.curr_epoch, add)
            .expect("non-duplicate");
        data.curr_epoch = next_epoch;
    }

    /// Mark the vector writer of `table_id` as dropped; its entry is removed
    /// immediately only if no sealed epoch data is still pending.
    pub(super) fn drop_vector_writer(&mut self, table_id: TableId) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        let hash_map::Entry::Occupied(mut entry) = data.unsync_vector_index_data.entry(table_id)
        else {
            panic!("vector writer {} should exist", table_id);
        };
        let data = entry.get_mut();
        data.is_dropped = true;
        if data.sealed_epoch_data.is_empty() {
            entry.remove();
        }
    }

    /// Start `epoch` on each of `table_ids`, creating per-table unsync data
    /// on first sight of a table.
    pub(super) fn start_epoch(&mut self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        debug!(epoch, ?table_ids, "start epoch");
        for table_id in &table_ids {
            let table_data = data
                .unsync_data
                .table_data
                .entry(*table_id)
                .or_insert_with(|| {
                    TableUnsyncData::new(
                        *table_id,
                        self.context.pinned_version.table_committed_epoch(*table_id),
                    )
                });
            table_data.new_epoch(epoch);
        }
        if let Some(unsync_epoch_id) = get_unsync_epoch_id(epoch, &table_ids) {
            assert!(
                data.unsync_data
                    .unsync_epochs
                    .insert(unsync_epoch_id, table_ids)
                    .is_none()
            );
        }
    }

    /// Start a sync request for the given `(epoch, table_ids)` pairs. If the
    /// uploader is in the error state, immediately reply with an error that
    /// references the previously failed sync.
    pub(super) fn start_sync_epoch(
        &mut self,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) {
        let data = match &mut self.state {
            UploaderState::Working(data) => data,
            UploaderState::Err(ErrState {
                failed_sync_table_epochs,
                reason,
            }) => {
                let result = Err(HummockError::other(format!(
                    "previous sync epoch {:?} failed due to [{}]",
                    failed_sync_table_epochs, reason
                )));
                send_sync_result(sync_result_sender, result);
                return;
            }
        };
        debug!(?sync_table_epochs, "start sync epoch");

        data.sync(&self.context, sync_result_sender, sync_table_epochs);

        // The sync may have no remaining uploading task and can be notified
        // right away.
        data.may_notify_sync_task(&self.context);

        self.context
            .stats
            .uploader_syncing_epoch_count
            .set(data.syncing_data.len() as _);
    }

    /// Apply a newly pinned version: ack the committed epoch of each table
    /// present in the version. The pinned version is advanced regardless of
    /// the uploader state.
    pub(crate) fn update_pinned_version(&mut self, pinned_version: PinnedVersion) {
        if let UploaderState::Working(data) = &mut self.state {
            for (table_id, info) in pinned_version.state_table_info.info() {
                if let Some(table_data) = data.unsync_data.table_data.get_mut(table_id) {
                    table_data.ack_committed(info.committed_epoch);
                }
            }
        }
        self.context.pinned_version = pinned_version;
    }

    /// Spill unsync data when the buffer tracker asks for a flush. Returns
    /// whether any spill task was started in this call.
    pub(crate) fn may_flush(&mut self) -> bool {
        let UploaderState::Working(data) = &mut self.state else {
            return false;
        };
        if self.context.buffer_tracker.need_flush() {
            let mut spiller = Spiller::new(&mut data.unsync_data);
            let mut curr_batch_flush_size = 0;
            // Keep spilling payloads until the tracker is satisfied or there
            // is nothing left to spill.
            while self
                .context
                .buffer_tracker
                .need_more_flush(curr_batch_flush_size)
                && let Some((epoch, payload, spilled_table_ids)) = spiller.next_spilled_payload()
            {
                assert!(!payload.is_empty());
                {
                    let (task_id, task_size, spilled_table_ids) =
                        data.task_manager
                            .spill(&self.context, spilled_table_ids, payload);
                    // Record the spill task on every table it covers.
                    for table_id in spilled_table_ids {
                        spiller
                            .unsync_data()
                            .table_data
                            .get_mut(table_id)
                            .expect("should exist")
                            .spill_tasks
                            .entry(epoch)
                            .or_default()
                            .push_front(task_id);
                    }
                    curr_batch_flush_size += task_size;
                }
            }
            data.check_upload_task_consistency();
            curr_batch_flush_size > 0
        } else {
            false
        }
    }

    /// Clear uploader state. With `Some(table_ids)`, only the given tables
    /// are cleared; with `None`, the whole uploader is reset to an empty
    /// working state and all pending syncs are aborted.
    pub(crate) fn clear(&mut self, table_ids: Option<HashSet<TableId>>) {
        if let Some(table_ids) = table_ids {
            if let UploaderState::Working(data) = &mut self.state {
                data.clear_tables(table_ids);
            }
        } else {
            // Full reset: swap in fresh working data, then abort the old data
            // (also recovers from the error state).
            if let UploaderState::Working(data) = replace(
                &mut self.state,
                UploaderState::Working(UploaderData::default()),
            ) {
                data.abort(|| HummockError::other("uploader is reset"));
            }

            self.context.stats.uploader_syncing_epoch_count.set(0);
        }
    }

    /// Notify that a local instance may be destroyed so its unsync data can
    /// be cleaned up.
    pub(crate) fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data.may_destroy_instance(instance_id);
    }

    /// The minimum object id among uploaded-but-uncommitted ssts, or `None`
    /// when there is none or the uploader is in the error state.
    pub(crate) fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        if let UploaderState::Working(ref u) = self.state {
            u.min_uncommitted_object_id()
        } else {
            None
        }
    }
}
1555
impl UploaderData {
    /// Pop and notify finished sync requests from the front of `syncing_data`
    /// (i.e. in sync-request order) as long as the oldest request has no
    /// remaining uploading task.
    fn may_notify_sync_task(&mut self, context: &UploaderContext) {
        while let Some((_, syncing_data)) = self.syncing_data.first_key_value()
            && syncing_data.remaining_uploading_tasks.is_empty()
        {
            let (_, syncing_data) = self.syncing_data.pop_first().expect("non-empty");
            let SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: _,
                uploaded,
                table_watermarks,
                vector_index_adds,
                sync_result_sender,
            } = syncing_data;
            context
                .stats
                .uploader_syncing_epoch_count
                .set(self.syncing_data.len() as _);

            // Ack the synced epoch on every involved table and drop table
            // entries that become empty.
            for (sync_epoch, table_ids) in sync_table_epochs {
                for table_id in table_ids {
                    if let Some(table_data) = self.unsync_data.table_data.get_mut(&table_id) {
                        table_data.ack_synced(sync_epoch);
                        if table_data.is_empty() {
                            self.unsync_data.table_data.remove(&table_id);
                        }
                    }
                }
            }

            send_sync_result(
                sync_result_sender,
                Ok(SyncedData {
                    uploaded_ssts: uploaded,
                    table_watermarks,
                    vector_index_adds,
                }),
            )
        }
    }

    /// Debug-build-only cross check: the task bookkeeping derived from
    /// per-table data / syncing data must agree with the task manager and
    /// spilled data.
    fn check_upload_task_consistency(&self) {
        #[cfg(debug_assertions)]
        {
            // Spill tasks as recorded on each table's unsync data.
            let mut spill_task_table_id_from_data: HashMap<_, HashSet<_>> = HashMap::new();
            for table_data in self.unsync_data.table_data.values() {
                for task_id in table_data
                    .spill_tasks
                    .iter()
                    .flat_map(|(_, tasks)| tasks.iter())
                {
                    assert!(
                        spill_task_table_id_from_data
                            .entry(*task_id)
                            .or_default()
                            .insert(table_data.table_id)
                    );
                }
            }
            // Remaining sync tasks as recorded on each sync request.
            let syncing_task_id_from_data: HashMap<_, HashSet<_>> = self
                .syncing_data
                .iter()
                .filter_map(|(sync_id, data)| {
                    if data.remaining_uploading_tasks.is_empty() {
                        None
                    } else {
                        Some((*sync_id, data.remaining_uploading_tasks.clone()))
                    }
                })
                .collect();

            // Rebuild the same views from the spilled data and task manager.
            let mut spill_task_table_id_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
            for (task_id, (_, table_ids)) in &self.unsync_data.spilled_data {
                spill_task_table_id_from_manager.insert(*task_id, table_ids.clone());
            }
            let mut syncing_task_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
            for (task_id, status) in self.task_manager.tasks() {
                match status {
                    UploadingTaskStatus::Spilling(table_ids) => {
                        assert!(
                            spill_task_table_id_from_manager
                                .insert(task_id, table_ids.clone())
                                .is_none()
                        );
                    }
                    UploadingTaskStatus::Sync(sync_id) => {
                        assert!(
                            syncing_task_from_manager
                                .entry(*sync_id)
                                .or_default()
                                .insert(task_id)
                        );
                    }
                }
            }
            assert_eq!(
                spill_task_table_id_from_data,
                spill_task_table_id_from_manager
            );
            assert_eq!(syncing_task_id_from_data, syncing_task_from_manager);
        }
    }
}
1659
impl HummockUploader {
    /// A future resolving to the next sst produced by any upload task.
    ///
    /// Pends forever while the uploader is in the error state. On a task
    /// failure, moves the uploader into the error state, fails the
    /// corresponding sync request and aborts all remaining data, then pends.
    pub(super) fn next_uploaded_sst(
        &mut self,
    ) -> impl Future<Output = Arc<StagingSstableInfo>> + '_ {
        poll_fn(|cx| {
            let UploaderState::Working(data) = &mut self.state else {
                return Poll::Pending;
            };

            if let Some((task_id, status, result)) = ready!(data.task_manager.poll_task_result(cx))
            {
                match result {
                    Ok(sst) => {
                        data.unsync_data.ack_flushed(&sst);
                        match status {
                            UploadingTaskStatus::Sync(sync_id) => {
                                // Attribute the sst to its sync request and
                                // notify the sync if it was the last task.
                                let syncing_data =
                                    data.syncing_data.get_mut(&sync_id).expect("should exist");
                                syncing_data.uploaded.push_front(sst.clone());
                                assert!(syncing_data.remaining_uploading_tasks.remove(&task_id));
                                data.may_notify_sync_task(&self.context);
                            }
                            UploadingTaskStatus::Spilling(table_ids) => {
                                data.unsync_data
                                    .spilled_data
                                    .insert(task_id, (sst.clone(), table_ids));
                            }
                        }
                        data.check_upload_task_consistency();
                        Poll::Ready(sst)
                    }
                    Err((sync_id, e)) => {
                        // A sync task failed: transition to the error state,
                        // keeping the old data to abort it below.
                        let syncing_data =
                            data.syncing_data.remove(&sync_id).expect("should exist");
                        let failed_epochs = syncing_data.sync_table_epochs.clone();
                        let data = must_match!(replace(
                            &mut self.state,
                            UploaderState::Err(ErrState {
                                failed_sync_table_epochs: syncing_data.sync_table_epochs,
                                reason: e.as_report().to_string(),
                            }),
                        ), UploaderState::Working(data) => data);

                        let _ = syncing_data
                            .sync_result_sender
                            .send(Err(HummockError::other(format!(
                                "failed to sync: {:?}",
                                e.as_report()
                            ))));

                        data.abort(|| {
                            HummockError::other(format!(
                                "previous epoch {:?} failed to sync",
                                failed_epochs
                            ))
                        });
                        Poll::Pending
                    }
                }
            } else {
                Poll::Pending
            }
        })
    }
}
1725
1726#[cfg(test)]
1727pub(crate) mod tests {
1728 use std::collections::{HashMap, HashSet};
1729 use std::future::{Future, poll_fn};
1730 use std::ops::Deref;
1731 use std::pin::pin;
1732 use std::sync::Arc;
1733 use std::sync::atomic::AtomicUsize;
1734 use std::sync::atomic::Ordering::SeqCst;
1735 use std::task::Poll;
1736
1737 use futures::FutureExt;
1738 use risingwave_common::catalog::TableId;
1739 use risingwave_common::util::epoch::EpochExt;
1740 use risingwave_hummock_sdk::HummockEpoch;
1741 use risingwave_hummock_sdk::vector_index::{
1742 FlatIndexAdd, VectorFileInfo, VectorIndexAdd, VectorStoreInfoDelta,
1743 };
1744 use tokio::sync::oneshot;
1745
1746 use super::test_utils::*;
1747 use crate::hummock::event_handler::uploader::{
1748 HummockUploader, SyncedData, UploadingTask, get_payload_imm_ids,
1749 };
1750 use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID};
1751 use crate::hummock::{HummockError, HummockResult};
1752 use crate::mem_table::ImmutableMemtable;
1753 use crate::opts::StorageOpts;
1754
    /// Test-only convenience wrappers over the uploader API.
    impl HummockUploader {
        /// Add a single imm (wraps `add_imms`).
        pub(super) fn add_imm(&mut self, instance_id: LocalInstanceId, imm: ImmutableMemtable) {
            self.add_imms(instance_id, vec![imm]);
        }

        /// Start a sync covering a single epoch (wraps `start_sync_epoch`).
        pub(super) fn start_single_epoch_sync(
            &mut self,
            epoch: HummockEpoch,
            sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
            table_ids: HashSet<TableId>,
        ) {
            self.start_sync_epoch(sync_result_sender, vec![(epoch, table_ids)]);
        }
    }
1769
    /// An uploading task's future should forward both success output (with
    /// matching size, imm ids and epochs) and upload failure.
    #[tokio::test]
    pub async fn test_uploading_task_future() {
        let uploader_context = test_uploader_context(dummy_success_upload_future);

        let imm = gen_imm(INITIAL_EPOCH).await;
        let imm_size = imm.size();
        let imm_ids = get_imm_ids(vec![&imm]);
        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
        assert_eq!(imm_size, task.task_info.task_size);
        assert_eq!(imm_ids, task.task_info.imm_ids);
        assert_eq!(vec![INITIAL_EPOCH], task.task_info.epochs);
        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
        assert_eq!(
            output.sstable_infos(),
            &dummy_success_upload_output().new_value_ssts
        );
        assert_eq!(imm_size, output.imm_size());
        assert_eq!(&imm_ids, output.imm_ids());
        assert_eq!(&vec![INITIAL_EPOCH], output.epochs());

        // A failing upload future should surface as an error from the task.
        let uploader_context = test_uploader_context(dummy_fail_upload_future);
        let imm = gen_imm(INITIAL_EPOCH).await;
        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
    }
1795
    /// `poll_result` should resolve to the upload output on success and to an
    /// error when the upload future fails.
    #[tokio::test]
    pub async fn test_uploading_task_poll_result() {
        let uploader_context = test_uploader_context(dummy_success_upload_future);
        let mut task =
            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
        assert_eq!(
            output.sstable_infos(),
            &dummy_success_upload_output().new_value_ssts
        );

        let uploader_context = test_uploader_context(dummy_fail_upload_future);
        let mut task =
            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
    }
1812
    /// `poll_ok_with_retry` should retry a failing upload until it succeeds;
    /// here the upload fails `fail_num` times before succeeding, so the
    /// upload future runs `fail_num + 1` times in total.
    #[tokio::test]
    async fn test_uploading_task_poll_ok_with_retry() {
        let run_count = Arc::new(AtomicUsize::new(0));
        let fail_num = 10;
        let run_count_clone = run_count.clone();
        let uploader_context = test_uploader_context(move |payload, info| {
            let run_count = run_count.clone();
            async move {
                // Fail for the first `fail_num` runs, then delegate to the
                // dummy success future.
                let ret = if run_count.load(SeqCst) < fail_num {
                    Err(HummockError::other("fail"))
                } else {
                    dummy_success_upload_future(payload, info).await
                };
                run_count.fetch_add(1, SeqCst);
                ret
            }
        });
        let mut task =
            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
        let output = poll_fn(|cx| task.poll_ok_with_retry(cx)).await;
        assert_eq!(fail_num + 1, run_count_clone.load(SeqCst));
        assert_eq!(
            output.sstable_infos(),
            &dummy_success_upload_output().new_value_ssts
        );
    }
1840
    /// End-to-end happy path: write an imm and a vector index delta in
    /// `epoch1`, sync the epoch, verify the uploaded sst and the synced data
    /// (including the vector index add), then pin a new version and check the
    /// committed epoch advanced.
    #[tokio::test]
    async fn test_uploader_basic() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        const VECTOR_INDEX_TABLE_ID: TableId = TableId::new(234);
        uploader.start_epoch(
            epoch1,
            HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
        );
        let imm = gen_imm(epoch1).await;
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);

        // Register a vector writer and seal epoch1 with one index delta.
        uploader.register_vector_writer(VECTOR_INDEX_TABLE_ID, epoch1);
        let vector_info_file = VectorFileInfo {
            object_id: 1.into(),
            vector_count: 1,
            file_size: 0,
            start_vector_id: 0,
            meta_offset: 20,
        };
        let vector_index_add = VectorIndexAdd::Flat(FlatIndexAdd {
            vector_store_info_delta: VectorStoreInfoDelta {
                next_vector_id: 1,
                added_vector_files: vec![vector_info_file.clone()],
            },
        });
        uploader.vector_writer_seal_epoch(
            VECTOR_INDEX_TABLE_ID,
            epoch1.next_epoch(),
            Some(vector_index_add.clone()),
        );

        // Start the sync and inspect the freshly created syncing data.
        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(
            epoch1,
            sync_tx,
            HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
        );
        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
        assert_eq!(1, uploader.data().syncing_data.len());
        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
        assert!(syncing_data.uploaded.is_empty());
        assert!(!syncing_data.remaining_uploading_tasks.is_empty());

        // The sync's upload task should produce the expected sst.
        let staging_sst = uploader.next_uploaded_sst().await;
        assert_eq!(&vec![epoch1], staging_sst.epochs());
        assert_eq!(
            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
            staging_sst.imm_ids()
        );
        assert_eq!(
            &dummy_success_upload_output().new_value_ssts,
            staging_sst.sstable_infos()
        );

        match sync_rx.await {
            Ok(Ok(data)) => {
                let SyncedData {
                    uploaded_ssts,
                    table_watermarks,
                    vector_index_adds,
                } = data;
                assert_eq!(1, uploaded_ssts.len());
                let staging_sst = &uploaded_ssts[0];
                assert_eq!(&vec![epoch1], staging_sst.epochs());
                assert_eq!(
                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
                    staging_sst.imm_ids()
                );
                assert_eq!(
                    &dummy_success_upload_output().new_value_ssts,
                    staging_sst.sstable_infos()
                );
                assert!(table_watermarks.is_empty());
                // The sealed vector index delta must be part of the result.
                assert_eq!(vector_index_adds.len(), 1);
                let (table_id, vector_index_adds) = vector_index_adds.into_iter().next().unwrap();
                assert_eq!(table_id, VECTOR_INDEX_TABLE_ID);
                assert_eq!(vector_index_adds.len(), 1);
                let synced_vector_index_add = vector_index_adds[0].clone();
                assert_eq!(vector_index_add, synced_vector_index_add);
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());

        // Pinning a version at epoch1 should advance the committed epoch.
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
1944
    /// Destroying the local instance before syncing must not lose its data:
    /// the sync still uploads the imm, and the table data is fully cleaned up
    /// after the sync completes.
    #[tokio::test]
    async fn test_uploader_destroy_instance_before_sync() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        uploader.start_epochs_for_test([epoch1]);
        let imm = gen_imm(epoch1).await;
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        // Destroy the instance before starting the sync.
        uploader.may_destroy_instance(TEST_LOCAL_INSTANCE_ID);

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
        assert_eq!(1, uploader.data().syncing_data.len());
        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
        assert!(syncing_data.uploaded.is_empty());
        assert!(!syncing_data.remaining_uploading_tasks.is_empty());

        let staging_sst = uploader.next_uploaded_sst().await;
        assert_eq!(&vec![epoch1], staging_sst.epochs());
        assert_eq!(
            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
            staging_sst.imm_ids()
        );
        assert_eq!(
            &dummy_success_upload_output().new_value_ssts,
            staging_sst.sstable_infos()
        );

        match sync_rx.await {
            Ok(Ok(data)) => {
                let SyncedData {
                    uploaded_ssts,
                    table_watermarks,
                    ..
                } = data;
                assert_eq!(1, uploaded_ssts.len());
                let staging_sst = &uploaded_ssts[0];
                assert_eq!(&vec![epoch1], staging_sst.epochs());
                assert_eq!(
                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
                    staging_sst.imm_ids()
                );
                assert_eq!(
                    &dummy_success_upload_output().new_value_ssts,
                    staging_sst.sstable_infos()
                );
                assert!(table_watermarks.is_empty());
            }
            _ => unreachable!(),
        };
        // After the sync, the table's unsync data entry must be gone.
        assert!(
            !uploader
                .data()
                .unsync_data
                .table_data
                .contains_key(&TEST_TABLE_ID)
        );
    }
2006
    /// Syncing an epoch without any written data should complete immediately
    /// with no uploaded ssts and still advance the syncing/synced epochs.
    #[tokio::test]
    async fn test_empty_uploader_sync() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_epochs_for_test([epoch1]);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        // No upload task should be produced for an empty sync.
        assert_uploader_pending(&mut uploader).await;

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.uploaded_ssts.is_empty());
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert!(uploader.data().syncing_data.is_empty());
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
2044
    /// Syncing `epoch1` while the only imm belongs to `epoch2` should behave
    /// like an empty sync: nothing is uploaded for epoch1 and the epoch2 data
    /// stays untouched.
    #[tokio::test]
    async fn test_uploader_empty_epoch() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        uploader.start_epochs_for_test([epoch1, epoch2]);
        // The imm belongs to epoch2; epoch1 stays empty.
        let imm = gen_imm(epoch2).await;
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm);

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        assert_uploader_pending(&mut uploader).await;

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.uploaded_ssts.is_empty());
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert!(uploader.data().syncing_data.is_empty());
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
2086
2087 #[tokio::test]
2088 async fn test_uploader_poll_empty() {
2089 let mut uploader = test_uploader(dummy_success_upload_future);
2090 let fut = uploader.next_uploaded_sst();
2091 let mut fut = pin!(fut);
2092 assert!(poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx).is_pending())).await);
2093 }
2094
    /// Advancing the pinned version (the "max committed epoch") on an uploader
    /// with no matching unsynced data should advance both the synced and
    /// syncing epochs, while an in-flight sync at a later epoch keeps the
    /// syncing epoch ahead until it completes.
    #[tokio::test]
    async fn test_uploader_empty_advance_mce() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let initial_pinned_version = uploader.context.pinned_version.clone();
        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        let epoch3 = epoch2.next_epoch();
        let epoch4 = epoch3.next_epoch();
        let epoch5 = epoch4.next_epoch();
        let epoch6 = epoch5.next_epoch();
        let version1 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        let version2 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch2))
            .unwrap();
        let version3 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch3))
            .unwrap();
        let version4 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch4))
            .unwrap();
        let version5 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch5))
            .unwrap();

        uploader.start_epochs_for_test([epoch6]);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch6);

        // Each new pinned version advances both synced and syncing epochs.
        uploader.update_pinned_version(version1);
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        let imm = gen_imm(epoch6).await;
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
        uploader.update_pinned_version(version2);
        assert_eq!(epoch2, uploader.test_max_synced_epoch());
        assert_eq!(epoch2, uploader.test_max_syncing_epoch());

        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch6);
        uploader.update_pinned_version(version3);
        assert_eq!(epoch3, uploader.test_max_synced_epoch());
        assert_eq!(epoch3, uploader.test_max_syncing_epoch());

        // Start syncing epoch6: the syncing epoch jumps ahead of the
        // committed epoch while the synced epoch keeps following the version.
        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch6, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
        uploader.update_pinned_version(version4);
        assert_eq!(epoch4, uploader.test_max_synced_epoch());
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());

        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_imm_ids([&imm]), sst.imm_ids());

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.table_watermarks.is_empty());
                assert_eq!(1, data.uploaded_ssts.len());
                assert_eq!(&get_imm_ids([&imm]), data.uploaded_ssts[0].imm_ids());
            }
            _ => unreachable!(),
        }

        // Once epoch6 is synced, a later version pin does not move it back.
        uploader.update_pinned_version(version5);
        assert_eq!(epoch6, uploader.test_max_synced_epoch());
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
    }
2162
2163 #[tokio::test]
2164 async fn test_uploader_finish_in_order() {
2165 let config = StorageOpts {
2166 shared_buffer_capacity_mb: 1024 * 1024,
2167 shared_buffer_flush_ratio: 0.0,
2168 ..Default::default()
2169 };
2170 let (buffer_tracker, mut uploader, new_task_notifier) =
2171 prepare_uploader_order_test(&config, false);
2172
2173 let epoch1 = INITIAL_EPOCH.next_epoch();
2174 let epoch2 = epoch1.next_epoch();
2175 let epoch3 = epoch2.next_epoch();
2176 let epoch4 = epoch3.next_epoch();
2177 uploader.start_epochs_for_test([epoch1, epoch2, epoch3, epoch4]);
2178 let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2179 let memory_limiter = Some(memory_limiter.deref());
2180
2181 let instance_id1 = 1;
2182 let instance_id2 = 2;
2183
2184 uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2185 uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2186
2187 let imm2 = gen_imm_with_limiter(epoch2, memory_limiter).await;
2189 uploader.add_imm(instance_id2, imm2.clone());
2190 let imm1_1 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2191 uploader.add_imm(instance_id1, imm1_1.clone());
2192 let imm1_2 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2193 uploader.add_imm(instance_id1, imm1_2.clone());
2194
2195 let epoch1_spill_payload12 =
2197 HashMap::from_iter([(instance_id1, vec![imm1_2.clone(), imm1_1.clone()])]);
2198 let epoch2_spill_payload = HashMap::from_iter([(instance_id2, vec![imm2.clone()])]);
2199 let (await_start1, finish_tx1) =
2200 new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload12));
2201 let (await_start2, finish_tx2) =
2202 new_task_notifier(get_payload_imm_ids(&epoch2_spill_payload));
2203 uploader.may_flush();
2204 await_start1.await;
2205 await_start2.await;
2206
2207 assert_uploader_pending(&mut uploader).await;
2208
2209 finish_tx2.send(()).unwrap();
2210 assert_uploader_pending(&mut uploader).await;
2211
2212 finish_tx1.send(()).unwrap();
2213 let sst = uploader.next_uploaded_sst().await;
2214 assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload12), sst.imm_ids());
2215 assert_eq!(&vec![epoch1], sst.epochs());
2216
2217 let sst = uploader.next_uploaded_sst().await;
2218 assert_eq!(&get_payload_imm_ids(&epoch2_spill_payload), sst.imm_ids());
2219 assert_eq!(&vec![epoch2], sst.epochs());
2220
2221 let imm1_3 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2222 uploader.add_imm(instance_id1, imm1_3.clone());
2223 let epoch1_spill_payload3 = HashMap::from_iter([(instance_id1, vec![imm1_3.clone()])]);
2224 let (await_start1_3, finish_tx1_3) =
2225 new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload3));
2226 uploader.may_flush();
2227 await_start1_3.await;
2228 let imm1_4 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2229 uploader.add_imm(instance_id1, imm1_4.clone());
2230 let epoch1_sync_payload = HashMap::from_iter([(instance_id1, vec![imm1_4.clone()])]);
2231 let (await_start1_4, finish_tx1_4) =
2232 new_task_notifier(get_payload_imm_ids(&epoch1_sync_payload));
2233 uploader.local_seal_epoch_for_test(instance_id1, epoch1);
2234 let (sync_tx1, mut sync_rx1) = oneshot::channel();
2235 uploader.start_single_epoch_sync(epoch1, sync_tx1, HashSet::from_iter([TEST_TABLE_ID]));
2236 await_start1_4.await;
2237
2238 uploader.local_seal_epoch_for_test(instance_id1, epoch2);
2239 uploader.local_seal_epoch_for_test(instance_id2, epoch2);
2240
2241 let imm3_1 = gen_imm_with_limiter(epoch3, memory_limiter).await;
2247 let epoch3_spill_payload1 = HashMap::from_iter([(instance_id1, vec![imm3_1.clone()])]);
2248 uploader.add_imm(instance_id1, imm3_1.clone());
2249 let (await_start3_1, finish_tx3_1) =
2250 new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload1));
2251 uploader.may_flush();
2252 await_start3_1.await;
2253 let imm3_2 = gen_imm_with_limiter(epoch3, memory_limiter).await;
2254 let epoch3_spill_payload2 = HashMap::from_iter([(instance_id2, vec![imm3_2.clone()])]);
2255 uploader.add_imm(instance_id2, imm3_2.clone());
2256 let (await_start3_2, finish_tx3_2) =
2257 new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload2));
2258 uploader.may_flush();
2259 await_start3_2.await;
2260 let imm3_3 = gen_imm_with_limiter(epoch3, memory_limiter).await;
2261 uploader.add_imm(instance_id1, imm3_3.clone());
2262
2263 uploader.local_seal_epoch_for_test(instance_id1, epoch3);
2269 let imm4 = gen_imm_with_limiter(epoch4, memory_limiter).await;
2270 uploader.add_imm(instance_id1, imm4.clone());
2271 assert_uploader_pending(&mut uploader).await;
2272
2273 finish_tx3_1.send(()).unwrap();
2280 assert_uploader_pending(&mut uploader).await;
2281 finish_tx1_4.send(()).unwrap();
2282 assert_uploader_pending(&mut uploader).await;
2283 finish_tx1_3.send(()).unwrap();
2284
2285 let sst = uploader.next_uploaded_sst().await;
2286 assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload3), sst.imm_ids());
2287
2288 assert!(poll_fn(|cx| Poll::Ready(sync_rx1.poll_unpin(cx).is_pending())).await);
2289
2290 let sst = uploader.next_uploaded_sst().await;
2291 assert_eq!(&get_payload_imm_ids(&epoch1_sync_payload), sst.imm_ids());
2292
2293 match sync_rx1.await {
2294 Ok(Ok(data)) => {
2295 assert_eq!(3, data.uploaded_ssts.len());
2296 assert_eq!(
2297 &get_payload_imm_ids(&epoch1_sync_payload),
2298 data.uploaded_ssts[0].imm_ids()
2299 );
2300 assert_eq!(
2301 &get_payload_imm_ids(&epoch1_spill_payload3),
2302 data.uploaded_ssts[1].imm_ids()
2303 );
2304 assert_eq!(
2305 &get_payload_imm_ids(&epoch1_spill_payload12),
2306 data.uploaded_ssts[2].imm_ids()
2307 );
2308 }
2309 _ => {
2310 unreachable!()
2311 }
2312 }
2313
2314 let (sync_tx2, sync_rx2) = oneshot::channel();
2322 uploader.start_single_epoch_sync(epoch2, sync_tx2, HashSet::from_iter([TEST_TABLE_ID]));
2323 uploader.local_seal_epoch_for_test(instance_id2, epoch3);
2324 let sst = uploader.next_uploaded_sst().await;
2325 assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids());
2326
2327 match sync_rx2.await {
2328 Ok(Ok(data)) => {
2329 assert_eq!(data.uploaded_ssts.len(), 1);
2330 assert_eq!(
2331 &get_payload_imm_ids(&epoch2_spill_payload),
2332 data.uploaded_ssts[0].imm_ids()
2333 );
2334 }
2335 _ => {
2336 unreachable!("should be sync finish");
2337 }
2338 }
2339 assert_eq!(epoch2, uploader.test_max_synced_epoch());
2340
2341 uploader.local_seal_epoch_for_test(instance_id1, epoch4);
2349 uploader.local_seal_epoch_for_test(instance_id2, epoch4);
2350 let epoch4_sync_payload = HashMap::from_iter([(instance_id1, vec![imm4, imm3_3])]);
2351 let (await_start4_with_3_3, finish_tx4_with_3_3) =
2352 new_task_notifier(get_payload_imm_ids(&epoch4_sync_payload));
2353 let (sync_tx4, mut sync_rx4) = oneshot::channel();
2354 uploader.start_single_epoch_sync(epoch4, sync_tx4, HashSet::from_iter([TEST_TABLE_ID]));
2355 await_start4_with_3_3.await;
2356
2357 assert_uploader_pending(&mut uploader).await;
2365
2366 finish_tx3_2.send(()).unwrap();
2367 let sst = uploader.next_uploaded_sst().await;
2368 assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload2), sst.imm_ids());
2369
2370 finish_tx4_with_3_3.send(()).unwrap();
2371 assert!(poll_fn(|cx| Poll::Ready(sync_rx4.poll_unpin(cx).is_pending())).await);
2372
2373 let sst = uploader.next_uploaded_sst().await;
2374 assert_eq!(&get_payload_imm_ids(&epoch4_sync_payload), sst.imm_ids());
2375
2376 match sync_rx4.await {
2377 Ok(Ok(data)) => {
2378 assert_eq!(3, data.uploaded_ssts.len());
2379 assert_eq!(
2380 &get_payload_imm_ids(&epoch4_sync_payload),
2381 data.uploaded_ssts[0].imm_ids()
2382 );
2383 assert_eq!(
2384 &get_payload_imm_ids(&epoch3_spill_payload2),
2385 data.uploaded_ssts[1].imm_ids()
2386 );
2387 assert_eq!(
2388 &get_payload_imm_ids(&epoch3_spill_payload1),
2389 data.uploaded_ssts[2].imm_ids(),
2390 )
2391 }
2392 _ => {
2393 unreachable!("should be sync finish");
2394 }
2395 }
2396 assert_eq!(epoch4, uploader.test_max_synced_epoch());
2397
2398 }
2406
2407 #[tokio::test]
2408 async fn test_uploader_frequently_flush() {
2409 let config = StorageOpts {
2410 shared_buffer_capacity_mb: 10,
2411 shared_buffer_flush_ratio: 0.12,
2412 shared_buffer_min_batch_flush_size_mb: 1,
2414 ..Default::default()
2415 };
2416 let (buffer_tracker, mut uploader, _new_task_notifier) =
2417 prepare_uploader_order_test(&config, true);
2418
2419 let epoch1 = INITIAL_EPOCH.next_epoch();
2420 let epoch2 = epoch1.next_epoch();
2421 uploader.start_epochs_for_test([epoch1, epoch2]);
2422 let instance_id1 = 1;
2423 let instance_id2 = 2;
2424 let flush_threshold = buffer_tracker.flush_threshold();
2425 let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2426
2427 uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2428 uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2429
2430 let mut total_memory = 0;
2432 while total_memory < flush_threshold {
2433 let imm = gen_imm_with_limiter(epoch2, Some(memory_limiter.as_ref())).await;
2434 total_memory += imm.size();
2435 if total_memory > flush_threshold {
2436 break;
2437 }
2438 uploader.add_imm(instance_id2, imm);
2439 }
2440 let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await;
2441 uploader.add_imm(instance_id1, imm);
2442 assert!(uploader.may_flush());
2443
2444 for _ in 0..10 {
2445 let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await;
2446 uploader.add_imm(instance_id1, imm);
2447 assert!(!uploader.may_flush());
2448 }
2449 }
2450}