1mod spiller;
16mod task_manager;
17pub(crate) mod test_utils;
18
19use std::cmp::Ordering;
20use std::collections::btree_map::Entry;
21use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque, hash_map};
22use std::fmt::{Debug, Display, Formatter};
23use std::future::{Future, poll_fn};
24use std::mem::{replace, swap, take};
25use std::sync::Arc;
26use std::task::{Context, Poll, ready};
27
28use futures::FutureExt;
29use itertools::Itertools;
30use more_asserts::assert_gt;
31use prometheus::{HistogramTimer, IntGauge};
32use risingwave_common::bitmap::BitmapBuilder;
33use risingwave_common::catalog::TableId;
34use risingwave_common::metrics::UintGauge;
35use risingwave_common::must_match;
36use risingwave_hummock_sdk::table_watermark::{
37 TableWatermarks, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
38};
39use risingwave_hummock_sdk::vector_index::VectorIndexAdd;
40use risingwave_hummock_sdk::{HummockEpoch, HummockRawObjectId, LocalSstableInfo};
41use task_manager::{TaskManager, UploadingTaskStatus};
42use thiserror_ext::AsReport;
43use tokio::sync::oneshot;
44use tokio::task::JoinHandle;
45use tracing::{debug, error, info, warn};
46
47use crate::hummock::event_handler::LocalInstanceId;
48use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, send_sync_result};
49use crate::hummock::event_handler::uploader::spiller::Spiller;
50use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm;
51use crate::hummock::local_version::pinned_version::PinnedVersion;
52use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchId;
53use crate::hummock::store::version::StagingSstableInfo;
54use crate::hummock::{HummockError, HummockResult, ImmutableMemtable};
55use crate::mem_table::ImmId;
56use crate::monitor::HummockStateStoreMetrics;
57use crate::store::SealCurrentEpochOptions;
58
59fn take_before_epoch<T>(
61 data: &mut BTreeMap<HummockEpoch, T>,
62 epoch: HummockEpoch,
63) -> BTreeMap<HummockEpoch, T> {
64 let mut before_epoch_data = data.split_off(&(epoch + 1));
65 swap(&mut before_epoch_data, data);
66 before_epoch_data
67}
68
// Uploader-internal payload: imms wrapped in `UploaderImm` so their size is
// tracked by the uploader metrics gauge while held here.
type UploadTaskInput = HashMap<LocalInstanceId, Vec<UploaderImm>>;
// Payload handed to the spawned upload task: the raw imms per local instance.
pub type UploadTaskPayload = HashMap<LocalInstanceId, Vec<ImmutableMemtable>>;
71
/// Result of a finished upload task.
#[derive(Debug)]
pub struct UploadTaskOutput {
    // SSTs built from new-value data.
    pub new_value_ssts: Vec<LocalSstableInfo>,
    // SSTs built from old-value (log-store) data, if any.
    pub old_value_ssts: Vec<LocalSstableInfo>,
    // Timer measuring how long the uploader takes to poll this finished result.
    pub wait_poll_timer: Option<HistogramTimer>,
}
/// Callback used by the uploader to spawn the actual upload work; returns the
/// join handle of the spawned task.
pub type SpawnUploadTask = Arc<
    dyn Fn(UploadTaskPayload, UploadTaskInfo) -> JoinHandle<HummockResult<UploadTaskOutput>>
        + Send
        + Sync
        + 'static,
>;
84
/// Metadata describing an upload task, passed alongside its payload.
#[derive(Clone)]
pub struct UploadTaskInfo {
    // Total byte size of all imms in the task.
    pub task_size: usize,
    // Epochs covered by the task, in descending order.
    pub epochs: Vec<HummockEpoch>,
    // Batch ids of the uploaded imms, per local instance.
    pub imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
}
91
// Compact form for logs: only the number of instances is printed, not the
// full imm id lists (see the `Debug` impl for the verbose form).
impl Display for UploadTaskInfo {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UploadTaskInfo")
            .field("task_size", &self.task_size)
            .field("epochs", &self.epochs)
            .field("len(imm_ids)", &self.imm_ids.len())
            .finish()
    }
}
101
// Verbose form: includes the full per-instance imm id lists.
impl Debug for UploadTaskInfo {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UploadTaskInfo")
            .field("task_size", &self.task_size)
            .field("epochs", &self.epochs)
            .field("imm_ids", &self.imm_ids)
            .finish()
    }
}
111
mod uploader_imm {
    use std::fmt::Formatter;
    use std::ops::Deref;

    use risingwave_common::metrics::UintGauge;

    use crate::hummock::event_handler::uploader::UploaderContext;
    use crate::mem_table::ImmutableMemtable;

    /// RAII wrapper around an [`ImmutableMemtable`]: its size is added to the
    /// `uploader_imm_size` gauge on construction and subtracted on drop, so the
    /// gauge always reflects the bytes currently held by the uploader.
    pub(super) struct UploaderImm {
        inner: ImmutableMemtable,
        // Cloned gauge handle; kept so `Drop` can undo the `add` in `new`.
        size_guard: UintGauge,
    }

    impl UploaderImm {
        pub(super) fn new(imm: ImmutableMemtable, context: &UploaderContext) -> Self {
            let size = imm.size();
            let size_guard = context.stats.uploader_imm_size.clone();
            size_guard.add(size as _);
            Self {
                inner: imm,
                size_guard,
            }
        }

        // Test-only constructor backed by a throwaway gauge, so tests need no
        // `UploaderContext`.
        #[cfg(test)]
        pub(super) fn for_test(imm: ImmutableMemtable) -> Self {
            Self {
                inner: imm,
                size_guard: UintGauge::new("test", "test").unwrap(),
            }
        }
    }

    impl std::fmt::Debug for UploaderImm {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            self.inner.fmt(f)
        }
    }

    impl Deref for UploaderImm {
        type Target = ImmutableMemtable;

        fn deref(&self) -> &Self::Target {
            &self.inner
        }
    }

    impl Drop for UploaderImm {
        fn drop(&mut self) {
            // Release the bytes accounted for in `new`.
            self.size_guard.sub(self.inner.size() as _);
        }
    }
}
166
/// Identifier of an uploading (flush/spill) task, assigned by the task manager.
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct UploadingTaskId(usize);
169
/// An in-flight upload task. Holds the original input so the task can be
/// respawned on failure, and metric guards that are released on drop.
struct UploadingTask {
    task_id: UploadingTaskId,
    // Kept for retry: the payload can be rebuilt from it (see `poll_ok_with_retry`).
    input: UploadTaskInput,
    join_handle: JoinHandle<HummockResult<UploadTaskOutput>>,
    task_info: UploadTaskInfo,
    // Used to respawn the upload on failure.
    spawn_upload_task: SpawnUploadTask,
    // Global upload-task-size gauge; decremented by `task_size` on drop.
    task_size_guard: UintGauge,
    // Uploading-task-count gauge; decremented on drop.
    task_count_guard: IntGauge,
}
182
// Undo the metric contributions made in `UploadingTask::new`.
impl Drop for UploadingTask {
    fn drop(&mut self) {
        self.task_size_guard.sub(self.task_info.task_size as u64);
        self.task_count_guard.dec();
    }
}
189
// Only `input` and `task_info` are shown; the join handle and metric guards
// carry no useful debug information.
impl Debug for UploadingTask {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UploadingTask")
            .field("input", &self.input)
            .field("task_info", &self.task_info)
            .finish()
    }
}
198
199fn get_payload_imm_ids(
200 payload: &UploadTaskPayload,
201) -> HashMap<LocalInstanceId, Vec<SharedBufferBatchId>> {
202 payload
203 .iter()
204 .map(|(instance_id, imms)| {
205 (
206 *instance_id,
207 imms.iter().map(|imm| imm.batch_id()).collect_vec(),
208 )
209 })
210 .collect()
211}
212
impl UploadingTask {
    // Tasks larger than this (50 MiB) are logged at `info` instead of `debug`.
    const LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE: usize = 50 * (1 << 20);

    /// Converts the tracked input into the raw payload passed to the spawned
    /// upload task by cloning the inner imms out of their `UploaderImm` wrappers.
    fn input_to_payload(input: &UploadTaskInput) -> UploadTaskPayload {
        input
            .iter()
            .map(|(instance_id, imms)| {
                (
                    *instance_id,
                    imms.iter().map(|imm| (**imm).clone()).collect(),
                )
            })
            .collect()
    }

    /// Creates a task from non-empty `input` and immediately spawns the upload.
    ///
    /// Side effects: adds `task_size` to the global upload-task-size gauge and
    /// increments the uploading-task-count gauge (both undone in `Drop`).
    fn new(task_id: UploadingTaskId, input: UploadTaskInput, context: &UploaderContext) -> Self {
        assert!(!input.is_empty());
        // Collect the distinct epochs across all imms, in descending order
        // (sorted ascending, then reversed), as expected by `UploadTaskInfo`.
        let mut epochs = input
            .iter()
            .flat_map(|(_, imms)| imms.iter().flat_map(|imm| imm.epochs().iter().cloned()))
            .sorted()
            .dedup()
            .collect_vec();

        epochs.reverse();
        let payload = Self::input_to_payload(&input);
        let imm_ids = get_payload_imm_ids(&payload);
        let task_size = input
            .values()
            .map(|imms| imms.iter().map(|imm| imm.size()).sum::<usize>())
            .sum();
        let task_info = UploadTaskInfo {
            task_size,
            epochs,
            imm_ids,
        };
        context
            .buffer_tracker
            .global_upload_task_size()
            .add(task_size as u64);
        if task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
            info!("start upload task: {:?}", task_info);
        } else {
            debug!("start upload task: {:?}", task_info);
        }
        let join_handle = (context.spawn_upload_task)(payload, task_info.clone());
        context.stats.uploader_uploading_task_count.inc();
        Self {
            task_id,
            input,
            join_handle,
            task_info,
            spawn_upload_task: context.spawn_upload_task.clone(),
            task_size_guard: context.buffer_tracker.global_upload_task_size().clone(),
            task_count_guard: context.stats.uploader_uploading_task_count.clone(),
        }
    }

    /// Polls the join handle once. Yields `Ready` with the task result
    /// (success mapped into a `StagingSstableInfo`) or an error, including a
    /// join failure wrapped as `HummockError::other`.
    fn poll_result(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<HummockResult<Arc<StagingSstableInfo>>> {
        Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) {
            Ok(task_result) => task_result
                .inspect(|_| {
                    if self.task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
                        info!(task_info = ?self.task_info, "upload task finish");
                    } else {
                        debug!(task_info = ?self.task_info, "upload task finish");
                    }
                })
                .inspect_err(|e| error!(task_info = ?self.task_info, err = ?e.as_report(), "upload task failed"))
                .map(|output| {
                    Arc::new(StagingSstableInfo::new(
                        output.new_value_ssts,
                        output.old_value_ssts,
                        self.task_info.epochs.clone(),
                        self.task_info.imm_ids.clone(),
                        self.task_info.task_size,
                    ))
                }),

            Err(err) => Err(HummockError::other(format!(
                "fail to join upload join handle: {}",
                err.as_report()
            ))),
        })
    }

    /// Like `poll_result`, but on any error respawns the upload from the
    /// retained `input` and keeps polling, so it only resolves on success.
    fn poll_ok_with_retry(&mut self, cx: &mut Context<'_>) -> Poll<Arc<StagingSstableInfo>> {
        loop {
            let result = ready!(self.poll_result(cx));
            match result {
                Ok(sstables) => return Poll::Ready(sstables),
                Err(e) => {
                    error!(
                        error = %e.as_report(),
                        task_info = ?self.task_info,
                        "a flush task failed, start retry",
                    );
                    // Respawn with a freshly rebuilt payload; the new handle is
                    // polled on the next loop iteration.
                    self.join_handle = (self.spawn_upload_task)(
                        Self::input_to_payload(&self.input),
                        self.task_info.clone(),
                    );
                }
            }
        }
    }

    pub fn get_task_info(&self) -> &UploadTaskInfo {
        &self.task_info
    }
}
333
impl TableUnsyncData {
    /// Records `table_watermarks` for `epoch` on this table.
    ///
    /// Invariants enforced by assertion: all watermarks share one vnode count;
    /// the direction and serde type match previously recorded watermarks; and
    /// within an epoch, no vnode receives more than one watermark (tracked by
    /// the per-epoch `BitmapBuilder`).
    fn add_table_watermarks(
        &mut self,
        epoch: HummockEpoch,
        table_watermarks: Vec<VnodeWatermark>,
        direction: WatermarkDirection,
        watermark_type: WatermarkSerdeType,
    ) {
        if table_watermarks.is_empty() {
            return;
        }
        let vnode_count = table_watermarks[0].vnode_count();
        for watermark in &table_watermarks {
            assert_eq!(vnode_count, watermark.vnode_count());
        }

        // Marks the vnodes covered by `vnode_watermarks` in `vnode_bitmap`,
        // panicking if any vnode was already marked for this epoch.
        fn apply_new_vnodes(
            vnode_bitmap: &mut BitmapBuilder,
            vnode_watermarks: &Vec<VnodeWatermark>,
        ) {
            for vnode_watermark in vnode_watermarks {
                for vnode in vnode_watermark.vnode_bitmap().iter_ones() {
                    assert!(
                        !vnode_bitmap.is_set(vnode),
                        "vnode {} write multiple table watermarks",
                        vnode
                    );
                    vnode_bitmap.set(vnode, true);
                }
            }
        }
        match &mut self.table_watermarks {
            Some((prev_direction, prev_watermarks, prev_watermark_type)) => {
                assert_eq!(
                    *prev_direction, direction,
                    "table id {} new watermark direction not match with previous",
                    self.table_id
                );
                assert_eq!(
                    *prev_watermark_type, watermark_type,
                    "table id {} new watermark watermark_type not match with previous",
                    self.table_id
                );
                match prev_watermarks.entry(epoch) {
                    // Same epoch seen before: extend its watermark list.
                    Entry::Occupied(mut entry) => {
                        let (prev_watermarks, vnode_bitmap) = entry.get_mut();
                        apply_new_vnodes(vnode_bitmap, &table_watermarks);
                        prev_watermarks.extend(table_watermarks);
                    }
                    // First watermarks for this epoch.
                    Entry::Vacant(entry) => {
                        let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                        apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                        entry.insert((table_watermarks, vnode_bitmap));
                    }
                }
            }
            // First watermarks for this table.
            None => {
                let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                self.table_watermarks = Some((
                    direction,
                    BTreeMap::from_iter([(epoch, (table_watermarks, vnode_bitmap))]),
                    watermark_type,
                ));
            }
        }
    }
}
402
impl UploaderData {
    /// Folds per-epoch `watermarks` of `table_id` into a single
    /// [`TableWatermarks`] and inserts it into `all_table_watermarks`.
    ///
    /// The iterator must be ordered by epoch (epochs are appended via
    /// `add_new_epoch_watermarks`). Panics if `table_id` was already present.
    fn add_table_watermarks(
        all_table_watermarks: &mut HashMap<TableId, TableWatermarks>,
        table_id: TableId,
        direction: WatermarkDirection,
        watermarks: impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)>,
        watermark_type: WatermarkSerdeType,
    ) {
        let mut table_watermarks: Option<TableWatermarks> = None;
        for (epoch, watermarks) in watermarks {
            match &mut table_watermarks {
                Some(prev_watermarks) => {
                    assert_eq!(prev_watermarks.direction, direction);
                    assert_eq!(prev_watermarks.watermark_type, watermark_type);
                    prev_watermarks.add_new_epoch_watermarks(
                        epoch,
                        Arc::from(watermarks),
                        direction,
                        watermark_type,
                    );
                }
                None => {
                    table_watermarks = Some(TableWatermarks::single_epoch(
                        epoch,
                        watermarks,
                        direction,
                        watermark_type,
                    ));
                }
            }
        }
        // Only insert when the iterator yielded at least one epoch.
        if let Some(table_watermarks) = table_watermarks {
            assert!(
                all_table_watermarks
                    .insert(table_id, table_watermarks)
                    .is_none()
            );
        }
    }
}
443
/// Unflushed imms of one local instance within a single epoch.
struct LocalInstanceEpochData {
    epoch: HummockEpoch,
    // Imms ordered newest-first (pushed to the front as they arrive).
    imms: VecDeque<UploaderImm>,
    // Whether some imms of this epoch were already taken by a spill.
    has_spilled: bool,
}
450
impl LocalInstanceEpochData {
    fn new(epoch: HummockEpoch) -> Self {
        Self {
            epoch,
            imms: VecDeque::new(),
            has_spilled: false,
        }
    }

    fn epoch(&self) -> HummockEpoch {
        self.epoch
    }

    /// Appends a single-epoch imm of this epoch. Batch ids must strictly
    /// increase, so the front of `imms` is always the newest imm.
    fn add_imm(&mut self, imm: UploaderImm) {
        assert_eq!(imm.max_epoch(), imm.min_epoch());
        assert_eq!(self.epoch, imm.min_epoch());
        if let Some(prev_imm) = self.imms.front() {
            assert_gt!(imm.batch_id(), prev_imm.batch_id());
        }
        self.imms.push_front(imm);
    }

    fn is_empty(&self) -> bool {
        self.imms.is_empty()
    }
}
477
/// Unsynced state of one local read-version instance of a table.
struct LocalInstanceUnsyncData {
    table_id: TableId,
    instance_id: LocalInstanceId,
    // Data of the epoch currently being written; `None` once the instance has
    // been synced beyond its latest epoch.
    current_epoch_data: Option<LocalInstanceEpochData>,
    // Sealed-but-unsynced epochs, newest at the front.
    sealed_data: VecDeque<LocalInstanceEpochData>,
    // Batch ids currently being flushed, newest at the front; acked in order
    // from the back as flushes complete.
    flushing_imms: VecDeque<SharedBufferBatchId>,
    // Set when the read-version instance is dropped; the data is kept until
    // all sealed epochs are synced (see `is_finished`).
    is_destroyed: bool,
}
489
impl LocalInstanceUnsyncData {
    fn new(table_id: TableId, instance_id: LocalInstanceId, init_epoch: HummockEpoch) -> Self {
        Self {
            table_id,
            instance_id,
            current_epoch_data: Some(LocalInstanceEpochData::new(init_epoch)),
            sealed_data: VecDeque::new(),
            flushing_imms: Default::default(),
            is_destroyed: false,
        }
    }

    /// Adds an imm of the current epoch. Must not be called after destroy.
    fn add_imm(&mut self, imm: UploaderImm) {
        assert!(!self.is_destroyed);
        assert_eq!(self.table_id, imm.table_id);
        self.current_epoch_data
            .as_mut()
            .expect("should be Some when adding new imm")
            .add_imm(imm);
    }

    /// Seals the current epoch and starts `next_epoch`; returns the epoch that
    /// was just sealed. Empty epochs are discarded rather than queued.
    fn local_seal_epoch(&mut self, next_epoch: HummockEpoch) -> HummockEpoch {
        assert!(!self.is_destroyed);
        let data = self
            .current_epoch_data
            .as_mut()
            .expect("should be Some when seal new epoch");
        let current_epoch = data.epoch;
        debug!(
            instance_id = self.instance_id,
            next_epoch, current_epoch, "local seal epoch"
        );
        assert_gt!(next_epoch, current_epoch);
        let epoch_data = replace(data, LocalInstanceEpochData::new(next_epoch));
        if !epoch_data.is_empty() {
            self.sealed_data.push_front(epoch_data);
        }
        current_epoch
    }

    /// Acknowledges flushed imms. `imm_ids` must arrive oldest-first and match
    /// the tail of `flushing_imms` exactly (flushes complete in order).
    fn ack_flushed(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            assert_eq!(self.flushing_imms.pop_back().expect("should exist"), imm_id);
        }
    }

    /// Takes the imms of exactly `epoch` for spilling, returning them
    /// newest-first. Panics if an older epoch still has unspilled data, since
    /// spills must proceed in epoch order.
    fn spill(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        let imms = if let Some(oldest_sealed_epoch) = self.sealed_data.back() {
            match oldest_sealed_epoch.epoch.cmp(&epoch) {
                Ordering::Less => {
                    unreachable!(
                        "should not spill at this epoch because there \
                    is unspilled data in previous epoch: prev epoch {}, spill epoch {}",
                        oldest_sealed_epoch.epoch, epoch
                    );
                }
                Ordering::Equal => {
                    let epoch_data = self.sealed_data.pop_back().unwrap();
                    assert_eq!(epoch, epoch_data.epoch);
                    epoch_data.imms
                }
                // This instance has no data at `epoch`; nothing to spill.
                Ordering::Greater => VecDeque::new(),
            }
        } else {
            let Some(current_epoch_data) = &mut self.current_epoch_data else {
                return Vec::new();
            };
            match current_epoch_data.epoch.cmp(&epoch) {
                Ordering::Less => {
                    assert!(
                        current_epoch_data.imms.is_empty(),
                        "should not spill at this epoch because there \
                        is unspilled data in current epoch epoch {}, spill epoch {}",
                        current_epoch_data.epoch,
                        epoch
                    );
                    VecDeque::new()
                }
                Ordering::Equal => {
                    // Spilling the (unsealed) current epoch: take its imms and
                    // remember that a spill happened within this epoch.
                    if !current_epoch_data.imms.is_empty() {
                        current_epoch_data.has_spilled = true;
                        take(&mut current_epoch_data.imms)
                    } else {
                        VecDeque::new()
                    }
                }
                Ordering::Greater => VecDeque::new(),
            }
        };
        // Record oldest-first so `flushing_imms` stays newest-at-front.
        self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
        imms.into_iter().collect()
    }

    /// Registers batch ids as in-flight; ids must strictly increase.
    fn add_flushing_imm(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            if let Some(prev_imm_id) = self.flushing_imms.front() {
                assert_gt!(imm_id, *prev_imm_id);
            }
            self.flushing_imms.push_front(imm_id);
        }
    }

    /// Takes all imms of sealed epochs `<= epoch` for syncing, returned
    /// newest-first across epochs. Also sanity-checks (and in release builds
    /// archives) the current epoch when the sync epoch reaches it.
    fn sync(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        let mut ret = Vec::new();
        while let Some(epoch_data) = self.sealed_data.back()
            && epoch_data.epoch() <= epoch
        {
            let imms = self.sealed_data.pop_back().expect("checked exist").imms;
            self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
            ret.extend(imms.into_iter().rev());
        }
        // Collected oldest-first above; flip to newest-first for the caller.
        ret.reverse();
        if let Some(latest_epoch_data) = &self.current_epoch_data
            && latest_epoch_data.epoch <= epoch
        {
            assert!(self.sealed_data.is_empty());
            assert!(latest_epoch_data.is_empty());
            assert!(!latest_epoch_data.has_spilled);
            // Syncing past the current epoch should only happen after the
            // instance is destroyed: treat as a bug in debug builds, but
            // tolerate in release by archiving the instance.
            if cfg!(debug_assertions) {
                panic!(
                    "sync epoch exceeds latest epoch, and the current instance should have been archived"
                );
            }
            warn!(
                instance_id = self.instance_id,
                table_id = self.table_id.table_id,
                "sync epoch exceeds latest epoch, and the current instance should have be archived"
            );
            self.current_epoch_data = None;
        }
        ret
    }

    /// Asserts that no unsynced data remains at or before `epoch`.
    fn assert_after_epoch(&self, epoch: HummockEpoch) {
        if let Some(oldest_sealed_data) = self.sealed_data.back() {
            assert!(!oldest_sealed_data.imms.is_empty());
            assert_gt!(oldest_sealed_data.epoch, epoch);
        } else if let Some(current_data) = &self.current_epoch_data
            && current_data.epoch <= epoch
        {
            assert!(current_data.imms.is_empty() && !current_data.has_spilled);
        }
    }

    /// True when the instance is destroyed and all sealed data is synced, so
    /// its bookkeeping can be dropped.
    fn is_finished(&self) -> bool {
        self.is_destroyed && self.sealed_data.is_empty()
    }
}
643
/// Unsynced state of a single table across all its local instances.
struct TableUnsyncData {
    table_id: TableId,
    instance_data: HashMap<LocalInstanceId, LocalInstanceUnsyncData>,
    // Accumulated watermarks: direction, per-epoch (watermarks, written-vnode
    // bitmap), and serde type. `None` until the first watermark arrives.
    #[expect(clippy::type_complexity)]
    table_watermarks: Option<(
        WatermarkDirection,
        BTreeMap<HummockEpoch, (Vec<VnodeWatermark>, BitmapBuilder)>,
        WatermarkSerdeType,
    )>,
    // Spill tasks issued per epoch, resolved at sync time.
    spill_tasks: BTreeMap<HummockEpoch, VecDeque<UploadingTaskId>>,
    // Epochs started but not yet synced (used as an ordered set).
    unsync_epochs: BTreeMap<HummockEpoch, ()>,
    // The `next_epoch` recorded when this table stopped receiving epochs;
    // used to detect inconsistent stop epochs across instances.
    stopped_next_epoch: Option<HummockEpoch>,
    // Epochs with a sync in progress, newest at the front.
    syncing_epochs: VecDeque<HummockEpoch>,
    max_synced_epoch: Option<HummockEpoch>,
}
663
impl TableUnsyncData {
    fn new(table_id: TableId, committed_epoch: Option<HummockEpoch>) -> Self {
        Self {
            table_id,
            instance_data: Default::default(),
            table_watermarks: None,
            spill_tasks: Default::default(),
            unsync_epochs: Default::default(),
            stopped_next_epoch: None,
            syncing_epochs: Default::default(),
            // A freshly tracked table is considered synced up to its
            // committed epoch, if known.
            max_synced_epoch: committed_epoch,
        }
    }

    /// Registers a new unsynced epoch; epochs must strictly increase.
    fn new_epoch(&mut self, epoch: HummockEpoch) {
        debug!(table_id = ?self.table_id, epoch, "table new epoch");
        if let Some(latest_epoch) = self.max_epoch() {
            assert_gt!(epoch, latest_epoch);
        }
        self.unsync_epochs.insert(epoch, ());
    }

    /// Starts syncing up to `epoch` (inclusive). Returns:
    /// 1. per-instance imms to flush,
    /// 2. the watermarks accumulated up to `epoch` (if any),
    /// 3. the spill task ids covering the synced epochs,
    /// 4. the epochs taken out of `unsync_epochs`.
    #[expect(clippy::type_complexity)]
    fn sync(
        &mut self,
        epoch: HummockEpoch,
    ) -> (
        impl Iterator<Item = (LocalInstanceId, Vec<UploaderImm>)> + '_,
        Option<(
            WatermarkDirection,
            impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)> + use<>,
            WatermarkSerdeType,
        )>,
        impl Iterator<Item = UploadingTaskId> + use<>,
        BTreeMap<HummockEpoch, ()>,
    ) {
        if let Some(prev_epoch) = self.max_sync_epoch() {
            assert_gt!(epoch, prev_epoch)
        }
        let epochs = take_before_epoch(&mut self.unsync_epochs, epoch);
        // `epoch` itself must be a known unsynced epoch.
        assert_eq!(
            *epochs.last_key_value().expect("non-empty").0,
            epoch,
            "{epochs:?} {epoch} {:?}",
            self.table_id
        );
        self.syncing_epochs.push_front(epoch);
        (
            self.instance_data
                .iter_mut()
                .map(move |(instance_id, data)| (*instance_id, data.sync(epoch))),
            self.table_watermarks
                .as_mut()
                .map(|(direction, watermarks, watermark_type)| {
                    // Drop the vnode bitmaps: they only guard against duplicate
                    // writes while accumulating.
                    let watermarks = take_before_epoch(watermarks, epoch)
                        .into_iter()
                        .map(|(epoch, (watermarks, _))| (epoch, watermarks));
                    (*direction, watermarks, *watermark_type)
                }),
            take_before_epoch(&mut self.spill_tasks, epoch)
                .into_values()
                .flat_map(|tasks| tasks.into_iter()),
            epochs,
        )
    }

    /// Completes the oldest in-flight sync; syncs finish in order.
    fn ack_synced(&mut self, sync_epoch: HummockEpoch) {
        let min_sync_epoch = self.syncing_epochs.pop_back().expect("should exist");
        assert_eq!(sync_epoch, min_sync_epoch);
        self.max_synced_epoch = Some(sync_epoch);
    }

    /// Advances `max_synced_epoch` to `committed_epoch` if it is ahead (a
    /// commit implies sync), and checks no stale unsynced data remains.
    fn ack_committed(&mut self, committed_epoch: HummockEpoch) {
        let synced_epoch_advanced = {
            if let Some(max_synced_epoch) = self.max_synced_epoch
                && max_synced_epoch >= committed_epoch
            {
                false
            } else {
                true
            }
        };
        if synced_epoch_advanced {
            self.max_synced_epoch = Some(committed_epoch);
            if let Some(min_syncing_epoch) = self.syncing_epochs.back() {
                assert_gt!(*min_syncing_epoch, committed_epoch);
            }
            self.assert_after_epoch(committed_epoch);
        }
    }

    /// Asserts that no instance data or watermark remains at or before `epoch`.
    fn assert_after_epoch(&self, epoch: HummockEpoch) {
        self.instance_data
            .values()
            .for_each(|instance_data| instance_data.assert_after_epoch(epoch));
        if let Some((_, watermarks, _)) = &self.table_watermarks
            && let Some((oldest_epoch, _)) = watermarks.first_key_value()
        {
            assert_gt!(*oldest_epoch, epoch);
        }
    }

    /// The newest epoch that has at least started syncing.
    fn max_sync_epoch(&self) -> Option<HummockEpoch> {
        self.syncing_epochs
            .front()
            .cloned()
            .or(self.max_synced_epoch)
    }

    /// The newest epoch known to this table (unsynced, syncing, or synced).
    fn max_epoch(&self) -> Option<HummockEpoch> {
        self.unsync_epochs
            .last_key_value()
            .map(|(epoch, _)| *epoch)
            .or_else(|| self.max_sync_epoch())
    }

    /// True when the table has no instances and no pending epochs, so its
    /// entry can be removed.
    fn is_empty(&self) -> bool {
        self.instance_data.is_empty()
            && self.syncing_epochs.is_empty()
            && self.unsync_epochs.is_empty()
    }
}
786
/// Identifies an unsynced epoch shared by a group of tables; keyed by the
/// epoch plus the smallest table id of the group (see `get_unsync_epoch_id`).
#[derive(Eq, Hash, PartialEq, Copy, Clone)]
struct UnsyncEpochId(HummockEpoch, TableId);
789
impl UnsyncEpochId {
    fn epoch(&self) -> HummockEpoch {
        self.0
    }
}
795
796fn get_unsync_epoch_id(epoch: HummockEpoch, table_ids: &HashSet<TableId>) -> Option<UnsyncEpochId> {
797 table_ids
798 .iter()
799 .min()
800 .map(|table_id| UnsyncEpochId(epoch, *table_id))
801}
802
/// All state of the uploader that has not yet been synced.
#[derive(Default)]
struct UnsyncData {
    table_data: HashMap<TableId, TableUnsyncData>,
    // Reverse index from local instance to its owning table.
    instance_table_id: HashMap<LocalInstanceId, TableId>,
    // Unsynced epoch id -> the full set of tables sharing that epoch.
    unsync_epochs: HashMap<UnsyncEpochId, HashSet<TableId>>,
    // Finished spill tasks (sst + covered tables) waiting to be claimed by a
    // later sync.
    spilled_data: HashMap<UploadingTaskId, (Arc<StagingSstableInfo>, HashSet<TableId>)>,
}
815
impl UnsyncData {
    /// Registers a new local instance of `table_id` starting at `init_epoch`.
    /// The table entry and the init epoch must already be known.
    fn init_instance(
        &mut self,
        table_id: TableId,
        instance_id: LocalInstanceId,
        init_epoch: HummockEpoch,
    ) {
        debug!(
            table_id = table_id.table_id,
            instance_id, init_epoch, "init epoch"
        );
        let table_data = self
            .table_data
            .get_mut(&table_id)
            .unwrap_or_else(|| panic!("should exist. {table_id:?}"));
        assert!(
            table_data
                .instance_data
                .insert(
                    instance_id,
                    LocalInstanceUnsyncData::new(table_id, instance_id, init_epoch)
                )
                .is_none()
        );
        assert!(
            self.instance_table_id
                .insert(instance_id, table_id)
                .is_none()
        );
        assert!(table_data.unsync_epochs.contains_key(&init_epoch));
    }

    /// Looks up the per-instance data via the `instance_table_id` index.
    fn instance_data(
        &mut self,
        instance_id: LocalInstanceId,
    ) -> Option<&mut LocalInstanceUnsyncData> {
        self.instance_table_id
            .get_mut(&instance_id)
            .cloned()
            .map(move |table_id| {
                self.table_data
                    .get_mut(&table_id)
                    .expect("should exist")
                    .instance_data
                    .get_mut(&instance_id)
                    .expect("should exist")
            })
    }

    fn add_imm(&mut self, instance_id: LocalInstanceId, imm: UploaderImm) {
        self.instance_data(instance_id)
            .expect("should exist")
            .add_imm(imm);
    }

    /// Seals the current epoch of `instance_id`, records table watermarks from
    /// `opts`, and tracks the "stopped" state when `next_epoch` is not a known
    /// unsync epoch of the table (i.e. the table stops receiving new epochs).
    fn local_seal_epoch(
        &mut self,
        instance_id: LocalInstanceId,
        next_epoch: HummockEpoch,
        opts: SealCurrentEpochOptions,
    ) {
        let table_id = self.instance_table_id[&instance_id];
        let table_data = self.table_data.get_mut(&table_id).expect("should exist");
        let instance_data = table_data
            .instance_data
            .get_mut(&instance_id)
            .expect("should exist");
        let epoch = instance_data.local_seal_epoch(next_epoch);
        if !table_data.unsync_epochs.contains_key(&next_epoch) {
            if let Some(stopped_next_epoch) = table_data.stopped_next_epoch {
                // All instances of a stopped table must agree on the stop
                // epoch; a mismatch is a bug (panic in debug, warn in release).
                if stopped_next_epoch != next_epoch {
                    let table_id = table_data.table_id.table_id;
                    let unsync_epochs = table_data.unsync_epochs.keys().collect_vec();
                    if cfg!(debug_assertions) {
                        panic!(
                            "table_id {} stop epoch {} different to prev stop epoch {}. unsync epochs: {:?}, syncing epochs {:?}, max_synced_epoch {:?}",
                            table_id,
                            next_epoch,
                            stopped_next_epoch,
                            unsync_epochs,
                            table_data.syncing_epochs,
                            table_data.max_synced_epoch
                        );
                    } else {
                        warn!(
                            table_id,
                            stopped_next_epoch,
                            next_epoch,
                            ?unsync_epochs,
                            syncing_epochs = ?table_data.syncing_epochs,
                            max_synced_epoch = ?table_data.max_synced_epoch,
                            "different stop epoch"
                        );
                    }
                }
            } else {
                // First instance to observe the stop epoch.
                if let Some(max_epoch) = table_data.max_epoch() {
                    assert_gt!(next_epoch, max_epoch);
                }
                debug!(?table_id, epoch, next_epoch, "table data has stopped");
                table_data.stopped_next_epoch = Some(next_epoch);
            }
        }
        if let Some((direction, table_watermarks, watermark_type)) = opts.table_watermarks {
            table_data.add_table_watermarks(epoch, table_watermarks, direction, watermark_type);
        }
    }

    /// Marks an instance destroyed. Its data is retained until all sealed
    /// epochs are synced (cleanup happens in `UploaderData::sync`).
    fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
        if let Some(table_id) = self.instance_table_id.get(&instance_id) {
            debug!(instance_id, "destroy instance");
            let table_data = self.table_data.get_mut(table_id).expect("should exist");
            let instance_data = table_data
                .instance_data
                .get_mut(&instance_id)
                .expect("should exist");
            assert!(
                !instance_data.is_destroyed,
                "cannot destroy an instance for twice"
            );
            instance_data.is_destroyed = true;
        }
    }

    /// Drops all state of `table_ids`, aborting their spill tasks and
    /// discarding their spilled data. All instances must be destroyed first,
    /// and spill/epoch groupings must not straddle the cleared set.
    fn clear_tables(&mut self, table_ids: &HashSet<TableId>, task_manager: &mut TaskManager) {
        for table_id in table_ids {
            if let Some(table_unsync_data) = self.table_data.remove(table_id) {
                for task_id in table_unsync_data.spill_tasks.into_values().flatten() {
                    if let Some(task_status) = task_manager.abort_task(task_id) {
                        must_match!(task_status, UploadingTaskStatus::Spilling(spill_table_ids) => {
                            assert!(spill_table_ids.is_subset(table_ids));
                        });
                    }
                    if let Some((_, spill_table_ids)) = self.spilled_data.remove(&task_id) {
                        assert!(spill_table_ids.is_subset(table_ids));
                    }
                }
                assert!(
                    table_unsync_data
                        .instance_data
                        .values()
                        .all(|instance| instance.is_destroyed),
                    "should be clear when dropping the read version instance"
                );
                for instance_id in table_unsync_data.instance_data.keys() {
                    assert_eq!(
                        *table_id,
                        self.instance_table_id
                            .remove(instance_id)
                            .expect("should exist")
                    );
                }
            }
        }
        // No leftover spilled data may reference a cleared table.
        debug_assert!(
            self.spilled_data
                .values()
                .all(|(_, spill_table_ids)| spill_table_ids.is_disjoint(table_ids))
        );
        // An unsync epoch group is either entirely cleared or untouched.
        self.unsync_epochs.retain(|_, unsync_epoch_table_ids| {
            if !unsync_epoch_table_ids.is_disjoint(table_ids) {
                assert!(unsync_epoch_table_ids.is_subset(table_ids));
                false
            } else {
                true
            }
        });
        assert!(
            self.instance_table_id
                .values()
                .all(|table_id| !table_ids.contains(table_id))
        );
    }
}
993
impl UploaderData {
    /// Starts a sync over `sync_table_epochs` (each entry: an epoch and the
    /// tables to sync at it). Gathers unflushed imms into a final flush task,
    /// claims finished spill outputs, collects watermarks and vector-index
    /// adds, and registers a `SyncingData` entry keyed by a fresh `SyncId`.
    /// The result is delivered later via `sync_result_sender`.
    fn sync(
        &mut self,
        context: &UploaderContext,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) {
        let mut all_table_watermarks = HashMap::new();
        let mut uploading_tasks = HashSet::new();
        let mut spilled_tasks = BTreeSet::new();
        let mut all_table_ids = HashSet::new();
        let mut vector_index_adds = HashMap::new();

        let mut flush_payload = HashMap::new();

        for (epoch, table_ids) in &sync_table_epochs {
            let epoch = *epoch;
            // A table may appear in at most one (epoch, tables) entry.
            for table_id in table_ids {
                assert!(
                    all_table_ids.insert(*table_id),
                    "duplicate sync table epoch: {:?} {:?}",
                    all_table_ids,
                    sync_table_epochs
                );
            }
            if let Some(UnsyncEpochId(_, min_table_id)) = get_unsync_epoch_id(epoch, table_ids) {
                let min_table_id_data = self
                    .unsync_data
                    .table_data
                    .get_mut(&min_table_id)
                    .expect("should exist");
                // NOTE: operate on a clone so the representative table's
                // `unsync_epochs` is NOT drained here — the real take happens
                // in `table_data.sync(epoch)` below for every table. This only
                // computes which epochs the sync covers.
                let epochs = take_before_epoch(&mut min_table_id_data.unsync_epochs.clone(), epoch);
                for epoch in epochs.keys() {
                    // Each covered epoch group must consist of exactly the
                    // tables being synced.
                    assert_eq!(
                        &self
                            .unsync_data
                            .unsync_epochs
                            .remove(&UnsyncEpochId(*epoch, min_table_id))
                            .expect("should exist"),
                        table_ids
                    );
                }
                for table_id in table_ids {
                    let table_data = self
                        .unsync_data
                        .table_data
                        .get_mut(table_id)
                        .expect("should exist");
                    let (unflushed_payload, table_watermarks, task_ids, table_unsync_epochs) =
                        table_data.sync(epoch);
                    assert_eq!(table_unsync_epochs, epochs);
                    for (instance_id, payload) in unflushed_payload {
                        if !payload.is_empty() {
                            flush_payload.insert(instance_id, payload);
                        }
                    }
                    // Drop instances that are destroyed and fully synced.
                    table_data.instance_data.retain(|instance_id, data| {
                        if data.is_finished() {
                            assert_eq!(
                                self.unsync_data.instance_table_id.remove(instance_id),
                                Some(*table_id)
                            );
                            false
                        } else {
                            true
                        }
                    });
                    if let Some((direction, watermarks, watermark_type)) = table_watermarks {
                        Self::add_table_watermarks(
                            &mut all_table_watermarks,
                            *table_id,
                            direction,
                            watermarks,
                            watermark_type,
                        );
                    }
                    // A task already in `spilled_data` has finished; otherwise
                    // it is still uploading and the sync must wait on it.
                    for task_id in task_ids {
                        if self.unsync_data.spilled_data.contains_key(&task_id) {
                            spilled_tasks.insert(task_id);
                        } else {
                            uploading_tasks.insert(task_id);
                        }
                    }

                    if let hash_map::Entry::Occupied(mut entry) =
                        self.unsync_vector_index_data.entry(*table_id)
                    {
                        let data = entry.get_mut();
                        let adds = take_before_epoch(&mut data.sealed_epoch_data, epoch)
                            .into_values()
                            .flatten()
                            .collect_vec();
                        if data.is_dropped && data.sealed_epoch_data.is_empty() {
                            entry.remove();
                        }
                        if !adds.is_empty() {
                            vector_index_adds
                                .try_insert(*table_id, adds)
                                .expect("non-duplicate");
                        }
                    }
                }
            }
        }

        let sync_id = {
            let sync_id = self.next_sync_id;
            self.next_sync_id += 1;
            SyncId(sync_id)
        };

        // Flush the remaining unflushed imms; the sync also waits on that task.
        if let Some(extra_flush_task_id) = self.task_manager.sync(
            context,
            sync_id,
            flush_payload,
            uploading_tasks.iter().cloned(),
            &all_table_ids,
        ) {
            uploading_tasks.insert(extra_flush_task_id);
        }

        // Claim finished spill outputs, newest task first, verifying each
        // belongs entirely to the synced tables.
        let uploaded = spilled_tasks
            .iter()
            .rev()
            .map(|task_id| {
                let (sst, spill_table_ids) = self
                    .unsync_data
                    .spilled_data
                    .remove(task_id)
                    .expect("should exist");
                assert!(
                    spill_table_ids.is_subset(&all_table_ids),
                    "spilled tabled ids {:?} not a subset of sync table id {:?}",
                    spill_table_ids,
                    all_table_ids
                );
                sst
            })
            .collect();

        self.syncing_data.insert(
            sync_id,
            SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: uploading_tasks,
                uploaded,
                table_watermarks: all_table_watermarks,
                vector_index_adds,
                sync_result_sender,
            },
        );

        self.check_upload_task_consistency();
    }
}
1151
impl UnsyncData {
    /// Acknowledges a finished flush to every still-tracked instance. The ids
    /// are replayed oldest-first (`rev()`); instances already dropped from the
    /// index are silently skipped.
    fn ack_flushed(&mut self, sstable_info: &StagingSstableInfo) {
        for (instance_id, imm_ids) in sstable_info.imm_ids() {
            if let Some(instance_data) = self.instance_data(*instance_id) {
                instance_data.ack_flushed(imm_ids.iter().rev().cloned());
            }
        }
    }
}
1162
/// An in-flight sync: completes once `remaining_uploading_tasks` drains, then
/// the assembled `SyncedData` is sent through `sync_result_sender`.
struct SyncingData {
    sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    remaining_uploading_tasks: HashSet<UploadingTaskId>,
    // Ssts already uploaded for this sync, newest at the front.
    uploaded: VecDeque<Arc<StagingSstableInfo>>,
    table_watermarks: HashMap<TableId, TableWatermarks>,
    vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
    sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
}
1172
/// The successful output of a sync, delivered to the caller of `start_sync_epoch`.
#[derive(Debug)]
pub struct SyncedData {
    /// SSTs uploaded for the synced epochs.
    pub uploaded_ssts: VecDeque<Arc<StagingSstableInfo>>,
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
    pub vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
}
1179
/// Context shared by uploader operations: the currently pinned hummock version,
/// the hook used to spawn upload tasks, the flush buffer tracker and metrics.
struct UploaderContext {
    pinned_version: PinnedVersion,
    spawn_upload_task: SpawnUploadTask,
    buffer_tracker: BufferTracker,

    stats: Arc<HummockStateStoreMetrics>,
}
1188
1189impl UploaderContext {
1190 fn new(
1191 pinned_version: PinnedVersion,
1192 spawn_upload_task: SpawnUploadTask,
1193 buffer_tracker: BufferTracker,
1194 stats: Arc<HummockStateStoreMetrics>,
1195 ) -> Self {
1196 UploaderContext {
1197 pinned_version,
1198 spawn_upload_task,
1199 buffer_tracker,
1200 stats,
1201 }
1202 }
1203}
1204
/// Monotonically increasing identifier assigned to each sync call
/// (allocated from `UploaderData::next_sync_id`).
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct SyncId(usize);
1207
/// Unsynced state of a single vector index writer.
struct UnsyncVectorIndexData {
    /// Per-epoch adds sealed via `vector_writer_seal_epoch`, keyed by the epoch
    /// that was sealed; `None` when the epoch sealed without an add.
    sealed_epoch_data: BTreeMap<HummockEpoch, Option<VectorIndexAdd>>,
    /// The writer's current (not yet sealed) epoch.
    curr_epoch: u64,
    /// Set by `drop_vector_writer`; the entry is removed once
    /// `sealed_epoch_data` has been drained.
    is_dropped: bool,
}
1213
/// Mutable state of the uploader while it is in the `Working` state.
#[derive(Default)]
struct UploaderData {
    unsync_data: UnsyncData,
    unsync_vector_index_data: HashMap<TableId, UnsyncVectorIndexData>,

    /// In-flight syncs ordered by `SyncId`; completed in order by
    /// `may_notify_sync_task`.
    syncing_data: BTreeMap<SyncId, SyncingData>,

    task_manager: TaskManager,
    /// Source of the next `SyncId`.
    next_sync_id: usize,
}
1224
impl UploaderData {
    /// Abort all uploading tasks and fail every pending sync with the error
    /// produced by `err`.
    fn abort(self, err: impl Fn() -> HummockError) {
        self.task_manager.abort_all_tasks();
        for syncing_data in self.syncing_data.into_values() {
            send_sync_result(syncing_data.sync_result_sender, Err(err()));
        }
    }

    /// Remove all uploader state belonging to `table_ids`: unsync data, and
    /// any in-flight sync that covers one of those tables (with its remaining
    /// upload tasks aborted).
    fn clear_tables(&mut self, table_ids: HashSet<TableId>) {
        if table_ids.is_empty() {
            return;
        }
        self.unsync_data
            .clear_tables(&table_ids, &mut self.task_manager);
        self.syncing_data.retain(|sync_id, syncing_data| {
            if syncing_data
                .sync_table_epochs
                .iter()
                .any(|(_, sync_table_ids)| !sync_table_ids.is_disjoint(&table_ids))
            {
                // A sync touching any cleared table must cover only cleared
                // tables; partial overlap would be an invariant violation.
                assert!(
                    syncing_data
                        .sync_table_epochs
                        .iter()
                        .all(|(_, sync_table_ids)| sync_table_ids.is_subset(&table_ids))
                );
                // Abort every task the sync was still waiting for, checking
                // that each one indeed belongs to the cleared tables/sync.
                for task_id in &syncing_data.remaining_uploading_tasks {
                    match self
                        .task_manager
                        .abort_task(*task_id)
                        .expect("should exist")
                    {
                        UploadingTaskStatus::Spilling(spill_table_ids) => {
                            assert!(spill_table_ids.is_subset(&table_ids));
                        }
                        UploadingTaskStatus::Sync(task_sync_id) => {
                            assert_eq!(sync_id, &task_sync_id);
                        }
                    }
                }
                // Drop this syncing entry.
                false
            } else {
                true
            }
        });

        self.check_upload_task_consistency();
    }

    /// The minimum object id among all uploaded-but-uncommitted SSTs
    /// (spilled results plus SSTs held by pending syncs), if any.
    fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        self.unsync_data
            .spilled_data
            .values()
            .map(|(s, _)| s)
            .chain(self.syncing_data.values().flat_map(|s| s.uploaded.iter()))
            .filter_map(|s| {
                // Consider both new-value and old-value SSTs of each staging info.
                s.sstable_infos()
                    .iter()
                    .chain(s.old_value_sstable_infos())
                    .map(|s| s.sst_info.object_id)
                    .min()
            })
            .min()
            .map(|object_id| object_id.as_raw())
    }
}
1291
/// Information retained after a sync failure; reported to any later sync
/// attempt while the uploader stays in the `Err` state.
struct ErrState {
    failed_sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    reason: String,
}
1296
/// The uploader either works normally or is stuck in an error state after a
/// failed sync; a full `clear` resets it back to a default working state.
enum UploaderState {
    Working(UploaderData),
    Err(ErrState),
}
1301
/// Drives flushing/spilling of shared-buffer data and epoch syncing for the
/// hummock state store event handler.
pub struct HummockUploader {
    state: UploaderState,

    context: UploaderContext,
}
1319
impl HummockUploader {
    /// Create an uploader in the `Working` state with empty data.
    pub(super) fn new(
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        pinned_version: PinnedVersion,
        spawn_upload_task: SpawnUploadTask,
        buffer_tracker: BufferTracker,
    ) -> Self {
        Self {
            state: UploaderState::Working(UploaderData::default()),
            context: UploaderContext::new(
                pinned_version,
                spawn_upload_task,
                buffer_tracker,
                state_store_metrics,
            ),
        }
    }

    pub(super) fn buffer_tracker(&self) -> &BufferTracker {
        &self.context.buffer_tracker
    }

    pub(super) fn hummock_version(&self) -> &PinnedVersion {
        &self.context.pinned_version
    }

    /// Add imms from a local instance. No-op when the uploader is in the
    /// `Err` state.
    pub(super) fn add_imms(&mut self, instance_id: LocalInstanceId, imms: Vec<ImmutableMemtable>) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        for imm in imms {
            let imm = UploaderImm::new(imm, &self.context);
            data.unsync_data.add_imm(instance_id, imm);
        }
    }

    /// Register a local instance of `table_id` starting at `init_epoch`.
    /// No-op when the uploader is in the `Err` state.
    pub(super) fn init_instance(
        &mut self,
        instance_id: LocalInstanceId,
        table_id: TableId,
        init_epoch: HummockEpoch,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data
            .init_instance(table_id, instance_id, init_epoch);
    }

    /// Seal the current epoch of a local instance, advancing it to
    /// `next_epoch`. No-op when the uploader is in the `Err` state.
    pub(super) fn local_seal_epoch(
        &mut self,
        instance_id: LocalInstanceId,
        next_epoch: HummockEpoch,
        opts: SealCurrentEpochOptions,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data
            .local_seal_epoch(instance_id, next_epoch, opts);
    }

    /// Register a vector index writer for `table_id` at `init_epoch`.
    /// Panics when a writer for the table is already registered.
    pub(super) fn register_vector_writer(&mut self, table_id: TableId, init_epoch: HummockEpoch) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        assert!(
            data.unsync_vector_index_data
                .try_insert(
                    table_id,
                    UnsyncVectorIndexData {
                        sealed_epoch_data: Default::default(),
                        curr_epoch: init_epoch,
                        is_dropped: false,
                    }
                )
                .is_ok(),
            "duplicate vector writer on {}",
            table_id
        )
    }

    /// Seal the vector writer's current epoch with an optional index add and
    /// advance it to `next_epoch` (must be strictly greater).
    pub(super) fn vector_writer_seal_epoch(
        &mut self,
        table_id: TableId,
        next_epoch: HummockEpoch,
        add: Option<VectorIndexAdd>,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        let data = data
            .unsync_vector_index_data
            .get_mut(&table_id)
            .expect("should exist");
        assert!(!data.is_dropped);
        assert!(
            data.curr_epoch < next_epoch,
            "next epoch {} should be greater than current epoch {}",
            next_epoch,
            data.curr_epoch
        );
        data.sealed_epoch_data
            .try_insert(data.curr_epoch, add)
            .expect("non-duplicate");
        data.curr_epoch = next_epoch;
    }

    /// Mark the vector writer as dropped; its entry is removed immediately if
    /// no sealed data remains, otherwise once the sealed data is synced away.
    pub(super) fn drop_vector_writer(&mut self, table_id: TableId) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        let hash_map::Entry::Occupied(mut entry) = data.unsync_vector_index_data.entry(table_id)
        else {
            panic!("vector writer {} should exist", table_id);
        };
        let data = entry.get_mut();
        data.is_dropped = true;
        if data.sealed_epoch_data.is_empty() {
            entry.remove();
        }
    }

    /// Start `epoch` on the given tables, creating per-table unsync data on
    /// first sight of a table.
    pub(super) fn start_epoch(&mut self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        debug!(epoch, ?table_ids, "start epoch");
        for table_id in &table_ids {
            let table_data = data
                .unsync_data
                .table_data
                .entry(*table_id)
                .or_insert_with(|| {
                    TableUnsyncData::new(
                        *table_id,
                        self.context.pinned_version.table_committed_epoch(*table_id),
                    )
                });
            table_data.new_epoch(epoch);
        }
        if let Some(unsync_epoch_id) = get_unsync_epoch_id(epoch, &table_ids) {
            assert!(
                data.unsync_data
                    .unsync_epochs
                    .insert(unsync_epoch_id, table_ids)
                    .is_none()
            );
        }
    }

    /// Start syncing the given `(epoch, table_ids)` pairs; the result is sent
    /// through `sync_result_sender`. In the `Err` state the sender is
    /// immediately notified of the earlier failure instead.
    pub(super) fn start_sync_epoch(
        &mut self,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) {
        let data = match &mut self.state {
            UploaderState::Working(data) => data,
            UploaderState::Err(ErrState {
                failed_sync_table_epochs,
                reason,
            }) => {
                let result = Err(HummockError::other(format!(
                    "previous sync epoch {:?} failed due to [{}]",
                    failed_sync_table_epochs, reason
                )));
                send_sync_result(sync_result_sender, result);
                return;
            }
        };
        debug!(?sync_table_epochs, "start sync epoch");

        data.sync(&self.context, sync_result_sender, sync_table_epochs);

        // The new sync may already have no remaining tasks; notify eagerly.
        data.may_notify_sync_task(&self.context);

        self.context
            .stats
            .uploader_syncing_epoch_count
            .set(data.syncing_data.len() as _);
    }

    /// Adopt a newly pinned version and acknowledge the committed epoch to
    /// each table's unsync data.
    pub(crate) fn update_pinned_version(&mut self, pinned_version: PinnedVersion) {
        if let UploaderState::Working(data) = &mut self.state {
            for (table_id, info) in pinned_version.state_table_info.info() {
                if let Some(table_data) = data.unsync_data.table_data.get_mut(table_id) {
                    table_data.ack_committed(info.committed_epoch);
                }
            }
        }
        self.context.pinned_version = pinned_version;
    }

    /// Spill unsync payloads to upload tasks while the buffer tracker asks for
    /// flushing. Returns whether any flush task was spawned.
    pub(crate) fn may_flush(&mut self) -> bool {
        let UploaderState::Working(data) = &mut self.state else {
            return false;
        };
        if self.context.buffer_tracker.need_flush() {
            let mut spiller = Spiller::new(&mut data.unsync_data);
            let mut curr_batch_flush_size = 0;
            // Keep spilling payloads until the tracker is satisfied or there
            // is nothing left to spill.
            while self
                .context
                .buffer_tracker
                .need_more_flush(curr_batch_flush_size)
                && let Some((epoch, payload, spilled_table_ids)) = spiller.next_spilled_payload()
            {
                assert!(!payload.is_empty());
                {
                    let (task_id, task_size, spilled_table_ids) =
                        data.task_manager
                            .spill(&self.context, spilled_table_ids, payload);
                    // Record the spill task against every table it touches.
                    for table_id in spilled_table_ids {
                        spiller
                            .unsync_data()
                            .table_data
                            .get_mut(table_id)
                            .expect("should exist")
                            .spill_tasks
                            .entry(epoch)
                            .or_default()
                            .push_front(task_id);
                    }
                    curr_batch_flush_size += task_size;
                }
            }
            data.check_upload_task_consistency();
            curr_batch_flush_size > 0
        } else {
            false
        }
    }

    /// Clear uploader state. With `Some(table_ids)` only those tables are
    /// cleared; with `None` the whole uploader is reset (pending syncs fail,
    /// and an `Err` state is replaced by a fresh working state).
    pub(crate) fn clear(&mut self, table_ids: Option<HashSet<TableId>>) {
        if let Some(table_ids) = table_ids {
            if let UploaderState::Working(data) = &mut self.state {
                data.clear_tables(table_ids);
            }
        } else {
            if let UploaderState::Working(data) = replace(
                &mut self.state,
                UploaderState::Working(UploaderData::default()),
            ) {
                data.abort(|| HummockError::other("uploader is reset"));
            }

            self.context.stats.uploader_syncing_epoch_count.set(0);
        }
    }

    /// Notify the uploader that a local instance may have been destroyed.
    pub(crate) fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data.may_destroy_instance(instance_id);
    }

    /// Minimum uncommitted object id, or `None` when in the `Err` state or
    /// when there is no uncommitted SST.
    pub(crate) fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        if let UploaderState::Working(ref u) = self.state {
            u.min_uncommitted_object_id()
        } else {
            None
        }
    }
}
1586
impl UploaderData {
    /// Pop and notify completed syncs from the front of `syncing_data`.
    ///
    /// Syncs complete strictly in `SyncId` order: the loop stops at the first
    /// sync that still has remaining uploading tasks.
    fn may_notify_sync_task(&mut self, context: &UploaderContext) {
        while let Some((_, syncing_data)) = self.syncing_data.first_key_value()
            && syncing_data.remaining_uploading_tasks.is_empty()
        {
            let (_, syncing_data) = self.syncing_data.pop_first().expect("non-empty");
            let SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: _,
                uploaded,
                table_watermarks,
                vector_index_adds,
                sync_result_sender,
            } = syncing_data;
            context
                .stats
                .uploader_syncing_epoch_count
                .set(self.syncing_data.len() as _);

            // Acknowledge the synced epoch on each covered table, dropping
            // table entries that become empty.
            for (sync_epoch, table_ids) in sync_table_epochs {
                for table_id in table_ids {
                    if let Some(table_data) = self.unsync_data.table_data.get_mut(&table_id) {
                        table_data.ack_synced(sync_epoch);
                        if table_data.is_empty() {
                            self.unsync_data.table_data.remove(&table_id);
                        }
                    }
                }
            }

            send_sync_result(
                sync_result_sender,
                Ok(SyncedData {
                    uploaded_ssts: uploaded,
                    table_watermarks,
                    vector_index_adds,
                }),
            )
        }
    }

    /// Debug-build-only invariant check: the task bookkeeping kept in
    /// `unsync_data`/`syncing_data` must agree with the task manager's view.
    fn check_upload_task_consistency(&self) {
        #[cfg(debug_assertions)]
        {
            // Spill tasks as recorded per table in unsync data.
            let mut spill_task_table_id_from_data: HashMap<_, HashSet<_>> = HashMap::new();
            for table_data in self.unsync_data.table_data.values() {
                for task_id in table_data
                    .spill_tasks
                    .iter()
                    .flat_map(|(_, tasks)| tasks.iter())
                {
                    assert!(
                        spill_task_table_id_from_data
                            .entry(*task_id)
                            .or_default()
                            .insert(table_data.table_id)
                    );
                }
            }
            // Remaining sync tasks as recorded per pending sync.
            let syncing_task_id_from_data: HashMap<_, HashSet<_>> = self
                .syncing_data
                .iter()
                .filter_map(|(sync_id, data)| {
                    if data.remaining_uploading_tasks.is_empty() {
                        None
                    } else {
                        Some((*sync_id, data.remaining_uploading_tasks.clone()))
                    }
                })
                .collect();

            // The same views rebuilt from completed spill results plus the
            // task manager's in-flight tasks.
            let mut spill_task_table_id_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
            for (task_id, (_, table_ids)) in &self.unsync_data.spilled_data {
                spill_task_table_id_from_manager.insert(*task_id, table_ids.clone());
            }
            let mut syncing_task_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
            for (task_id, status) in self.task_manager.tasks() {
                match status {
                    UploadingTaskStatus::Spilling(table_ids) => {
                        assert!(
                            spill_task_table_id_from_manager
                                .insert(task_id, table_ids.clone())
                                .is_none()
                        );
                    }
                    UploadingTaskStatus::Sync(sync_id) => {
                        assert!(
                            syncing_task_from_manager
                                .entry(*sync_id)
                                .or_default()
                                .insert(task_id)
                        );
                    }
                }
            }
            assert_eq!(
                spill_task_table_id_from_data,
                spill_task_table_id_from_manager
            );
            assert_eq!(syncing_task_id_from_data, syncing_task_from_manager);
        }
    }
}
1690
impl HummockUploader {
    /// Future resolving to the next successfully uploaded SST.
    ///
    /// On success the SST is routed to its owner (a pending sync or the
    /// spilled-data map). On a sync-task failure the uploader transitions to
    /// the `Err` state, all remaining work is aborted, and the future stays
    /// pending.
    pub(super) fn next_uploaded_sst(
        &mut self,
    ) -> impl Future<Output = Arc<StagingSstableInfo>> + '_ {
        poll_fn(|cx| {
            // No progress can be made outside the working state.
            let UploaderState::Working(data) = &mut self.state else {
                return Poll::Pending;
            };

            if let Some((task_id, status, result)) = ready!(data.task_manager.poll_task_result(cx))
            {
                match result {
                    Ok(sst) => {
                        data.unsync_data.ack_flushed(&sst);
                        match status {
                            UploadingTaskStatus::Sync(sync_id) => {
                                let syncing_data =
                                    data.syncing_data.get_mut(&sync_id).expect("should exist");
                                syncing_data.uploaded.push_front(sst.clone());
                                assert!(syncing_data.remaining_uploading_tasks.remove(&task_id));
                                // This may have been the last task the sync was
                                // waiting for.
                                data.may_notify_sync_task(&self.context);
                            }
                            UploadingTaskStatus::Spilling(table_ids) => {
                                data.unsync_data
                                    .spilled_data
                                    .insert(task_id, (sst.clone(), table_ids));
                            }
                        }
                        data.check_upload_task_consistency();
                        Poll::Ready(sst)
                    }
                    Err((sync_id, e)) => {
                        let syncing_data =
                            data.syncing_data.remove(&sync_id).expect("should exist");
                        let failed_epochs = syncing_data.sync_table_epochs.clone();
                        // Enter the error state, taking ownership of the
                        // working data so it can be aborted below.
                        let data = must_match!(replace(
                            &mut self.state,
                            UploaderState::Err(ErrState {
                                failed_sync_table_epochs: syncing_data.sync_table_epochs,
                                reason: e.as_report().to_string(),
                            }),
                        ), UploaderState::Working(data) => data);

                        // Report the failure to the failed sync's caller.
                        let _ = syncing_data
                            .sync_result_sender
                            .send(Err(HummockError::other(format!(
                                "failed to sync: {:?}",
                                e.as_report()
                            ))));

                        // Fail every other pending sync and abort all tasks.
                        data.abort(|| {
                            HummockError::other(format!(
                                "previous epoch {:?} failed to sync",
                                failed_epochs
                            ))
                        });
                        Poll::Pending
                    }
                }
            } else {
                Poll::Pending
            }
        })
    }
}
1756
1757#[cfg(test)]
1758pub(crate) mod tests {
1759 use std::collections::{HashMap, HashSet};
1760 use std::future::{Future, poll_fn};
1761 use std::ops::Deref;
1762 use std::pin::pin;
1763 use std::sync::Arc;
1764 use std::sync::atomic::AtomicUsize;
1765 use std::sync::atomic::Ordering::SeqCst;
1766 use std::task::Poll;
1767
1768 use futures::FutureExt;
1769 use risingwave_common::catalog::TableId;
1770 use risingwave_common::util::epoch::EpochExt;
1771 use risingwave_hummock_sdk::HummockEpoch;
1772 use risingwave_hummock_sdk::vector_index::{
1773 FlatIndexAdd, VectorFileInfo, VectorIndexAdd, VectorStoreInfoDelta,
1774 };
1775 use tokio::sync::oneshot;
1776
1777 use super::test_utils::*;
1778 use crate::hummock::event_handler::uploader::{
1779 HummockUploader, SyncedData, UploadingTask, get_payload_imm_ids,
1780 };
1781 use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID};
1782 use crate::hummock::{HummockError, HummockResult};
1783 use crate::mem_table::ImmutableMemtable;
1784 use crate::opts::StorageOpts;
1785
    impl HummockUploader {
        /// Test convenience: add a single imm to a local instance.
        pub(super) fn add_imm(&mut self, instance_id: LocalInstanceId, imm: ImmutableMemtable) {
            self.add_imms(instance_id, vec![imm]);
        }

        /// Test convenience: start a sync that covers a single epoch.
        pub(super) fn start_single_epoch_sync(
            &mut self,
            epoch: HummockEpoch,
            sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
            table_ids: HashSet<TableId>,
        ) {
            self.start_sync_epoch(sync_result_sender, vec![(epoch, table_ids)]);
        }
    }
1800
    #[tokio::test]
    pub async fn test_uploading_task_future() {
        // Successful upload: task info and output should mirror the input imm.
        let uploader_context = test_uploader_context(dummy_success_upload_future);

        let imm = gen_imm(INITIAL_EPOCH).await;
        let imm_size = imm.size();
        let imm_ids = get_imm_ids(vec![&imm]);
        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
        assert_eq!(imm_size, task.task_info.task_size);
        assert_eq!(imm_ids, task.task_info.imm_ids);
        assert_eq!(vec![INITIAL_EPOCH], task.task_info.epochs);
        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
        assert_eq!(
            output.sstable_infos(),
            &dummy_success_upload_output().new_value_ssts
        );
        assert_eq!(imm_size, output.imm_size());
        assert_eq!(&imm_ids, output.imm_ids());
        assert_eq!(&vec![INITIAL_EPOCH], output.epochs());

        // Failing upload: poll_result surfaces the error.
        let uploader_context = test_uploader_context(dummy_fail_upload_future);
        let imm = gen_imm(INITIAL_EPOCH).await;
        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
    }
1826
    #[tokio::test]
    pub async fn test_uploading_task_poll_result() {
        // poll_result resolves with the upload output on success...
        let uploader_context = test_uploader_context(dummy_success_upload_future);
        let mut task =
            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
        assert_eq!(
            output.sstable_infos(),
            &dummy_success_upload_output().new_value_ssts
        );

        // ...and with the error on failure.
        let uploader_context = test_uploader_context(dummy_fail_upload_future);
        let mut task =
            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
    }
1843
    #[tokio::test]
    async fn test_uploading_task_poll_ok_with_retry() {
        // The upload future fails `fail_num` times before succeeding;
        // poll_ok_with_retry must keep retrying until it gets a success.
        let run_count = Arc::new(AtomicUsize::new(0));
        let fail_num = 10;
        let run_count_clone = run_count.clone();
        let uploader_context = test_uploader_context(move |payload, info| {
            let run_count = run_count.clone();
            async move {
                // Fail the first `fail_num` attempts, then delegate to the
                // dummy success future.
                let ret = if run_count.load(SeqCst) < fail_num {
                    Err(HummockError::other("fail"))
                } else {
                    dummy_success_upload_future(payload, info).await
                };
                run_count.fetch_add(1, SeqCst);
                ret
            }
        });
        let mut task =
            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
        let output = poll_fn(|cx| task.poll_ok_with_retry(cx)).await;
        // One successful run after `fail_num` failures.
        assert_eq!(fail_num + 1, run_count_clone.load(SeqCst));
        assert_eq!(
            output.sstable_infos(),
            &dummy_success_upload_output().new_value_ssts
        );
    }
1871
    #[tokio::test]
    async fn test_uploader_basic() {
        // End-to-end happy path: one imm plus one vector index add, synced in
        // a single epoch; the sync result must carry both.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        const VECTOR_INDEX_TABLE_ID: TableId = TableId::new(234);
        uploader.start_epoch(
            epoch1,
            HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
        );
        let imm = gen_imm(epoch1).await;
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);

        // Seal one vector index add for epoch1.
        uploader.register_vector_writer(VECTOR_INDEX_TABLE_ID, epoch1);
        let vector_info_file = VectorFileInfo {
            object_id: 1.into(),
            vector_count: 1,
            file_size: 0,
            start_vector_id: 0,
            meta_offset: 20,
        };
        let vector_index_add = VectorIndexAdd::Flat(FlatIndexAdd {
            vector_store_info_delta: VectorStoreInfoDelta {
                next_vector_id: 1,
                added_vector_files: vec![vector_info_file.clone()],
            },
        });
        uploader.vector_writer_seal_epoch(
            VECTOR_INDEX_TABLE_ID,
            epoch1.next_epoch(),
            Some(vector_index_add.clone()),
        );

        // Start the sync: it should be registered with pending upload tasks.
        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(
            epoch1,
            sync_tx,
            HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
        );
        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
        assert_eq!(1, uploader.data().syncing_data.len());
        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
        assert!(syncing_data.uploaded.is_empty());
        assert!(!syncing_data.remaining_uploading_tasks.is_empty());

        // The upload task completes and yields the staging SST.
        let staging_sst = uploader.next_uploaded_sst().await;
        assert_eq!(&vec![epoch1], staging_sst.epochs());
        assert_eq!(
            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
            staging_sst.imm_ids()
        );
        assert_eq!(
            &dummy_success_upload_output().new_value_ssts,
            staging_sst.sstable_infos()
        );

        // The sync result must contain the uploaded SST and the vector add.
        match sync_rx.await {
            Ok(Ok(data)) => {
                let SyncedData {
                    uploaded_ssts,
                    table_watermarks,
                    vector_index_adds,
                } = data;
                assert_eq!(1, uploaded_ssts.len());
                let staging_sst = &uploaded_ssts[0];
                assert_eq!(&vec![epoch1], staging_sst.epochs());
                assert_eq!(
                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
                    staging_sst.imm_ids()
                );
                assert_eq!(
                    &dummy_success_upload_output().new_value_ssts,
                    staging_sst.sstable_infos()
                );
                assert!(table_watermarks.is_empty());
                assert_eq!(vector_index_adds.len(), 1);
                let (table_id, vector_index_adds) = vector_index_adds.into_iter().next().unwrap();
                assert_eq!(table_id, VECTOR_INDEX_TABLE_ID);
                assert_eq!(vector_index_adds.len(), 1);
                let synced_vector_index_add = vector_index_adds[0].clone();
                assert_eq!(vector_index_add, synced_vector_index_add);
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());

        // Pinning a new version at epoch1 commits the epoch for the table.
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
1975
    #[tokio::test]
    async fn test_uploader_destroy_instance_before_sync() {
        // Destroying the instance before syncing must not lose its sealed imm:
        // the sync still uploads it and the table data is cleaned up afterwards.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        uploader.start_epochs_for_test([epoch1]);
        let imm = gen_imm(epoch1).await;
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.may_destroy_instance(TEST_LOCAL_INSTANCE_ID);

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
        assert_eq!(1, uploader.data().syncing_data.len());
        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
        assert!(syncing_data.uploaded.is_empty());
        assert!(!syncing_data.remaining_uploading_tasks.is_empty());

        // The destroyed instance's imm is still uploaded.
        let staging_sst = uploader.next_uploaded_sst().await;
        assert_eq!(&vec![epoch1], staging_sst.epochs());
        assert_eq!(
            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
            staging_sst.imm_ids()
        );
        assert_eq!(
            &dummy_success_upload_output().new_value_ssts,
            staging_sst.sstable_infos()
        );

        match sync_rx.await {
            Ok(Ok(data)) => {
                let SyncedData {
                    uploaded_ssts,
                    table_watermarks,
                    ..
                } = data;
                assert_eq!(1, uploaded_ssts.len());
                let staging_sst = &uploaded_ssts[0];
                assert_eq!(&vec![epoch1], staging_sst.epochs());
                assert_eq!(
                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
                    staging_sst.imm_ids()
                );
                assert_eq!(
                    &dummy_success_upload_output().new_value_ssts,
                    staging_sst.sstable_infos()
                );
                assert!(table_watermarks.is_empty());
            }
            _ => unreachable!(),
        };
        // With the instance destroyed and the epoch synced, the table's
        // unsync data entry should be gone.
        assert!(
            !uploader
                .data()
                .unsync_data
                .table_data
                .contains_key(&TEST_TABLE_ID)
        );
    }
2037
    #[tokio::test]
    async fn test_empty_uploader_sync() {
        // Syncing an epoch with no data should complete immediately with an
        // empty SST list and spawn no upload task.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_epochs_for_test([epoch1]);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        // No SST is ever produced.
        assert_uploader_pending(&mut uploader).await;

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.uploaded_ssts.is_empty());
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert!(uploader.data().syncing_data.is_empty());
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
2075
    #[tokio::test]
    async fn test_uploader_empty_epoch() {
        // Data belongs to epoch2 only; syncing epoch1 must succeed with no
        // uploaded SSTs.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        uploader.start_epochs_for_test([epoch1, epoch2]);
        let imm = gen_imm(epoch2).await;
        // imm of epoch2 is added after epoch1 is sealed.
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm);

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        // Nothing should be uploaded for the empty epoch1.
        assert_uploader_pending(&mut uploader).await;

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.uploaded_ssts.is_empty());
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert!(uploader.data().syncing_data.is_empty());
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
2117
2118 #[tokio::test]
2119 async fn test_uploader_poll_empty() {
2120 let mut uploader = test_uploader(dummy_success_upload_future);
2121 let fut = uploader.next_uploaded_sst();
2122 let mut fut = pin!(fut);
2123 assert!(poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx).is_pending())).await);
2124 }
2125
    #[tokio::test]
    async fn test_uploader_empty_advance_mce() {
        // Pinning newer versions with no matching local data advances both the
        // max synced and max syncing epochs; an in-flight sync at epoch6 keeps
        // the syncing epoch at 6 while synced lags behind the pinned version.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let initial_pinned_version = uploader.context.pinned_version.clone();
        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        let epoch3 = epoch2.next_epoch();
        let epoch4 = epoch3.next_epoch();
        let epoch5 = epoch4.next_epoch();
        let epoch6 = epoch5.next_epoch();
        let version1 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        let version2 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch2))
            .unwrap();
        let version3 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch3))
            .unwrap();
        let version4 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch4))
            .unwrap();
        let version5 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch5))
            .unwrap();

        uploader.start_epochs_for_test([epoch6]);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch6);

        uploader.update_pinned_version(version1);
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        let imm = gen_imm(epoch6).await;
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
        uploader.update_pinned_version(version2);
        assert_eq!(epoch2, uploader.test_max_synced_epoch());
        assert_eq!(epoch2, uploader.test_max_syncing_epoch());

        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch6);
        uploader.update_pinned_version(version3);
        assert_eq!(epoch3, uploader.test_max_synced_epoch());
        assert_eq!(epoch3, uploader.test_max_syncing_epoch());

        // Start syncing epoch6: syncing jumps ahead of the pinned version.
        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch6, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
        uploader.update_pinned_version(version4);
        assert_eq!(epoch4, uploader.test_max_synced_epoch());
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());

        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_imm_ids([&imm]), sst.imm_ids());

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.table_watermarks.is_empty());
                assert_eq!(1, data.uploaded_ssts.len());
                assert_eq!(&get_imm_ids([&imm]), data.uploaded_ssts[0].imm_ids());
            }
            _ => unreachable!(),
        }

        // After the sync completed, the synced epoch follows epoch6.
        uploader.update_pinned_version(version5);
        assert_eq!(epoch6, uploader.test_max_synced_epoch());
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
    }
2193
    /// Verifies that the uploader emits finished upload tasks strictly in the
    /// order they were issued — even when a later task's upload completes (its
    /// `finish_tx` fires) before an earlier one — and that a sync result lists
    /// an epoch's uploaded SSTs from the newest task to the oldest.
    #[tokio::test]
    async fn test_uploader_finish_in_order() {
        // Flush ratio 0 so that `may_flush` triggers spill tasks right away.
        let config = StorageOpts {
            shared_buffer_capacity_mb: 1024 * 1024,
            shared_buffer_flush_ratio: 0.0,
            ..Default::default()
        };
        let (buffer_tracker, mut uploader, new_task_notifier) =
            prepare_uploader_order_test(&config, false);

        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        let epoch3 = epoch2.next_epoch();
        let epoch4 = epoch3.next_epoch();
        uploader.start_epochs_for_test([epoch1, epoch2, epoch3, epoch4]);
        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
        let memory_limiter = Some(memory_limiter.deref());

        let instance_id1 = 1;
        let instance_id2 = 2;

        uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
        uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);

        // Two imms on epoch1 (instance 1) and one on epoch2 (instance 2).
        let imm2 = gen_imm_with_limiter(epoch2, memory_limiter).await;
        uploader.add_imm(instance_id2, imm2.clone());
        let imm1_1 = gen_imm_with_limiter(epoch1, memory_limiter).await;
        uploader.add_imm(instance_id1, imm1_1.clone());
        let imm1_2 = gen_imm_with_limiter(epoch1, memory_limiter).await;
        uploader.add_imm(instance_id1, imm1_2.clone());

        // Spill both epochs: one task per epoch is expected, with epoch1's
        // payload holding its imms newest-first (imm1_2 before imm1_1).
        let epoch1_spill_payload12 =
            HashMap::from_iter([(instance_id1, vec![imm1_2.clone(), imm1_1.clone()])]);
        let epoch2_spill_payload = HashMap::from_iter([(instance_id2, vec![imm2.clone()])]);
        let (await_start1, finish_tx1) =
            new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload12));
        let (await_start2, finish_tx2) =
            new_task_notifier(get_payload_imm_ids(&epoch2_spill_payload));
        uploader.may_flush();
        await_start1.await;
        await_start2.await;

        assert_uploader_pending(&mut uploader).await;

        // Finishing task 2 first must NOT surface any SST: the earlier task 1
        // still gates the output order.
        finish_tx2.send(()).unwrap();
        assert_uploader_pending(&mut uploader).await;

        finish_tx1.send(()).unwrap();
        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload12), sst.imm_ids());
        assert_eq!(&vec![epoch1], sst.epochs());

        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_payload_imm_ids(&epoch2_spill_payload), sst.imm_ids());
        assert_eq!(&vec![epoch2], sst.epochs());

        // A third epoch1 imm is spilled, then a fourth is flushed by sealing
        // epoch1 and starting a sync on it.
        let imm1_3 = gen_imm_with_limiter(epoch1, memory_limiter).await;
        uploader.add_imm(instance_id1, imm1_3.clone());
        let epoch1_spill_payload3 = HashMap::from_iter([(instance_id1, vec![imm1_3.clone()])]);
        let (await_start1_3, finish_tx1_3) =
            new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload3));
        uploader.may_flush();
        await_start1_3.await;
        let imm1_4 = gen_imm_with_limiter(epoch1, memory_limiter).await;
        uploader.add_imm(instance_id1, imm1_4.clone());
        let epoch1_sync_payload = HashMap::from_iter([(instance_id1, vec![imm1_4.clone()])]);
        let (await_start1_4, finish_tx1_4) =
            new_task_notifier(get_payload_imm_ids(&epoch1_sync_payload));
        uploader.local_seal_epoch_for_test(instance_id1, epoch1);
        let (sync_tx1, mut sync_rx1) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx1, HashSet::from_iter([TEST_TABLE_ID]));
        await_start1_4.await;

        uploader.local_seal_epoch_for_test(instance_id1, epoch2);
        uploader.local_seal_epoch_for_test(instance_id2, epoch2);

        // Two epoch3 spill tasks (one per instance), plus imm3_3 which stays
        // unspilled for now.
        let imm3_1 = gen_imm_with_limiter(epoch3, memory_limiter).await;
        let epoch3_spill_payload1 = HashMap::from_iter([(instance_id1, vec![imm3_1.clone()])]);
        uploader.add_imm(instance_id1, imm3_1.clone());
        let (await_start3_1, finish_tx3_1) =
            new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload1));
        uploader.may_flush();
        await_start3_1.await;
        let imm3_2 = gen_imm_with_limiter(epoch3, memory_limiter).await;
        let epoch3_spill_payload2 = HashMap::from_iter([(instance_id2, vec![imm3_2.clone()])]);
        uploader.add_imm(instance_id2, imm3_2.clone());
        let (await_start3_2, finish_tx3_2) =
            new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload2));
        uploader.may_flush();
        await_start3_2.await;
        let imm3_3 = gen_imm_with_limiter(epoch3, memory_limiter).await;
        uploader.add_imm(instance_id1, imm3_3.clone());

        uploader.local_seal_epoch_for_test(instance_id1, epoch3);
        let imm4 = gen_imm_with_limiter(epoch4, memory_limiter).await;
        uploader.add_imm(instance_id1, imm4.clone());
        assert_uploader_pending(&mut uploader).await;

        // Completing newer tasks (3_1, then 1_4) must not unblock any output
        // while the oldest outstanding task (1_3) is still unfinished.
        finish_tx3_1.send(()).unwrap();
        assert_uploader_pending(&mut uploader).await;
        finish_tx1_4.send(()).unwrap();
        assert_uploader_pending(&mut uploader).await;
        finish_tx1_3.send(()).unwrap();

        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload3), sst.imm_ids());

        // epoch1's sync must not resolve before its sync-task SST is emitted.
        assert!(poll_fn(|cx| Poll::Ready(sync_rx1.poll_unpin(cx).is_pending())).await);

        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_payload_imm_ids(&epoch1_sync_payload), sst.imm_ids());

        // Sync result for epoch1: three SSTs, listed newest task first.
        match sync_rx1.await {
            Ok(Ok(data)) => {
                assert_eq!(3, data.uploaded_ssts.len());
                assert_eq!(
                    &get_payload_imm_ids(&epoch1_sync_payload),
                    data.uploaded_ssts[0].imm_ids()
                );
                assert_eq!(
                    &get_payload_imm_ids(&epoch1_spill_payload3),
                    data.uploaded_ssts[1].imm_ids()
                );
                assert_eq!(
                    &get_payload_imm_ids(&epoch1_spill_payload12),
                    data.uploaded_ssts[2].imm_ids()
                );
            }
            _ => {
                unreachable!()
            }
        }

        // epoch2 was already fully uploaded by its earlier spill, so its sync
        // result carries exactly that one SST.
        let (sync_tx2, sync_rx2) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch2, sync_tx2, HashSet::from_iter([TEST_TABLE_ID]));
        uploader.local_seal_epoch_for_test(instance_id2, epoch3);
        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids());

        match sync_rx2.await {
            Ok(Ok(data)) => {
                assert_eq!(data.uploaded_ssts.len(), 1);
                assert_eq!(
                    &get_payload_imm_ids(&epoch2_spill_payload),
                    data.uploaded_ssts[0].imm_ids()
                );
            }
            _ => {
                unreachable!("should be sync finish");
            }
        }
        assert_eq!(epoch2, uploader.test_max_synced_epoch());

        // Syncing epoch4 flushes the remaining imms (imm4 and the unspilled
        // imm3_3) in a single task on instance 1.
        uploader.local_seal_epoch_for_test(instance_id1, epoch4);
        uploader.local_seal_epoch_for_test(instance_id2, epoch4);
        let epoch4_sync_payload = HashMap::from_iter([(instance_id1, vec![imm4, imm3_3])]);
        let (await_start4_with_3_3, finish_tx4_with_3_3) =
            new_task_notifier(get_payload_imm_ids(&epoch4_sync_payload));
        let (sync_tx4, mut sync_rx4) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch4, sync_tx4, HashSet::from_iter([TEST_TABLE_ID]));
        await_start4_with_3_3.await;

        assert_uploader_pending(&mut uploader).await;

        finish_tx3_2.send(()).unwrap();
        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload2), sst.imm_ids());

        // epoch4's sync stays pending until its own sync-task SST is emitted.
        finish_tx4_with_3_3.send(()).unwrap();
        assert!(poll_fn(|cx| Poll::Ready(sync_rx4.poll_unpin(cx).is_pending())).await);

        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_payload_imm_ids(&epoch4_sync_payload), sst.imm_ids());

        // Sync result for epoch4 covers the epoch3/epoch4 SSTs, newest first.
        match sync_rx4.await {
            Ok(Ok(data)) => {
                assert_eq!(3, data.uploaded_ssts.len());
                assert_eq!(
                    &get_payload_imm_ids(&epoch4_sync_payload),
                    data.uploaded_ssts[0].imm_ids()
                );
                assert_eq!(
                    &get_payload_imm_ids(&epoch3_spill_payload2),
                    data.uploaded_ssts[1].imm_ids()
                );
                assert_eq!(
                    &get_payload_imm_ids(&epoch3_spill_payload1),
                    data.uploaded_ssts[2].imm_ids(),
                )
            }
            _ => {
                unreachable!("should be sync finish");
            }
        }
        assert_eq!(epoch4, uploader.test_max_synced_epoch());
    }
2437
2438 #[tokio::test]
2439 async fn test_uploader_frequently_flush() {
2440 let config = StorageOpts {
2441 shared_buffer_capacity_mb: 10,
2442 shared_buffer_flush_ratio: 0.8,
2443 shared_buffer_min_batch_flush_size_mb: 1,
2445 ..Default::default()
2446 };
2447 let (buffer_tracker, mut uploader, _new_task_notifier) =
2448 prepare_uploader_order_test(&config, true);
2449
2450 let epoch1 = INITIAL_EPOCH.next_epoch();
2451 let epoch2 = epoch1.next_epoch();
2452 uploader.start_epochs_for_test([epoch1, epoch2]);
2453 let instance_id1 = 1;
2454 let instance_id2 = 2;
2455 let flush_threshold = buffer_tracker.flush_threshold();
2456 let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2457
2458 uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2459 uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2460
2461 let mut total_memory = 0;
2463 while total_memory < flush_threshold {
2464 let imm = gen_imm_with_limiter(epoch2, Some(memory_limiter.as_ref())).await;
2465 total_memory += imm.size();
2466 if total_memory > flush_threshold {
2467 break;
2468 }
2469 uploader.add_imm(instance_id2, imm);
2470 }
2471 let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await;
2472 uploader.add_imm(instance_id1, imm);
2473 assert!(uploader.may_flush());
2474
2475 for _ in 0..10 {
2476 let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await;
2477 uploader.add_imm(instance_id1, imm);
2478 assert!(!uploader.may_flush());
2479 }
2480 }
2481}