1mod spiller;
16mod task_manager;
17pub(crate) mod test_utils;
18
19use std::cmp::Ordering;
20use std::collections::btree_map::Entry;
21use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque, hash_map};
22use std::fmt::{Debug, Display, Formatter};
23use std::future::{Future, poll_fn};
24use std::mem::{replace, swap, take};
25use std::sync::Arc;
26use std::task::{Context, Poll, ready};
27
28use futures::FutureExt;
29use itertools::Itertools;
30use more_asserts::assert_gt;
31use prometheus::{Histogram, HistogramTimer, IntGauge};
32use risingwave_common::bitmap::BitmapBuilder;
33use risingwave_common::catalog::TableId;
34use risingwave_common::metrics::UintGauge;
35use risingwave_common::must_match;
36use risingwave_hummock_sdk::table_watermark::{
37 TableWatermarks, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
38};
39use risingwave_hummock_sdk::vector_index::VectorIndexAdd;
40use risingwave_hummock_sdk::{HummockEpoch, HummockRawObjectId, LocalSstableInfo};
41use task_manager::{TaskManager, UploadingTaskStatus};
42use thiserror_ext::AsReport;
43use tokio::sync::oneshot;
44use tokio::task::JoinHandle;
45use tracing::{debug, error, info, warn};
46
47use crate::hummock::event_handler::LocalInstanceId;
48use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, send_sync_result};
49use crate::hummock::event_handler::uploader::spiller::Spiller;
50use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm;
51use crate::hummock::local_version::pinned_version::PinnedVersion;
52use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchId;
53use crate::hummock::store::version::StagingSstableInfo;
54use crate::hummock::utils::MemoryTracker;
55use crate::hummock::{HummockError, HummockResult, ImmutableMemtable};
56use crate::mem_table::ImmId;
57use crate::monitor::HummockStateStoreMetrics;
58use crate::store::SealCurrentEpochOptions;
59
60fn take_before_epoch<T>(
62 data: &mut BTreeMap<HummockEpoch, T>,
63 epoch: HummockEpoch,
64) -> BTreeMap<HummockEpoch, T> {
65 let mut before_epoch_data = data.split_off(&(epoch + 1));
66 swap(&mut before_epoch_data, data);
67 before_epoch_data
68}
69
/// Input of an upload task as tracked by the uploader: for each local instance, the
/// [`UploaderImm`]s that belong to the task.
type UploadTaskInput = HashMap<LocalInstanceId, Vec<UploaderImm>>;
/// Payload handed to the spawned upload task: for each local instance, the plain
/// [`ImmutableMemtable`]s cloned out of the task input.
pub type UploadTaskPayload = HashMap<LocalInstanceId, Vec<ImmutableMemtable>>;
72
/// Value produced by a successfully finished upload task.
#[derive(Debug)]
pub struct UploadTaskOutput {
    /// Sstables passed as the first argument of `StagingSstableInfo::new` when the task
    /// result is wrapped.
    pub new_value_ssts: Vec<LocalSstableInfo>,
    /// Sstables passed as the second argument of `StagingSstableInfo::new` when the task
    /// result is wrapped.
    pub old_value_ssts: Vec<LocalSstableInfo>,
    /// Optional histogram timer carried along with the output.
    /// NOTE(review): presumably observes the time until the result is polled — confirm.
    pub wait_poll_timer: Option<HistogramTimer>,
}
/// Shared callback used to start an upload task.
///
/// It receives the payload and the task info and returns the join handle of the spawned task.
/// The same callback is invoked again with a rebuilt payload when a task fails and is retried.
pub type SpawnUploadTask = Arc<
    dyn Fn(UploadTaskPayload, UploadTaskInfo) -> JoinHandle<HummockResult<UploadTaskOutput>>
        + Send
        + Sync
        + 'static,
>;
85
/// Summary of an upload task, computed once when the task is created.
#[derive(Clone)]
pub struct UploadTaskInfo {
    /// Sum of the sizes of all imms in the task.
    pub task_size: usize,
    /// Distinct epochs of the imms in the task, in descending order (newer epochs first).
    pub epochs: Vec<HummockEpoch>,
    /// Ids of all tables that have at least one imm in the task.
    pub table_ids: HashSet<TableId>,
    /// For each local instance, the batch ids of its imms in the task.
    pub imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
}
93
94impl Display for UploadTaskInfo {
95 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
96 f.debug_struct("UploadTaskInfo")
97 .field("task_size", &self.task_size)
98 .field("epochs", &self.epochs)
99 .field("len(imm_ids)", &self.imm_ids.len())
100 .finish()
101 }
102}
103
104impl Debug for UploadTaskInfo {
105 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
106 f.debug_struct("UploadTaskInfo")
107 .field("task_size", &self.task_size)
108 .field("epochs", &self.epochs)
109 .field("imm_ids", &self.imm_ids)
110 .finish()
111 }
112}
113
114mod uploader_imm {
115 use std::fmt::Formatter;
116 use std::ops::Deref;
117
118 use risingwave_common::metrics::{LabelGuardedIntGauge, UintGauge};
119
120 #[cfg(test)]
121 use crate::hummock::MemoryLimiter;
122 use crate::hummock::event_handler::uploader::UploaderContext;
123 use crate::hummock::utils::MemoryTracker;
124 use crate::mem_table::ImmutableMemtable;
125
126 pub(super) struct UploaderImm {
127 inner: ImmutableMemtable,
128 size_guard: UintGauge,
129 per_table_size_guard: LabelGuardedIntGauge,
130 per_table_count_guard: LabelGuardedIntGauge,
131 _tracker: MemoryTracker,
132 }
133
134 impl UploaderImm {
135 pub(super) fn new(
136 imm: ImmutableMemtable,
137 context: &UploaderContext,
138 tracker: MemoryTracker,
139 ) -> Self {
140 let size = imm.size();
141 let table_id_str = imm.table_id().to_string();
142 let size_guard = context.stats.uploader_imm_size.clone();
143 let per_table_size_guard = context
144 .stats
145 .uploader_per_table_imm_size
146 .with_guarded_label_values(&[&table_id_str]);
147 let per_table_count_guard = context
148 .stats
149 .uploader_per_table_imm_count
150 .with_guarded_label_values(&[&table_id_str]);
151 size_guard.add(size as _);
152 per_table_size_guard.add(size as _);
153 per_table_count_guard.add(1 as _);
154 Self {
155 inner: imm,
156 size_guard,
157 per_table_size_guard,
158 per_table_count_guard,
159 _tracker: tracker,
160 }
161 }
162
163 #[cfg(test)]
164 pub(super) fn for_test(imm: ImmutableMemtable) -> Self {
165 Self {
166 inner: imm,
167 size_guard: UintGauge::new("test", "test").unwrap(),
168 per_table_size_guard: LabelGuardedIntGauge::test_int_gauge::<1>(),
169 per_table_count_guard: LabelGuardedIntGauge::test_int_gauge::<1>(),
170 _tracker: MemoryLimiter::unlimit().try_require_memory(1).unwrap(),
171 }
172 }
173 }
174
175 impl std::fmt::Debug for UploaderImm {
176 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
177 self.inner.fmt(f)
178 }
179 }
180
181 impl Deref for UploaderImm {
182 type Target = ImmutableMemtable;
183
184 fn deref(&self) -> &Self::Target {
185 &self.inner
186 }
187 }
188
189 impl Drop for UploaderImm {
190 fn drop(&mut self) {
191 self.size_guard.sub(self.inner.size() as _);
192 self.per_table_size_guard.sub(self.inner.size() as _);
193 self.per_table_count_guard.dec();
194 }
195 }
196}
197
/// Identifier of an [`UploadingTask`]. It is ordered, so ids can be kept in sorted collections.
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct UploadingTaskId(usize);
200
/// A running upload task together with everything needed to re-spawn it when it fails.
struct UploadingTask {
    /// Id of this task.
    task_id: UploadingTaskId,
    /// The imms being uploaded; kept so that the payload can be rebuilt on retry.
    /// NOTE(review): callers appear to pass newer imms first — confirm.
    input: UploadTaskInput,
    /// Handle of the currently spawned attempt; replaced on every retry.
    join_handle: JoinHandle<HummockResult<UploadTaskOutput>>,
    /// Summary of the task computed at creation time.
    task_info: UploadTaskInfo,
    /// Callback used to spawn a new attempt on retry.
    spawn_upload_task: SpawnUploadTask,
    /// Gauge of the total size of in-flight upload tasks; decreased by this task's size on drop.
    task_size_guard: UintGauge,
    /// Gauge of the number of in-flight upload tasks; decremented on drop.
    task_count_guard: IntGauge,
}
213
214impl Drop for UploadingTask {
215 fn drop(&mut self) {
216 self.task_size_guard.sub(self.task_info.task_size as u64);
217 self.task_count_guard.dec();
218 }
219}
220
221impl Debug for UploadingTask {
222 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
223 f.debug_struct("UploadingTask")
224 .field("input", &self.input)
225 .field("task_info", &self.task_info)
226 .finish()
227 }
228}
229
230fn get_payload_imm_ids(
231 payload: &UploadTaskPayload,
232) -> HashMap<LocalInstanceId, Vec<SharedBufferBatchId>> {
233 payload
234 .iter()
235 .map(|(instance_id, imms)| {
236 (
237 *instance_id,
238 imms.iter().map(|imm| imm.batch_id()).collect_vec(),
239 )
240 })
241 .collect()
242}
243
244impl UploadingTask {
245 const LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE: usize = 50 * (1 << 20);
247
248 fn input_to_payload(input: &UploadTaskInput) -> UploadTaskPayload {
249 input
250 .iter()
251 .map(|(instance_id, imms)| {
252 (
253 *instance_id,
254 imms.iter().map(|imm| (**imm).clone()).collect(),
255 )
256 })
257 .collect()
258 }
259
260 fn new(task_id: UploadingTaskId, input: UploadTaskInput, context: &UploaderContext) -> Self {
261 assert!(!input.is_empty());
262 let mut epochs = input
263 .iter()
264 .flat_map(|(_, imms)| imms.iter().map(|imm| imm.epoch()))
265 .sorted()
266 .dedup()
267 .collect_vec();
268
269 epochs.reverse();
271 let payload = Self::input_to_payload(&input);
272 let imm_ids = get_payload_imm_ids(&payload);
273 let task_size = input
274 .values()
275 .map(|imms| imms.iter().map(|imm| imm.size()).sum::<usize>())
276 .sum();
277 let task_info = UploadTaskInfo {
278 task_size,
279 epochs,
280 table_ids: payload
281 .values()
282 .flat_map(|imms| imms.iter().map(|imm| imm.table_id()))
283 .collect(),
284 imm_ids,
285 };
286 context
287 .buffer_tracker
288 .global_upload_task_size()
289 .add(task_size as u64);
290 if task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
291 info!("start upload task: {:?}", task_info);
292 } else {
293 debug!("start upload task: {:?}", task_info);
294 }
295 let join_handle = (context.spawn_upload_task)(payload, task_info.clone());
296 context.stats.uploader_uploading_task_count.inc();
297 Self {
298 task_id,
299 input,
300 join_handle,
301 task_info,
302 spawn_upload_task: context.spawn_upload_task.clone(),
303 task_size_guard: context.buffer_tracker.global_upload_task_size().clone(),
304 task_count_guard: context.stats.uploader_uploading_task_count.clone(),
305 }
306 }
307
308 fn poll_result(
310 &mut self,
311 cx: &mut Context<'_>,
312 ) -> Poll<HummockResult<Arc<StagingSstableInfo>>> {
313 Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) {
314 Ok(task_result) => task_result
315 .inspect(|_| {
316 if self.task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
317 info!(task_info = ?self.task_info, "upload task finish");
318 } else {
319 debug!(task_info = ?self.task_info, "upload task finish");
320 }
321 })
322 .inspect_err(|e| error!(task_info = ?self.task_info, err = ?e.as_report(), "upload task failed"))
323 .map(|output| {
324 Arc::new(StagingSstableInfo::new(
325 output.new_value_ssts,
326 output.old_value_ssts,
327 self.task_info.epochs.clone(),
328 self.task_info.imm_ids.clone(),
329 self.task_info.task_size,
330 ))
331 }),
332
333 Err(err) => Err(HummockError::other(format!(
334 "fail to join upload join handle: {}",
335 err.as_report()
336 ))),
337 })
338 }
339
340 fn poll_ok_with_retry(&mut self, cx: &mut Context<'_>) -> Poll<Arc<StagingSstableInfo>> {
342 loop {
343 let result = ready!(self.poll_result(cx));
344 match result {
345 Ok(sstables) => return Poll::Ready(sstables),
346 Err(e) => {
347 error!(
348 error = %e.as_report(),
349 task_info = ?self.task_info,
350 "a flush task failed, start retry",
351 );
352 self.join_handle = (self.spawn_upload_task)(
353 Self::input_to_payload(&self.input),
354 self.task_info.clone(),
355 );
356 }
360 }
361 }
362 }
363
364 pub fn get_task_info(&self) -> &UploadTaskInfo {
365 &self.task_info
366 }
367}
368
369impl TableUnsyncData {
370 fn add_table_watermarks(
371 &mut self,
372 epoch: HummockEpoch,
373 table_watermarks: Vec<VnodeWatermark>,
374 direction: WatermarkDirection,
375 watermark_type: WatermarkSerdeType,
376 ) {
377 if table_watermarks.is_empty() {
378 return;
379 }
380 let vnode_count = table_watermarks[0].vnode_count();
381 for watermark in &table_watermarks {
382 assert_eq!(vnode_count, watermark.vnode_count());
383 }
384
385 fn apply_new_vnodes(
386 vnode_bitmap: &mut BitmapBuilder,
387 vnode_watermarks: &Vec<VnodeWatermark>,
388 ) {
389 for vnode_watermark in vnode_watermarks {
390 for vnode in vnode_watermark.vnode_bitmap().iter_ones() {
391 assert!(
392 !vnode_bitmap.is_set(vnode),
393 "vnode {} write multiple table watermarks",
394 vnode
395 );
396 vnode_bitmap.set(vnode, true);
397 }
398 }
399 }
400 match &mut self.table_watermarks {
401 Some((prev_direction, prev_watermarks, prev_watermark_type)) => {
402 assert_eq!(
403 *prev_direction, direction,
404 "table id {} new watermark direction not match with previous",
405 self.table_id
406 );
407 assert_eq!(
408 *prev_watermark_type, watermark_type,
409 "table id {} new watermark watermark_type not match with previous",
410 self.table_id
411 );
412 match prev_watermarks.entry(epoch) {
413 Entry::Occupied(mut entry) => {
414 let (prev_watermarks, vnode_bitmap) = entry.get_mut();
415 apply_new_vnodes(vnode_bitmap, &table_watermarks);
416 prev_watermarks.extend(table_watermarks);
417 }
418 Entry::Vacant(entry) => {
419 let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
420 apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
421 entry.insert((table_watermarks, vnode_bitmap));
422 }
423 }
424 }
425 None => {
426 let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
427 apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
428 self.table_watermarks = Some((
429 direction,
430 BTreeMap::from_iter([(epoch, (table_watermarks, vnode_bitmap))]),
431 watermark_type,
432 ));
433 }
434 }
435 }
436}
437
438impl UploaderData {
439 fn add_table_watermarks(
440 all_table_watermarks: &mut HashMap<TableId, TableWatermarks>,
441 table_id: TableId,
442 direction: WatermarkDirection,
443 watermarks: impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)>,
444 watermark_type: WatermarkSerdeType,
445 ) {
446 let mut table_watermarks: Option<TableWatermarks> = None;
447 for (epoch, watermarks) in watermarks {
448 match &mut table_watermarks {
449 Some(prev_watermarks) => {
450 assert_eq!(prev_watermarks.direction, direction);
451 assert_eq!(prev_watermarks.watermark_type, watermark_type);
452 prev_watermarks.add_new_epoch_watermarks(
453 epoch,
454 Arc::from(watermarks),
455 direction,
456 watermark_type,
457 );
458 }
459 None => {
460 table_watermarks = Some(TableWatermarks::single_epoch(
461 epoch,
462 watermarks,
463 direction,
464 watermark_type,
465 ));
466 }
467 }
468 }
469 if let Some(table_watermarks) = table_watermarks {
470 assert!(
471 all_table_watermarks
472 .insert(table_id, table_watermarks)
473 .is_none()
474 );
475 }
476 }
477}
478
/// The imms written by one local instance within a single epoch.
struct LocalInstanceEpochData {
    /// The epoch all imms in `imms` belong to.
    epoch: HummockEpoch,
    /// Imms of this epoch; new imms are pushed to the front, so newer data comes first.
    imms: VecDeque<UploaderImm>,
    /// Set when imms of this epoch were taken out by a spill while the epoch was still the
    /// current one.
    has_spilled: bool,
}
485
486impl LocalInstanceEpochData {
487 fn new(epoch: HummockEpoch) -> Self {
488 Self {
489 epoch,
490 imms: VecDeque::new(),
491 has_spilled: false,
492 }
493 }
494
495 fn epoch(&self) -> HummockEpoch {
496 self.epoch
497 }
498
499 fn add_imm(&mut self, imm: UploaderImm) {
500 assert_eq!(self.epoch, imm.epoch());
501 if let Some(prev_imm) = self.imms.front() {
502 assert_gt!(imm.batch_id(), prev_imm.batch_id());
503 }
504 self.imms.push_front(imm);
505 }
506
507 fn is_empty(&self) -> bool {
508 self.imms.is_empty()
509 }
510}
511
/// Unsynced state of a single local instance of a table.
struct LocalInstanceUnsyncData {
    /// Table this instance writes to.
    table_id: TableId,
    /// Id of this instance.
    instance_id: LocalInstanceId,
    /// Data of the epoch currently being written. Reset to `None` by `sync` when the synced
    /// epoch is not smaller than the current epoch.
    current_epoch_data: Option<LocalInstanceEpochData>,
    /// Non-empty data of sealed epochs. Newly sealed epochs are pushed to the front and the
    /// oldest one is taken from the back.
    sealed_data: VecDeque<LocalInstanceEpochData>,
    /// Ids of imms handed out by `spill`/`sync` and not yet acknowledged as flushed. Newer ids
    /// are pushed to the front and acknowledgements pop from the back.
    flushing_imms: VecDeque<SharedBufferBatchId>,
    /// Set once the instance has been destroyed; no imm may be added afterwards.
    is_destroyed: bool,
    /// Set from the `stopped` flag passed to `local_seal_epoch`; no imm may be added and no
    /// epoch may be sealed afterwards.
    is_stopped: bool,
}
524
/// Bookkeeping of the unsynced imms of one local instance.
impl LocalInstanceUnsyncData {
    /// Creates the state of a fresh instance whose current epoch is `init_epoch`.
    fn new(table_id: TableId, instance_id: LocalInstanceId, init_epoch: HummockEpoch) -> Self {
        Self {
            table_id,
            instance_id,
            current_epoch_data: Some(LocalInstanceEpochData::new(init_epoch)),
            sealed_data: VecDeque::new(),
            flushing_imms: Default::default(),
            is_destroyed: false,
            is_stopped: false,
        }
    }

    /// Adds `imm` to the current epoch.
    ///
    /// Panics when the instance is destroyed or stopped, when the imm belongs to another
    /// table, or when there is no current epoch data.
    fn add_imm(&mut self, imm: UploaderImm) {
        assert!(!self.is_destroyed);
        assert!(!self.is_stopped);
        assert_eq!(self.table_id, imm.table_id);
        self.current_epoch_data
            .as_mut()
            .expect("should be Some when adding new imm")
            .add_imm(imm);
    }

    /// Seals the current epoch and starts `next_epoch`, returning the epoch that was sealed.
    ///
    /// The sealed data is kept in `sealed_data` only when it holds at least one imm. `stopped`
    /// is recorded in `is_stopped`. Panics when `next_epoch` is not greater than the current
    /// epoch or when the instance is already destroyed or stopped.
    fn local_seal_epoch(&mut self, next_epoch: HummockEpoch, stopped: bool) -> HummockEpoch {
        let data = self
            .current_epoch_data
            .as_mut()
            .expect("should be Some when seal new epoch");
        let current_epoch = data.epoch;
        debug!(
            instance_id = self.instance_id,
            next_epoch, current_epoch, stopped, "local seal epoch"
        );
        assert_gt!(next_epoch, current_epoch);
        assert!(!self.is_destroyed);
        assert!(!self.is_stopped);
        self.is_stopped = stopped;
        let epoch_data = replace(data, LocalInstanceEpochData::new(next_epoch));
        if !epoch_data.is_empty() {
            // newer sealed epochs go to the front
            self.sealed_data.push_front(epoch_data);
        }
        current_epoch
    }

    /// Acknowledges flushed imms. `imm_ids` must come from old to new (ascending order), and
    /// must match the oldest flushing imm ids one by one.
    fn ack_flushed(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            assert_eq!(self.flushing_imms.pop_back().expect("should exist"), imm_id);
        }
    }

    /// Takes out the imms of `epoch` for spilling and registers them as flushing.
    ///
    /// Only the oldest unsynced data is eligible: the oldest sealed epoch if any, otherwise the
    /// current epoch. Returns an empty vector when that data belongs to a later epoch. The
    /// returned imms keep the storage order, i.e. newer imms first.
    fn spill(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        let imms = if let Some(oldest_sealed_epoch) = self.sealed_data.back() {
            match oldest_sealed_epoch.epoch.cmp(&epoch) {
                Ordering::Less => {
                    unreachable!(
                        "should not spill at this epoch because there \
                    is unspilled data in previous epoch: prev epoch {}, spill epoch {}",
                        oldest_sealed_epoch.epoch, epoch
                    );
                }
                Ordering::Equal => {
                    let epoch_data = self.sealed_data.pop_back().unwrap();
                    assert_eq!(epoch, epoch_data.epoch);
                    epoch_data.imms
                }
                Ordering::Greater => VecDeque::new(),
            }
        } else {
            let Some(current_epoch_data) = &mut self.current_epoch_data else {
                return Vec::new();
            };
            match current_epoch_data.epoch.cmp(&epoch) {
                Ordering::Less => {
                    assert!(
                        current_epoch_data.imms.is_empty(),
                        "should not spill at this epoch because there \
                    is unspilled data in current epoch epoch {}, spill epoch {}",
                        current_epoch_data.epoch,
                        epoch
                    );
                    VecDeque::new()
                }
                Ordering::Equal => {
                    if !current_epoch_data.imms.is_empty() {
                        // remember that part of the current epoch has already left the instance
                        current_epoch_data.has_spilled = true;
                        take(&mut current_epoch_data.imms)
                    } else {
                        VecDeque::new()
                    }
                }
                Ordering::Greater => VecDeque::new(),
            }
        };
        // `imms` is stored newest first; register the ids from old to new.
        self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
        imms.into_iter().collect()
    }

    /// Registers imm ids as flushing. `imm_ids` must be ascending and greater than the newest
    /// id already registered.
    fn add_flushing_imm(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            if let Some(prev_imm_id) = self.flushing_imms.front() {
                assert_gt!(imm_id, *prev_imm_id);
            }
            self.flushing_imms.push_front(imm_id);
        }
    }

    /// Starts syncing all sealed imms of epochs up to and including `epoch`.
    ///
    /// The imms are registered as flushing and returned with newer imms first. When the
    /// current epoch is also covered by `epoch`, the current epoch data must be empty and
    /// unspilled; in that case debug builds panic, and release builds log a warning and clear
    /// `current_epoch_data`.
    fn sync(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        // firstly collected from old to new
        let mut ret = Vec::new();
        while let Some(epoch_data) = self.sealed_data.back()
            && epoch_data.epoch() <= epoch
        {
            let imms = self.sealed_data.pop_back().expect("checked exist").imms;
            self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
            ret.extend(imms.into_iter().rev());
        }
        // reverse so that newer data comes first
        ret.reverse();
        if let Some(latest_epoch_data) = &self.current_epoch_data
            && latest_epoch_data.epoch <= epoch
        {
            assert!(self.sealed_data.is_empty());
            assert!(latest_epoch_data.is_empty());
            assert!(!latest_epoch_data.has_spilled);
            if cfg!(debug_assertions) {
                panic!(
                    "sync epoch exceeds latest epoch, and the current instance should have been archived, table_id = {}, latest_epoch_data = {}, epoch = {}",
                    self.table_id,
                    latest_epoch_data.epoch(),
                    epoch
                );
            }
            warn!(
                instance_id = self.instance_id,
                table_id = %self.table_id,
                "sync epoch exceeds latest epoch, and the current instance should have be archived"
            );
            self.current_epoch_data = None;
        }
        ret
    }

    /// Asserts that this instance holds no unsynced data at or before `epoch`.
    fn assert_after_epoch(&self, epoch: HummockEpoch) {
        if let Some(oldest_sealed_data) = self.sealed_data.back() {
            assert!(!oldest_sealed_data.imms.is_empty());
            assert_gt!(oldest_sealed_data.epoch, epoch);
        } else if let Some(current_data) = &self.current_epoch_data
            && current_data.epoch <= epoch
        {
            assert!(current_data.imms.is_empty() && !current_data.has_spilled);
        }
    }

    /// Whether the instance is destroyed and has no sealed data left to sync.
    fn is_finished(&self) -> bool {
        self.is_destroyed && self.sealed_data.is_empty()
    }
}
685
/// Unsynced state of one table, aggregated over all of its local instances.
struct TableUnsyncData {
    /// Id of the table.
    table_id: TableId,
    /// Unsynced data of every local instance of this table.
    instance_data: HashMap<LocalInstanceId, LocalInstanceUnsyncData>,
    /// Unsynced table watermarks: direction, per-epoch watermarks (each with a bitmap of the
    /// vnodes that already wrote a watermark in that epoch) and the watermark serde type.
    /// `None` until the first watermark is added.
    #[expect(clippy::type_complexity)]
    table_watermarks: Option<(
        WatermarkDirection,
        BTreeMap<HummockEpoch, (Vec<VnodeWatermark>, BitmapBuilder)>,
        WatermarkSerdeType,
    )>,
    /// Ids of spill tasks per epoch; entries up to the synced epoch are drained by `sync`.
    spill_tasks: BTreeMap<HummockEpoch, VecDeque<UploadingTaskId>>,
    /// Epochs registered via `new_epoch` that have not started syncing yet.
    unsync_epochs: BTreeMap<HummockEpoch, ()>,
    /// Epochs that are being synced; newer epochs are pushed to the front and acknowledged
    /// epochs are popped from the back.
    syncing_epochs: VecDeque<HummockEpoch>,
    /// Greatest epoch known to be synced or committed. Initialized from the committed epoch
    /// passed to `new`.
    max_synced_epoch: Option<HummockEpoch>,
}
701
702impl TableUnsyncData {
703 fn new(table_id: TableId, committed_epoch: Option<HummockEpoch>) -> Self {
704 Self {
705 table_id,
706 instance_data: Default::default(),
707 table_watermarks: None,
708 spill_tasks: Default::default(),
709 unsync_epochs: Default::default(),
710 syncing_epochs: Default::default(),
711 max_synced_epoch: committed_epoch,
712 }
713 }
714
715 fn new_epoch(&mut self, epoch: HummockEpoch) {
716 debug!(table_id = ?self.table_id, epoch, "table new epoch");
717 if let Some(latest_epoch) = self.max_epoch() {
718 assert_gt!(epoch, latest_epoch);
719 }
720 self.unsync_epochs.insert(epoch, ());
721 }
722
723 #[expect(clippy::type_complexity)]
724 fn sync(
725 &mut self,
726 epoch: HummockEpoch,
727 ) -> (
728 impl Iterator<Item = (LocalInstanceId, Vec<UploaderImm>)> + '_,
729 Option<(
730 WatermarkDirection,
731 impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)> + use<>,
732 WatermarkSerdeType,
733 )>,
734 impl Iterator<Item = UploadingTaskId> + use<>,
735 BTreeMap<HummockEpoch, ()>,
736 ) {
737 if let Some(prev_epoch) = self.max_sync_epoch() {
738 assert_gt!(epoch, prev_epoch)
739 }
740 let epochs = take_before_epoch(&mut self.unsync_epochs, epoch);
741 assert_eq!(
742 *epochs.last_key_value().expect("non-empty").0,
743 epoch,
744 "{epochs:?} {epoch} {:?}",
745 self.table_id
746 );
747 self.syncing_epochs.push_front(epoch);
748 (
749 self.instance_data
750 .iter_mut()
751 .map(move |(instance_id, data)| (*instance_id, data.sync(epoch))),
752 self.table_watermarks
753 .as_mut()
754 .map(|(direction, watermarks, watermark_type)| {
755 let watermarks = take_before_epoch(watermarks, epoch)
756 .into_iter()
757 .map(|(epoch, (watermarks, _))| (epoch, watermarks));
758 (*direction, watermarks, *watermark_type)
759 }),
760 take_before_epoch(&mut self.spill_tasks, epoch)
761 .into_values()
762 .flat_map(|tasks| tasks.into_iter()),
763 epochs,
764 )
765 }
766
767 fn ack_synced(&mut self, sync_epoch: HummockEpoch) {
768 let min_sync_epoch = self.syncing_epochs.pop_back().expect("should exist");
769 assert_eq!(sync_epoch, min_sync_epoch);
770 self.max_synced_epoch = Some(sync_epoch);
771 }
772
773 fn ack_committed(&mut self, committed_epoch: HummockEpoch) {
774 let synced_epoch_advanced = {
775 if let Some(max_synced_epoch) = self.max_synced_epoch
776 && max_synced_epoch >= committed_epoch
777 {
778 false
779 } else {
780 true
781 }
782 };
783 if synced_epoch_advanced {
784 self.max_synced_epoch = Some(committed_epoch);
785 if let Some(min_syncing_epoch) = self.syncing_epochs.back() {
786 assert_gt!(*min_syncing_epoch, committed_epoch);
787 }
788 self.assert_after_epoch(committed_epoch);
789 }
790 }
791
792 fn assert_after_epoch(&self, epoch: HummockEpoch) {
793 self.instance_data
794 .values()
795 .for_each(|instance_data| instance_data.assert_after_epoch(epoch));
796 if let Some((_, watermarks, _)) = &self.table_watermarks
797 && let Some((oldest_epoch, _)) = watermarks.first_key_value()
798 {
799 assert_gt!(*oldest_epoch, epoch);
800 }
801 }
802
803 fn max_sync_epoch(&self) -> Option<HummockEpoch> {
804 self.syncing_epochs
805 .front()
806 .cloned()
807 .or(self.max_synced_epoch)
808 }
809
810 fn max_epoch(&self) -> Option<HummockEpoch> {
811 self.unsync_epochs
812 .last_key_value()
813 .map(|(epoch, _)| *epoch)
814 .or_else(|| self.max_sync_epoch())
815 }
816
817 fn is_empty(&self) -> bool {
818 self.instance_data.is_empty()
819 && self.syncing_epochs.is_empty()
820 && self.unsync_epochs.is_empty()
821 }
822}
823
824#[derive(Eq, Hash, PartialEq, Copy, Clone)]
825struct UnsyncEpochId(HummockEpoch, TableId);
826
827impl UnsyncEpochId {
828 fn epoch(&self) -> HummockEpoch {
829 self.0
830 }
831}
832
833fn get_unsync_epoch_id(epoch: HummockEpoch, table_ids: &HashSet<TableId>) -> Option<UnsyncEpochId> {
834 table_ids
835 .iter()
836 .min()
837 .map(|table_id| UnsyncEpochId(epoch, *table_id))
838}
839
/// All data that has not started syncing yet: per-table state plus indexes over it.
#[derive(Default)]
struct UnsyncData {
    /// Unsynced state of every table.
    table_data: HashMap<TableId, TableUnsyncData>,
    /// Index from a local instance id to the id of the table it belongs to.
    instance_table_id: HashMap<LocalInstanceId, TableId>,
    /// For every unsynced epoch id, the set of tables that share the epoch.
    unsync_epochs: HashMap<UnsyncEpochId, HashSet<TableId>>,
    /// Output of finished spill tasks, together with the tables whose data the task contains.
    spilled_data: HashMap<UploadingTaskId, (Arc<StagingSstableInfo>, HashSet<TableId>)>,
}
852
853impl UnsyncData {
854 fn init_instance(
855 &mut self,
856 table_id: TableId,
857 instance_id: LocalInstanceId,
858 init_epoch: HummockEpoch,
859 ) {
860 debug!(
861 table_id = %table_id,
862 instance_id, init_epoch, "init epoch"
863 );
864 let table_data = self
865 .table_data
866 .get_mut(&table_id)
867 .unwrap_or_else(|| panic!("should exist. {table_id:?}"));
868 assert!(
869 table_data
870 .instance_data
871 .insert(
872 instance_id,
873 LocalInstanceUnsyncData::new(table_id, instance_id, init_epoch)
874 )
875 .is_none()
876 );
877 assert!(
878 self.instance_table_id
879 .insert(instance_id, table_id)
880 .is_none()
881 );
882 assert!(table_data.unsync_epochs.contains_key(&init_epoch));
883 }
884
885 fn instance_data(
886 &mut self,
887 instance_id: LocalInstanceId,
888 ) -> Option<&mut LocalInstanceUnsyncData> {
889 self.instance_table_id
890 .get_mut(&instance_id)
891 .cloned()
892 .map(move |table_id| {
893 self.table_data
894 .get_mut(&table_id)
895 .expect("should exist")
896 .instance_data
897 .get_mut(&instance_id)
898 .expect("should exist")
899 })
900 }
901
902 fn add_imm(&mut self, instance_id: LocalInstanceId, imm: UploaderImm) {
903 self.instance_data(instance_id)
904 .expect("should exist")
905 .add_imm(imm);
906 }
907
908 fn local_seal_epoch(
909 &mut self,
910 instance_id: LocalInstanceId,
911 next_epoch: HummockEpoch,
912 opts: SealCurrentEpochOptions,
913 ) {
914 let table_id = self.instance_table_id[&instance_id];
915 let table_data = self.table_data.get_mut(&table_id).expect("should exist");
916 let instance_data = table_data
917 .instance_data
918 .get_mut(&instance_id)
919 .expect("should exist");
920 let stopped = !table_data.unsync_epochs.contains_key(&next_epoch);
924 let epoch = instance_data.local_seal_epoch(next_epoch, stopped);
925 if let Some((direction, table_watermarks, watermark_type)) = opts.table_watermarks {
926 table_data.add_table_watermarks(epoch, table_watermarks, direction, watermark_type);
927 }
928 }
929
930 fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
931 if let Some(table_id) = self.instance_table_id.get(&instance_id) {
932 debug!(instance_id, "destroy instance");
933 let table_data = self.table_data.get_mut(table_id).expect("should exist");
934 let instance_data = table_data
935 .instance_data
936 .get_mut(&instance_id)
937 .expect("should exist");
938 assert!(
939 !instance_data.is_destroyed,
940 "cannot destroy an instance for twice"
941 );
942 instance_data.is_destroyed = true;
943 }
944 }
945
946 fn clear_tables(&mut self, table_ids: &HashSet<TableId>, task_manager: &mut TaskManager) {
947 for table_id in table_ids {
948 if let Some(table_unsync_data) = self.table_data.remove(table_id) {
949 for task_id in table_unsync_data.spill_tasks.into_values().flatten() {
950 if let Some(task_status) = task_manager.abort_task(task_id) {
951 must_match!(task_status, UploadingTaskStatus::Spilling(spill_table_ids) => {
952 assert!(spill_table_ids.is_subset(table_ids));
953 });
954 }
955 if let Some((_, spill_table_ids)) = self.spilled_data.remove(&task_id) {
956 assert!(spill_table_ids.is_subset(table_ids));
957 }
958 }
959 assert!(
960 table_unsync_data
961 .instance_data
962 .values()
963 .all(|instance| instance.is_destroyed),
964 "should be clear when dropping the read version instance"
965 );
966 for instance_id in table_unsync_data.instance_data.keys() {
967 assert_eq!(
968 *table_id,
969 self.instance_table_id
970 .remove(instance_id)
971 .expect("should exist")
972 );
973 }
974 }
975 }
976 debug_assert!(
977 self.spilled_data
978 .values()
979 .all(|(_, spill_table_ids)| spill_table_ids.is_disjoint(table_ids))
980 );
981 self.unsync_epochs.retain(|_, unsync_epoch_table_ids| {
982 if !unsync_epoch_table_ids.is_disjoint(table_ids) {
983 assert!(unsync_epoch_table_ids.is_subset(table_ids));
984 false
985 } else {
986 true
987 }
988 });
989 assert!(
990 self.instance_table_id
991 .values()
992 .all(|table_id| !table_ids.contains(table_id))
993 );
994 }
995}
996
impl UploaderData {
    /// Starts a sync of the given `(epoch, table ids)` groups.
    ///
    /// For every table of every group, all unsynced state up to the group's epoch is moved out
    /// of `unsync_data`: unflushed imms, table watermarks, spill tasks and vector index adds.
    /// The imms are passed to `task_manager.sync`, and the collected state is stored as a new
    /// [`SyncingData`] entry under a freshly allocated [`SyncId`]; `sync_result_sender` is
    /// stored in that entry.
    ///
    /// Panics when a table appears in more than one group, when a table is unknown, or when
    /// the tables of a group disagree on their unsynced epochs.
    fn sync(
        &mut self,
        context: &UploaderContext,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) {
        let mut all_table_watermarks = HashMap::new();
        // spill tasks that are still running
        let mut uploading_tasks = HashSet::new();
        // spill tasks that have already finished; ordered by task id
        let mut spilled_tasks = BTreeSet::new();
        let mut all_table_ids = HashSet::new();
        let mut vector_index_adds = HashMap::new();

        // imms of each instance that have not been handed to any task yet
        let mut flush_payload = HashMap::new();

        for (epoch, table_ids) in &sync_table_epochs {
            let epoch = *epoch;
            for table_id in table_ids {
                assert!(
                    all_table_ids.insert(*table_id),
                    "duplicate sync table epoch: {:?} {:?}",
                    all_table_ids,
                    sync_table_epochs
                );
            }
            // `None` only when `table_ids` is empty, in which case there is nothing to sync.
            if let Some(UnsyncEpochId(_, min_table_id)) = get_unsync_epoch_id(epoch, table_ids) {
                let min_table_id_data = self
                    .unsync_data
                    .table_data
                    .get_mut(&min_table_id)
                    .expect("should exist");
                // Computed on a clone: the table's own `unsync_epochs` is only drained later by
                // `table_data.sync`.
                let epochs = take_before_epoch(&mut min_table_id_data.unsync_epochs.clone(), epoch);
                for epoch in epochs.keys() {
                    assert_eq!(
                        &self
                            .unsync_data
                            .unsync_epochs
                            .remove(&UnsyncEpochId(*epoch, min_table_id))
                            .expect("should exist"),
                        table_ids
                    );
                }
                for table_id in table_ids {
                    let table_data = self
                        .unsync_data
                        .table_data
                        .get_mut(table_id)
                        .expect("should exist");
                    let (unflushed_payload, table_watermarks, task_ids, table_unsync_epochs) =
                        table_data.sync(epoch);
                    // every table of the group must have exactly the same unsynced epochs
                    assert_eq!(table_unsync_epochs, epochs, "invalid table_id {table_id}");
                    for (instance_id, payload) in unflushed_payload {
                        if !payload.is_empty() {
                            flush_payload.insert(instance_id, payload);
                        }
                    }
                    table_data.instance_data.retain(|instance_id, data| {
                        // remove the finished instances
                        if data.is_finished() {
                            assert_eq!(
                                self.unsync_data.instance_table_id.remove(instance_id),
                                Some(*table_id)
                            );
                            false
                        } else {
                            true
                        }
                    });
                    if let Some((direction, watermarks, watermark_type)) = table_watermarks {
                        Self::add_table_watermarks(
                            &mut all_table_watermarks,
                            *table_id,
                            direction,
                            watermarks,
                            watermark_type,
                        );
                    }
                    // a spill task with an entry in `spilled_data` has already finished
                    for task_id in task_ids {
                        if self.unsync_data.spilled_data.contains_key(&task_id) {
                            spilled_tasks.insert(task_id);
                        } else {
                            uploading_tasks.insert(task_id);
                        }
                    }

                    if let hash_map::Entry::Occupied(mut entry) =
                        self.unsync_vector_index_data.entry(*table_id)
                    {
                        let data = entry.get_mut();
                        let adds = take_before_epoch(&mut data.sealed_epoch_data, epoch)
                            .into_values()
                            .flatten()
                            .collect_vec();
                        // a dropped vector index is forgotten once all its sealed data is taken
                        if data.is_dropped && data.sealed_epoch_data.is_empty() {
                            entry.remove();
                        }
                        if !adds.is_empty() {
                            vector_index_adds
                                .try_insert(*table_id, adds)
                                .expect("non-duplicate");
                        }
                    }
                }
            }
        }

        let sync_id = {
            let sync_id = self.next_sync_id;
            self.next_sync_id += 1;
            SyncId(sync_id)
        };

        // the task manager may return the id of an extra task for `flush_payload`, which this
        // sync has to wait for as well
        if let Some(extra_flush_task_id) = self.task_manager.sync(
            context,
            sync_id,
            flush_payload,
            uploading_tasks.iter().cloned(),
            &all_table_ids,
        ) {
            uploading_tasks.insert(extra_flush_task_id);
        }

        // iterate from large task id to small one so that newer data is at the front
        let uploaded = spilled_tasks
            .iter()
            .rev()
            .map(|task_id| {
                let (sst, spill_table_ids) = self
                    .unsync_data
                    .spilled_data
                    .remove(task_id)
                    .expect("should exist");
                assert!(
                    spill_table_ids.is_subset(&all_table_ids),
                    "spilled tabled ids {:?} not a subset of sync table id {:?}",
                    spill_table_ids,
                    all_table_ids
                );
                sst
            })
            .collect();

        self.syncing_data.insert(
            sync_id,
            SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: uploading_tasks,
                uploaded,
                table_watermarks: all_table_watermarks,
                vector_index_adds,
                sync_result_sender,
            },
        );

        self.check_upload_task_consistency();
    }
}
1154
1155impl UnsyncData {
1156 fn ack_flushed(&mut self, sstable_info: &StagingSstableInfo) {
1157 for (instance_id, imm_ids) in sstable_info.imm_ids() {
1158 if let Some(instance_data) = self.instance_data(*instance_id) {
1159 instance_data.ack_flushed(imm_ids.iter().rev().cloned());
1161 }
1162 }
1163 }
1164}
1165
1166struct SyncingData {
1167 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
1168 remaining_uploading_tasks: HashSet<UploadingTaskId>,
1169 uploaded: VecDeque<Arc<StagingSstableInfo>>,
1171 table_watermarks: HashMap<TableId, TableWatermarks>,
1172 vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
1173 sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
1174}
1175
1176#[derive(Debug)]
1177pub struct SyncedData {
1178 pub uploaded_ssts: VecDeque<Arc<StagingSstableInfo>>,
1179 pub table_watermarks: HashMap<TableId, TableWatermarks>,
1180 pub vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
1181}
1182
1183struct UploaderContext {
1184 pinned_version: PinnedVersion,
1185 spawn_upload_task: SpawnUploadTask,
1187 buffer_tracker: BufferTracker,
1188
1189 stats: Arc<HummockStateStoreMetrics>,
1190}
1191
1192impl UploaderContext {
1193 fn new(
1194 pinned_version: PinnedVersion,
1195 spawn_upload_task: SpawnUploadTask,
1196 buffer_tracker: BufferTracker,
1197 stats: Arc<HummockStateStoreMetrics>,
1198 ) -> Self {
1199 UploaderContext {
1200 pinned_version,
1201 spawn_upload_task,
1202 buffer_tracker,
1203 stats,
1204 }
1205 }
1206}
1207
/// Identifier of a sync, allocated from the increasing counter `UploaderData::next_sync_id`.
///
/// The derived ordering follows the wrapped number, so syncs stored in a `BTreeMap`
/// keyed by `SyncId` are visited in the order in which they were started.
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct SyncId(usize);
1210
1211struct UnsyncVectorIndexData {
1212 sealed_epoch_data: BTreeMap<HummockEpoch, Option<VectorIndexAdd>>,
1213 curr_epoch: u64,
1214 is_dropped: bool,
1215}
1216
1217#[derive(Default)]
1218struct UploaderData {
1219 unsync_data: UnsyncData,
1220 unsync_vector_index_data: HashMap<TableId, UnsyncVectorIndexData>,
1221
1222 syncing_data: BTreeMap<SyncId, SyncingData>,
1223
1224 task_manager: TaskManager,
1225 next_sync_id: usize,
1226}
1227
1228impl UploaderData {
1229 fn abort(self, err: impl Fn() -> HummockError) {
1230 self.task_manager.abort_all_tasks();
1231 for syncing_data in self.syncing_data.into_values() {
1232 send_sync_result(syncing_data.sync_result_sender, Err(err()));
1233 }
1234 }
1235
1236 fn clear_tables(&mut self, table_ids: HashSet<TableId>) {
1237 if table_ids.is_empty() {
1238 return;
1239 }
1240 self.unsync_data
1241 .clear_tables(&table_ids, &mut self.task_manager);
1242 self.syncing_data.retain(|sync_id, syncing_data| {
1243 if syncing_data
1244 .sync_table_epochs
1245 .iter()
1246 .any(|(_, sync_table_ids)| !sync_table_ids.is_disjoint(&table_ids))
1247 {
1248 assert!(
1249 syncing_data
1250 .sync_table_epochs
1251 .iter()
1252 .all(|(_, sync_table_ids)| sync_table_ids.is_subset(&table_ids))
1253 );
1254 for task_id in &syncing_data.remaining_uploading_tasks {
1255 match self
1256 .task_manager
1257 .abort_task(*task_id)
1258 .expect("should exist")
1259 {
1260 UploadingTaskStatus::Spilling(spill_table_ids) => {
1261 assert!(spill_table_ids.is_subset(&table_ids));
1262 }
1263 UploadingTaskStatus::Sync(task_sync_id) => {
1264 assert_eq!(sync_id, &task_sync_id);
1265 }
1266 }
1267 }
1268 false
1269 } else {
1270 true
1271 }
1272 });
1273
1274 self.check_upload_task_consistency();
1275 }
1276
1277 fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
1278 self.unsync_data
1279 .spilled_data
1280 .values()
1281 .map(|(s, _)| s)
1282 .chain(self.syncing_data.values().flat_map(|s| s.uploaded.iter()))
1283 .filter_map(|s| {
1284 s.sstable_infos()
1285 .iter()
1286 .chain(s.old_value_sstable_infos())
1287 .map(|s| s.sst_info.object_id)
1288 .min()
1289 })
1290 .min()
1291 .map(|object_id| object_id.as_raw())
1292 }
1293}
1294
1295struct ErrState {
1296 failed_sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
1297 reason: String,
1298}
1299
1300enum UploaderState {
1301 Working(UploaderData),
1302 Err(ErrState),
1303}
1304
1305pub struct HummockUploader {
1318 state: UploaderState,
1319
1320 context: UploaderContext,
1321}
1322
1323impl HummockUploader {
1324 pub(super) fn new(
1325 state_store_metrics: Arc<HummockStateStoreMetrics>,
1326 pinned_version: PinnedVersion,
1327 spawn_upload_task: SpawnUploadTask,
1328 buffer_tracker: BufferTracker,
1329 ) -> Self {
1330 Self {
1331 state: UploaderState::Working(UploaderData::default()),
1332 context: UploaderContext::new(
1333 pinned_version,
1334 spawn_upload_task,
1335 buffer_tracker,
1336 state_store_metrics,
1337 ),
1338 }
1339 }
1340
1341 pub(super) fn buffer_tracker(&self) -> &BufferTracker {
1342 &self.context.buffer_tracker
1343 }
1344
1345 pub(super) fn hummock_version(&self) -> &PinnedVersion {
1346 &self.context.pinned_version
1347 }
1348
1349 pub(super) fn add_imms(
1350 &mut self,
1351 instance_id: LocalInstanceId,
1352 imms: Vec<(ImmutableMemtable, MemoryTracker)>,
1353 ) {
1354 let UploaderState::Working(data) = &mut self.state else {
1355 return;
1356 };
1357 for (imm, tracker) in imms {
1358 let imm = UploaderImm::new(imm, &self.context, tracker);
1359 data.unsync_data.add_imm(instance_id, imm);
1360 }
1361 }
1362
1363 pub(super) fn init_instance(
1364 &mut self,
1365 instance_id: LocalInstanceId,
1366 table_id: TableId,
1367 init_epoch: HummockEpoch,
1368 ) {
1369 let UploaderState::Working(data) = &mut self.state else {
1370 return;
1371 };
1372 data.unsync_data
1373 .init_instance(table_id, instance_id, init_epoch);
1374 }
1375
1376 pub(super) fn local_seal_epoch(
1377 &mut self,
1378 instance_id: LocalInstanceId,
1379 next_epoch: HummockEpoch,
1380 opts: SealCurrentEpochOptions,
1381 ) {
1382 let UploaderState::Working(data) = &mut self.state else {
1383 return;
1384 };
1385 data.unsync_data
1386 .local_seal_epoch(instance_id, next_epoch, opts);
1387 }
1388
1389 pub(super) fn register_vector_writer(&mut self, table_id: TableId, init_epoch: HummockEpoch) {
1390 let UploaderState::Working(data) = &mut self.state else {
1391 return;
1392 };
1393 assert!(
1394 data.unsync_vector_index_data
1395 .try_insert(
1396 table_id,
1397 UnsyncVectorIndexData {
1398 sealed_epoch_data: Default::default(),
1399 curr_epoch: init_epoch,
1400 is_dropped: false,
1401 }
1402 )
1403 .is_ok(),
1404 "duplicate vector writer on {}",
1405 table_id
1406 )
1407 }
1408
1409 pub(super) fn vector_writer_seal_epoch(
1410 &mut self,
1411 table_id: TableId,
1412 next_epoch: HummockEpoch,
1413 add: Option<VectorIndexAdd>,
1414 ) {
1415 let UploaderState::Working(data) = &mut self.state else {
1416 return;
1417 };
1418 let data = data
1419 .unsync_vector_index_data
1420 .get_mut(&table_id)
1421 .expect("should exist");
1422 assert!(!data.is_dropped);
1423 assert!(
1424 data.curr_epoch < next_epoch,
1425 "next epoch {} should be greater than current epoch {}",
1426 next_epoch,
1427 data.curr_epoch
1428 );
1429 data.sealed_epoch_data
1430 .try_insert(data.curr_epoch, add)
1431 .expect("non-duplicate");
1432 data.curr_epoch = next_epoch;
1433 }
1434
1435 pub(super) fn drop_vector_writer(&mut self, table_id: TableId) {
1436 let UploaderState::Working(data) = &mut self.state else {
1437 return;
1438 };
1439 let hash_map::Entry::Occupied(mut entry) = data.unsync_vector_index_data.entry(table_id)
1440 else {
1441 panic!("vector writer {} should exist", table_id);
1442 };
1443 let data = entry.get_mut();
1444 data.is_dropped = true;
1445 if data.sealed_epoch_data.is_empty() {
1446 entry.remove();
1447 }
1448 }
1449
1450 pub(super) fn start_epoch(&mut self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
1451 let UploaderState::Working(data) = &mut self.state else {
1452 return;
1453 };
1454 debug!(epoch, ?table_ids, "start epoch");
1455 for table_id in &table_ids {
1456 let table_data = data
1457 .unsync_data
1458 .table_data
1459 .entry(*table_id)
1460 .or_insert_with(|| {
1461 TableUnsyncData::new(
1462 *table_id,
1463 self.context.pinned_version.table_committed_epoch(*table_id),
1464 )
1465 });
1466 table_data.new_epoch(epoch);
1467 }
1468 if let Some(unsync_epoch_id) = get_unsync_epoch_id(epoch, &table_ids) {
1469 assert!(
1470 data.unsync_data
1471 .unsync_epochs
1472 .insert(unsync_epoch_id, table_ids)
1473 .is_none()
1474 );
1475 }
1476 }
1477
1478 pub(super) fn start_sync_epoch(
1479 &mut self,
1480 sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
1481 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
1482 ) {
1483 let data = match &mut self.state {
1484 UploaderState::Working(data) => data,
1485 UploaderState::Err(ErrState {
1486 failed_sync_table_epochs,
1487 reason,
1488 }) => {
1489 let result = Err(HummockError::other(format!(
1490 "previous sync epoch {:?} failed due to [{}]",
1491 failed_sync_table_epochs, reason
1492 )));
1493 send_sync_result(sync_result_sender, result);
1494 return;
1495 }
1496 };
1497 debug!(?sync_table_epochs, "start sync epoch");
1498
1499 data.sync(&self.context, sync_result_sender, sync_table_epochs);
1500
1501 data.may_notify_sync_task(&self.context);
1502
1503 self.context
1504 .stats
1505 .uploader_syncing_epoch_count
1506 .set(data.syncing_data.len() as _);
1507 }
1508
1509 pub(crate) fn update_pinned_version(&mut self, pinned_version: PinnedVersion) {
1510 if let UploaderState::Working(data) = &mut self.state {
1511 for (table_id, info) in pinned_version.state_table_info.info() {
1513 if let Some(table_data) = data.unsync_data.table_data.get_mut(table_id) {
1514 table_data.ack_committed(info.committed_epoch);
1515 }
1516 }
1517 }
1518 self.context.pinned_version = pinned_version;
1519 }
1520
1521 pub(crate) fn may_flush(&mut self, spiller_latency: &Histogram) -> bool {
1522 let UploaderState::Working(data) = &mut self.state else {
1523 return false;
1524 };
1525 if self.context.buffer_tracker.need_flush() {
1526 let timer = spiller_latency.start_timer();
1527 let mut spiller = Spiller::new(&mut data.unsync_data);
1528 let mut curr_batch_flush_size = 0;
1529 while self
1531 .context
1532 .buffer_tracker
1533 .need_more_flush(curr_batch_flush_size)
1534 && let Some((epoch, payload, spilled_table_ids)) = spiller.next_spilled_payload()
1535 {
1536 assert!(!payload.is_empty());
1537 {
1538 let (task_id, task_size, spilled_table_ids) =
1539 data.task_manager
1540 .spill(&self.context, spilled_table_ids, payload);
1541 for table_id in spilled_table_ids {
1542 spiller
1543 .unsync_data()
1544 .table_data
1545 .get_mut(table_id)
1546 .expect("should exist")
1547 .spill_tasks
1548 .entry(epoch)
1549 .or_default()
1550 .push_front(task_id);
1551 }
1552 curr_batch_flush_size += task_size;
1553 }
1554 }
1555 timer.observe_duration();
1556 data.check_upload_task_consistency();
1557 curr_batch_flush_size > 0
1558 } else {
1559 false
1560 }
1561 }
1562
1563 pub(crate) fn clear(&mut self, table_ids: Option<HashSet<TableId>>) {
1564 if let Some(table_ids) = table_ids {
1565 if let UploaderState::Working(data) = &mut self.state {
1566 data.clear_tables(table_ids);
1567 }
1568 } else {
1569 if let UploaderState::Working(data) = replace(
1570 &mut self.state,
1571 UploaderState::Working(UploaderData::default()),
1572 ) {
1573 data.abort(|| HummockError::other("uploader is reset"));
1574 }
1575
1576 self.context.stats.uploader_syncing_epoch_count.set(0);
1577 }
1578 }
1579
1580 pub(crate) fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
1581 let UploaderState::Working(data) = &mut self.state else {
1582 return;
1583 };
1584 data.unsync_data.may_destroy_instance(instance_id);
1585 }
1586
1587 pub(crate) fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
1588 if let UploaderState::Working(ref u) = self.state {
1589 u.min_uncommitted_object_id()
1590 } else {
1591 None
1592 }
1593 }
1594}
1595
1596impl UploaderData {
1597 fn may_notify_sync_task(&mut self, context: &UploaderContext) {
1598 while let Some((_, syncing_data)) = self.syncing_data.first_key_value()
1599 && syncing_data.remaining_uploading_tasks.is_empty()
1600 {
1601 let (_, syncing_data) = self.syncing_data.pop_first().expect("non-empty");
1602 let SyncingData {
1603 sync_table_epochs,
1604 remaining_uploading_tasks: _,
1605 uploaded,
1606 table_watermarks,
1607 vector_index_adds,
1608 sync_result_sender,
1609 } = syncing_data;
1610 context
1611 .stats
1612 .uploader_syncing_epoch_count
1613 .set(self.syncing_data.len() as _);
1614
1615 for (sync_epoch, table_ids) in sync_table_epochs {
1616 for table_id in table_ids {
1617 if let Some(table_data) = self.unsync_data.table_data.get_mut(&table_id) {
1618 table_data.ack_synced(sync_epoch);
1619 if table_data.is_empty() {
1620 self.unsync_data.table_data.remove(&table_id);
1621 }
1622 }
1623 }
1624 }
1625
1626 send_sync_result(
1627 sync_result_sender,
1628 Ok(SyncedData {
1629 uploaded_ssts: uploaded,
1630 table_watermarks,
1631 vector_index_adds,
1632 }),
1633 )
1634 }
1635 }
1636
1637 fn check_upload_task_consistency(&self) {
1638 #[cfg(debug_assertions)]
1639 {
1640 let mut spill_task_table_id_from_data: HashMap<_, HashSet<_>> = HashMap::new();
1641 for table_data in self.unsync_data.table_data.values() {
1642 for task_id in table_data
1643 .spill_tasks
1644 .iter()
1645 .flat_map(|(_, tasks)| tasks.iter())
1646 {
1647 assert!(
1648 spill_task_table_id_from_data
1649 .entry(*task_id)
1650 .or_default()
1651 .insert(table_data.table_id)
1652 );
1653 }
1654 }
1655 let syncing_task_id_from_data: HashMap<_, HashSet<_>> = self
1656 .syncing_data
1657 .iter()
1658 .filter_map(|(sync_id, data)| {
1659 if data.remaining_uploading_tasks.is_empty() {
1660 None
1661 } else {
1662 Some((*sync_id, data.remaining_uploading_tasks.clone()))
1663 }
1664 })
1665 .collect();
1666
1667 let mut spill_task_table_id_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
1668 for (task_id, (_, table_ids)) in &self.unsync_data.spilled_data {
1669 spill_task_table_id_from_manager.insert(*task_id, table_ids.clone());
1670 }
1671 let mut syncing_task_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
1672 for (task_id, status) in self.task_manager.tasks() {
1673 match status {
1674 UploadingTaskStatus::Spilling(table_ids) => {
1675 assert!(
1676 spill_task_table_id_from_manager
1677 .insert(task_id, table_ids.clone())
1678 .is_none()
1679 );
1680 }
1681 UploadingTaskStatus::Sync(sync_id) => {
1682 assert!(
1683 syncing_task_from_manager
1684 .entry(*sync_id)
1685 .or_default()
1686 .insert(task_id)
1687 );
1688 }
1689 }
1690 }
1691 assert_eq!(
1692 spill_task_table_id_from_data,
1693 spill_task_table_id_from_manager
1694 );
1695 assert_eq!(syncing_task_id_from_data, syncing_task_from_manager);
1696 }
1697 }
1698}
1699
1700impl HummockUploader {
1701 pub(super) fn next_uploaded_ssts(
1702 &mut self,
1703 ) -> impl Future<Output = Vec<Arc<StagingSstableInfo>>> + '_ {
1704 poll_fn(|cx| {
1705 let UploaderState::Working(data) = &mut self.state else {
1706 return Poll::Pending;
1707 };
1708
1709 const MAX_BATCH_SIZE: usize = 16;
1710
1711 let mut ssts = None;
1712 while let Poll::Ready(Some((task_id, status, result))) =
1713 data.task_manager.poll_task_result(cx)
1714 {
1715 match result {
1716 Ok(sst) => {
1717 data.unsync_data.ack_flushed(&sst);
1718 match status {
1719 UploadingTaskStatus::Sync(sync_id) => {
1720 let syncing_data =
1721 data.syncing_data.get_mut(&sync_id).expect("should exist");
1722 syncing_data.uploaded.push_front(sst.clone());
1723 assert!(syncing_data.remaining_uploading_tasks.remove(&task_id));
1724 data.may_notify_sync_task(&self.context);
1725 }
1726 UploadingTaskStatus::Spilling(table_ids) => {
1727 data.unsync_data
1728 .spilled_data
1729 .insert(task_id, (sst.clone(), table_ids));
1730 }
1731 }
1732 data.check_upload_task_consistency();
1733 let ssts = ssts.get_or_insert_with(|| Vec::with_capacity(MAX_BATCH_SIZE));
1734 ssts.push(sst);
1735 if ssts.len() >= MAX_BATCH_SIZE {
1736 break;
1737 }
1738 }
1739 Err((sync_id, e)) => {
1740 let syncing_data =
1741 data.syncing_data.remove(&sync_id).expect("should exist");
1742 let failed_epochs = syncing_data.sync_table_epochs.clone();
1743 let data = must_match!(replace(
1744 &mut self.state,
1745 UploaderState::Err(ErrState {
1746 failed_sync_table_epochs: syncing_data.sync_table_epochs,
1747 reason: e.as_report().to_string(),
1748 }),
1749 ), UploaderState::Working(data) => data);
1750
1751 let _ = syncing_data
1752 .sync_result_sender
1753 .send(Err(HummockError::other(format!(
1754 "failed to sync: {:?}",
1755 e.as_report()
1756 ))));
1757
1758 data.abort(|| {
1759 HummockError::other(format!(
1760 "previous epoch {:?} failed to sync",
1761 failed_epochs
1762 ))
1763 });
1764 return Poll::Pending;
1765 }
1766 }
1767 }
1768 if let Some(ssts) = ssts {
1769 Poll::Ready(ssts)
1770 } else {
1771 Poll::Pending
1772 }
1773 })
1774 }
1775}
1776
1777#[cfg(test)]
1778pub(crate) mod tests {
1779 use std::collections::{HashMap, HashSet};
1780 use std::future::{Future, poll_fn};
1781 use std::pin::pin;
1782 use std::sync::Arc;
1783 use std::sync::atomic::AtomicUsize;
1784 use std::sync::atomic::Ordering::SeqCst;
1785 use std::task::Poll;
1786
1787 use futures::FutureExt;
1788 use risingwave_common::catalog::TableId;
1789 use risingwave_common::metrics::LabelGuardedHistogram;
1790 use risingwave_common::util::epoch::EpochExt;
1791 use risingwave_hummock_sdk::HummockEpoch;
1792 use risingwave_hummock_sdk::vector_index::{
1793 FlatIndexAdd, VectorFileInfo, VectorIndexAdd, VectorStoreInfoDelta,
1794 };
1795 use tokio::sync::oneshot;
1796
1797 use super::test_utils::*;
1798 use crate::hummock::event_handler::uploader::{
1799 HummockUploader, SyncedData, UploadingTask, get_payload_imm_ids,
1800 };
1801 use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID};
1802 use crate::hummock::utils::MemoryTracker;
1803 use crate::hummock::{HummockError, HummockResult};
1804 use crate::mem_table::ImmutableMemtable;
1805 use crate::opts::StorageOpts;
1806
1807 impl HummockUploader {
1808 pub(super) fn add_imm(
1809 &mut self,
1810 instance_id: LocalInstanceId,
1811 imm: (ImmutableMemtable, MemoryTracker),
1812 ) {
1813 self.add_imms(instance_id, vec![imm]);
1814 }
1815
1816 pub(super) fn start_single_epoch_sync(
1817 &mut self,
1818 epoch: HummockEpoch,
1819 sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
1820 table_ids: HashSet<TableId>,
1821 ) {
1822 self.start_sync_epoch(sync_result_sender, vec![(epoch, table_ids)]);
1823 }
1824
1825 pub(super) fn may_flush_for_test(&mut self) -> bool {
1826 self.may_flush(&LabelGuardedHistogram::test_histogram::<1>())
1827 }
1828 }
1829
1830 #[tokio::test]
1831 pub async fn test_uploading_task_future() {
1832 let uploader_context = test_uploader_context(dummy_success_upload_future);
1833
1834 let imm = gen_imm(INITIAL_EPOCH);
1835 let imm_size = imm.size();
1836 let imm_ids = get_imm_ids(vec![&imm]);
1837 let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
1838 assert_eq!(imm_size, task.task_info.task_size);
1839 assert_eq!(imm_ids, task.task_info.imm_ids);
1840 assert_eq!(vec![INITIAL_EPOCH], task.task_info.epochs);
1841 let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
1842 assert_eq!(
1843 output.sstable_infos(),
1844 &dummy_success_upload_output().new_value_ssts
1845 );
1846 assert_eq!(imm_size, output.imm_size());
1847 assert_eq!(&imm_ids, output.imm_ids());
1848 assert_eq!(&vec![INITIAL_EPOCH], output.epochs());
1849
1850 let uploader_context = test_uploader_context(dummy_fail_upload_future);
1851 let imm = gen_imm(INITIAL_EPOCH);
1852 let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
1853 let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
1854 }
1855
1856 #[tokio::test]
1857 pub async fn test_uploading_task_poll_result() {
1858 let uploader_context = test_uploader_context(dummy_success_upload_future);
1859 let mut task = UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH)], &uploader_context);
1860 let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
1861 assert_eq!(
1862 output.sstable_infos(),
1863 &dummy_success_upload_output().new_value_ssts
1864 );
1865
1866 let uploader_context = test_uploader_context(dummy_fail_upload_future);
1867 let mut task = UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH)], &uploader_context);
1868 let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
1869 }
1870
1871 #[tokio::test]
1872 async fn test_uploading_task_poll_ok_with_retry() {
1873 let run_count = Arc::new(AtomicUsize::new(0));
1874 let fail_num = 10;
1875 let run_count_clone = run_count.clone();
1876 let uploader_context = test_uploader_context(move |payload, info| {
1877 let run_count = run_count.clone();
1878 async move {
1879 let ret = if run_count.load(SeqCst) < fail_num {
1881 Err(HummockError::other("fail"))
1882 } else {
1883 dummy_success_upload_future(payload, info).await
1884 };
1885 run_count.fetch_add(1, SeqCst);
1886 ret
1887 }
1888 });
1889 let mut task = UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH)], &uploader_context);
1890 let output = poll_fn(|cx| task.poll_ok_with_retry(cx)).await;
1891 assert_eq!(fail_num + 1, run_count_clone.load(SeqCst));
1892 assert_eq!(
1893 output.sstable_infos(),
1894 &dummy_success_upload_output().new_value_ssts
1895 );
1896 }
1897
1898 #[tokio::test]
1899 async fn test_uploader_basic() {
1900 let mut uploader = test_uploader(dummy_success_upload_future);
1901 let mut sst_collector = UploadedSstCollector::default();
1902 let epoch1 = INITIAL_EPOCH.next_epoch();
1903 const VECTOR_INDEX_TABLE_ID: TableId = TableId::new(234);
1904 uploader.start_epoch(
1905 epoch1,
1906 HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
1907 );
1908 let (imm, tracker) = gen_imm_with_unlimit(epoch1);
1909 uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
1910 uploader.add_imm(TEST_LOCAL_INSTANCE_ID, (imm.clone(), tracker));
1911 uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
1912
1913 uploader.register_vector_writer(VECTOR_INDEX_TABLE_ID, epoch1);
1914 let vector_info_file = VectorFileInfo {
1915 object_id: 1.into(),
1916 vector_count: 1,
1917 file_size: 0,
1918 start_vector_id: 0,
1919 meta_offset: 20,
1920 };
1921 let vector_index_add = VectorIndexAdd::Flat(FlatIndexAdd {
1922 vector_store_info_delta: VectorStoreInfoDelta {
1923 next_vector_id: 1,
1924 added_vector_files: vec![vector_info_file.clone()],
1925 },
1926 });
1927 uploader.vector_writer_seal_epoch(
1928 VECTOR_INDEX_TABLE_ID,
1929 epoch1.next_epoch(),
1930 Some(vector_index_add.clone()),
1931 );
1932
1933 let (sync_tx, sync_rx) = oneshot::channel();
1934 uploader.start_single_epoch_sync(
1935 epoch1,
1936 sync_tx,
1937 HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
1938 );
1939 assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
1940 assert_eq!(1, uploader.data().syncing_data.len());
1941 let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
1942 assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
1943 assert!(syncing_data.uploaded.is_empty());
1944 assert!(!syncing_data.remaining_uploading_tasks.is_empty());
1945
1946 let staging_sst = sst_collector.next(&mut uploader).await;
1947 assert_eq!(&vec![epoch1], staging_sst.epochs());
1948 assert_eq!(
1949 &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
1950 staging_sst.imm_ids()
1951 );
1952 assert_eq!(
1953 &dummy_success_upload_output().new_value_ssts,
1954 staging_sst.sstable_infos()
1955 );
1956
1957 match sync_rx.await {
1958 Ok(Ok(data)) => {
1959 let SyncedData {
1960 uploaded_ssts,
1961 table_watermarks,
1962 vector_index_adds,
1963 } = data;
1964 assert_eq!(1, uploaded_ssts.len());
1965 let staging_sst = &uploaded_ssts[0];
1966 assert_eq!(&vec![epoch1], staging_sst.epochs());
1967 assert_eq!(
1968 &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
1969 staging_sst.imm_ids()
1970 );
1971 assert_eq!(
1972 &dummy_success_upload_output().new_value_ssts,
1973 staging_sst.sstable_infos()
1974 );
1975 assert!(table_watermarks.is_empty());
1976 assert_eq!(vector_index_adds.len(), 1);
1977 let (table_id, vector_index_adds) = vector_index_adds.into_iter().next().unwrap();
1978 assert_eq!(table_id, VECTOR_INDEX_TABLE_ID);
1979 assert_eq!(vector_index_adds.len(), 1);
1980 let synced_vector_index_add = vector_index_adds[0].clone();
1981 assert_eq!(vector_index_add, synced_vector_index_add);
1982 }
1983 _ => unreachable!(),
1984 };
1985 assert_eq!(epoch1, uploader.test_max_synced_epoch());
1986
1987 let new_pinned_version = uploader
1988 .context
1989 .pinned_version
1990 .new_pin_version(test_hummock_version(epoch1))
1991 .unwrap();
1992 uploader.update_pinned_version(new_pinned_version);
1993 assert_eq!(
1994 epoch1,
1995 uploader
1996 .context
1997 .pinned_version
1998 .table_committed_epoch(TEST_TABLE_ID)
1999 .unwrap()
2000 );
2001 }
2002
2003 #[tokio::test]
2004 async fn test_uploader_destroy_instance_before_sync() {
2005 let mut uploader = test_uploader(dummy_success_upload_future);
2006 let mut sst_collector = UploadedSstCollector::default();
2007 let epoch1 = INITIAL_EPOCH.next_epoch();
2008 uploader.start_epochs_for_test([epoch1]);
2009 let (imm, tracker) = gen_imm_with_unlimit(epoch1);
2010 uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
2011 uploader.add_imm(TEST_LOCAL_INSTANCE_ID, (imm.clone(), tracker));
2012 uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
2013 uploader.may_destroy_instance(TEST_LOCAL_INSTANCE_ID);
2014
2015 let (sync_tx, sync_rx) = oneshot::channel();
2016 uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
2017 assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
2018 assert_eq!(1, uploader.data().syncing_data.len());
2019 let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
2020 assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
2021 assert!(syncing_data.uploaded.is_empty());
2022 assert!(!syncing_data.remaining_uploading_tasks.is_empty());
2023
2024 let staging_sst = sst_collector.next(&mut uploader).await;
2025 assert_eq!(&vec![epoch1], staging_sst.epochs());
2026 assert_eq!(
2027 &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
2028 staging_sst.imm_ids()
2029 );
2030 assert_eq!(
2031 &dummy_success_upload_output().new_value_ssts,
2032 staging_sst.sstable_infos()
2033 );
2034
2035 match sync_rx.await {
2036 Ok(Ok(data)) => {
2037 let SyncedData {
2038 uploaded_ssts,
2039 table_watermarks,
2040 ..
2041 } = data;
2042 assert_eq!(1, uploaded_ssts.len());
2043 let staging_sst = &uploaded_ssts[0];
2044 assert_eq!(&vec![epoch1], staging_sst.epochs());
2045 assert_eq!(
2046 &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
2047 staging_sst.imm_ids()
2048 );
2049 assert_eq!(
2050 &dummy_success_upload_output().new_value_ssts,
2051 staging_sst.sstable_infos()
2052 );
2053 assert!(table_watermarks.is_empty());
2054 }
2055 _ => unreachable!(),
2056 };
2057 assert!(
2058 !uploader
2059 .data()
2060 .unsync_data
2061 .table_data
2062 .contains_key(&TEST_TABLE_ID)
2063 );
2064 }
2065
2066 #[tokio::test]
2067 async fn test_empty_uploader_sync() {
2068 let mut uploader = test_uploader(dummy_success_upload_future);
2069 let epoch1 = INITIAL_EPOCH.next_epoch();
2070
2071 let (sync_tx, sync_rx) = oneshot::channel();
2072 uploader.start_epochs_for_test([epoch1]);
2073 uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
2074 uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
2075 uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
2076 assert_eq!(epoch1, uploader.test_max_syncing_epoch());
2077
2078 assert_uploader_pending(&mut uploader).await;
2079
2080 match sync_rx.await {
2081 Ok(Ok(data)) => {
2082 assert!(data.uploaded_ssts.is_empty());
2083 }
2084 _ => unreachable!(),
2085 };
2086 assert_eq!(epoch1, uploader.test_max_synced_epoch());
2087 let new_pinned_version = uploader
2088 .context
2089 .pinned_version
2090 .new_pin_version(test_hummock_version(epoch1))
2091 .unwrap();
2092 uploader.update_pinned_version(new_pinned_version);
2093 assert!(uploader.data().syncing_data.is_empty());
2094 assert_eq!(
2095 epoch1,
2096 uploader
2097 .context
2098 .pinned_version
2099 .table_committed_epoch(TEST_TABLE_ID)
2100 .unwrap()
2101 );
2102 }
2103
2104 #[tokio::test]
2105 async fn test_uploader_empty_epoch() {
2106 let mut uploader = test_uploader(dummy_success_upload_future);
2107 let epoch1 = INITIAL_EPOCH.next_epoch();
2108 let epoch2 = epoch1.next_epoch();
2109 uploader.start_epochs_for_test([epoch1, epoch2]);
2110 let (imm, tracker) = gen_imm_with_unlimit(epoch2);
2111 uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
2113 uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
2114 uploader.add_imm(TEST_LOCAL_INSTANCE_ID, (imm, tracker));
2115
2116 let (sync_tx, sync_rx) = oneshot::channel();
2117 uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
2118 assert_eq!(epoch1, uploader.test_max_syncing_epoch());
2119
2120 assert_uploader_pending(&mut uploader).await;
2121
2122 match sync_rx.await {
2123 Ok(Ok(data)) => {
2124 assert!(data.uploaded_ssts.is_empty());
2125 }
2126 _ => unreachable!(),
2127 };
2128 assert_eq!(epoch1, uploader.test_max_synced_epoch());
2129 let new_pinned_version = uploader
2130 .context
2131 .pinned_version
2132 .new_pin_version(test_hummock_version(epoch1))
2133 .unwrap();
2134 uploader.update_pinned_version(new_pinned_version);
2135 assert!(uploader.data().syncing_data.is_empty());
2136 assert_eq!(
2137 epoch1,
2138 uploader
2139 .context
2140 .pinned_version
2141 .table_committed_epoch(TEST_TABLE_ID)
2142 .unwrap()
2143 );
2144 }
2145
2146 #[tokio::test]
2147 async fn test_uploader_poll_empty() {
2148 let mut uploader = test_uploader(dummy_success_upload_future);
2149 let fut = uploader.next_uploaded_ssts();
2150 let mut fut = pin!(fut);
2151 assert!(poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx).is_pending())).await);
2152 }
2153
2154 #[tokio::test]
2155 async fn test_uploader_empty_advance_mce() {
2156 let mut uploader = test_uploader(dummy_success_upload_future);
2157 let mut sst_collector = UploadedSstCollector::default();
2158 let initial_pinned_version = uploader.context.pinned_version.clone();
2159 let epoch1 = INITIAL_EPOCH.next_epoch();
2160 let epoch2 = epoch1.next_epoch();
2161 let epoch3 = epoch2.next_epoch();
2162 let epoch4 = epoch3.next_epoch();
2163 let epoch5 = epoch4.next_epoch();
2164 let epoch6 = epoch5.next_epoch();
2165 let version1 = initial_pinned_version
2166 .new_pin_version(test_hummock_version(epoch1))
2167 .unwrap();
2168 let version2 = initial_pinned_version
2169 .new_pin_version(test_hummock_version(epoch2))
2170 .unwrap();
2171 let version3 = initial_pinned_version
2172 .new_pin_version(test_hummock_version(epoch3))
2173 .unwrap();
2174 let version4 = initial_pinned_version
2175 .new_pin_version(test_hummock_version(epoch4))
2176 .unwrap();
2177 let version5 = initial_pinned_version
2178 .new_pin_version(test_hummock_version(epoch5))
2179 .unwrap();
2180
2181 uploader.start_epochs_for_test([epoch6]);
2182 uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch6);
2183
2184 uploader.update_pinned_version(version1);
2185 assert_eq!(epoch1, uploader.test_max_synced_epoch());
2186 assert_eq!(epoch1, uploader.test_max_syncing_epoch());
2187
2188 let (imm, tracker) = gen_imm_with_unlimit(epoch6);
2189 uploader.add_imm(TEST_LOCAL_INSTANCE_ID, (imm.clone(), tracker));
2190 uploader.update_pinned_version(version2);
2191 assert_eq!(epoch2, uploader.test_max_synced_epoch());
2192 assert_eq!(epoch2, uploader.test_max_syncing_epoch());
2193
2194 uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch6);
2195 uploader.update_pinned_version(version3);
2196 assert_eq!(epoch3, uploader.test_max_synced_epoch());
2197 assert_eq!(epoch3, uploader.test_max_syncing_epoch());
2198
2199 let (sync_tx, sync_rx) = oneshot::channel();
2200 uploader.start_single_epoch_sync(epoch6, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
2201 assert_eq!(epoch6, uploader.test_max_syncing_epoch());
2202 uploader.update_pinned_version(version4);
2203 assert_eq!(epoch4, uploader.test_max_synced_epoch());
2204 assert_eq!(epoch6, uploader.test_max_syncing_epoch());
2205
2206 let sst = sst_collector.next(&mut uploader).await;
2207 assert_eq!(&get_imm_ids([&imm]), sst.imm_ids());
2208
2209 match sync_rx.await {
2210 Ok(Ok(data)) => {
2211 assert!(data.table_watermarks.is_empty());
2212 assert_eq!(1, data.uploaded_ssts.len());
2213 assert_eq!(&get_imm_ids([&imm]), data.uploaded_ssts[0].imm_ids());
2214 }
2215 _ => unreachable!(),
2216 }
2217
2218 uploader.update_pinned_version(version5);
2219 assert_eq!(epoch6, uploader.test_max_synced_epoch());
2220 assert_eq!(epoch6, uploader.test_max_syncing_epoch());
2221 }
2222
2223 #[tokio::test]
2224 async fn test_uploader_finish_in_order() {
2225 let config = StorageOpts {
2226 shared_buffer_capacity_mb: 1024 * 1024,
2227 shared_buffer_flush_ratio: 0.0,
2228 ..Default::default()
2229 };
2230 let (buffer_tracker, mut uploader, new_task_notifier) =
2231 prepare_uploader_order_test(&config, false);
2232 let mut sst_collector = UploadedSstCollector::default();
2233
2234 let epoch1 = INITIAL_EPOCH.next_epoch();
2235 let epoch2 = epoch1.next_epoch();
2236 let epoch3 = epoch2.next_epoch();
2237 let epoch4 = epoch3.next_epoch();
2238 uploader.start_epochs_for_test([epoch1, epoch2, epoch3, epoch4]);
2239 let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2240 let memory_limiter = memory_limiter.as_ref();
2241
2242 let instance_id1 = 1;
2243 let instance_id2 = 2;
2244
2245 uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2246 uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2247
2248 let (imm2, tracker2) = gen_imm_with_limiter(epoch2, memory_limiter).await;
2250 uploader.add_imm(instance_id2, (imm2.clone(), tracker2));
2251 let (imm1_1, tracker1_1) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2252 uploader.add_imm(instance_id1, (imm1_1.clone(), tracker1_1));
2253 let (imm1_2, tracker1_2) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2254 uploader.add_imm(instance_id1, (imm1_2.clone(), tracker1_2));
2255
2256 let epoch1_spill_payload12 =
2258 HashMap::from_iter([(instance_id1, vec![imm1_2.clone(), imm1_1.clone()])]);
2259 let epoch2_spill_payload = HashMap::from_iter([(instance_id2, vec![imm2.clone()])]);
2260 let (await_start1, finish_tx1) =
2261 new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload12));
2262 let (await_start2, finish_tx2) =
2263 new_task_notifier(get_payload_imm_ids(&epoch2_spill_payload));
2264 uploader.may_flush_for_test();
2265 await_start1.await;
2266 await_start2.await;
2267
2268 assert_uploader_pending(&mut uploader).await;
2269
2270 finish_tx2.send(()).unwrap();
2271 assert_uploader_pending(&mut uploader).await;
2272
2273 finish_tx1.send(()).unwrap();
2274 let sst = sst_collector.next(&mut uploader).await;
2275 assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload12), sst.imm_ids());
2276 assert_eq!(&vec![epoch1], sst.epochs());
2277
2278 let sst = sst_collector.next(&mut uploader).await;
2279 assert_eq!(&get_payload_imm_ids(&epoch2_spill_payload), sst.imm_ids());
2280 assert_eq!(&vec![epoch2], sst.epochs());
2281
2282 let (imm1_3, tracker1_3) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2283 uploader.add_imm(instance_id1, (imm1_3.clone(), tracker1_3));
2284 let epoch1_spill_payload3 = HashMap::from_iter([(instance_id1, vec![imm1_3.clone()])]);
2285 let (await_start1_3, finish_tx1_3) =
2286 new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload3));
2287 uploader.may_flush_for_test();
2288 await_start1_3.await;
2289 let (imm1_4, tracker1_4) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2290 uploader.add_imm(instance_id1, (imm1_4.clone(), tracker1_4));
2291 let epoch1_sync_payload = HashMap::from_iter([(instance_id1, vec![imm1_4.clone()])]);
2292 let (await_start1_4, finish_tx1_4) =
2293 new_task_notifier(get_payload_imm_ids(&epoch1_sync_payload));
2294 uploader.local_seal_epoch_for_test(instance_id1, epoch1);
2295 let (sync_tx1, sync_rx1) = oneshot::channel();
2296 uploader.start_single_epoch_sync(epoch1, sync_tx1, HashSet::from_iter([TEST_TABLE_ID]));
2297 await_start1_4.await;
2298
2299 uploader.local_seal_epoch_for_test(instance_id1, epoch2);
2300 uploader.local_seal_epoch_for_test(instance_id2, epoch2);
2301
2302 let (imm3_1, tracker3_1) = gen_imm_with_limiter(epoch3, memory_limiter).await;
2308 let epoch3_spill_payload1 = HashMap::from_iter([(instance_id1, vec![imm3_1.clone()])]);
2309 uploader.add_imm(instance_id1, (imm3_1.clone(), tracker3_1));
2310 let (await_start3_1, finish_tx3_1) =
2311 new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload1));
2312 uploader.may_flush_for_test();
2313 await_start3_1.await;
2314 let (imm3_2, tracker3_2) = gen_imm_with_limiter(epoch3, memory_limiter).await;
2315 let epoch3_spill_payload2 = HashMap::from_iter([(instance_id2, vec![imm3_2.clone()])]);
2316 uploader.add_imm(instance_id2, (imm3_2.clone(), tracker3_2));
2317 let (await_start3_2, finish_tx3_2) =
2318 new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload2));
2319 uploader.may_flush_for_test();
2320 await_start3_2.await;
2321 let (imm3_3, tracker3_3) = gen_imm_with_limiter(epoch3, memory_limiter).await;
2322 uploader.add_imm(instance_id1, (imm3_3.clone(), tracker3_3));
2323
2324 uploader.local_seal_epoch_for_test(instance_id1, epoch3);
2330 let (imm4, tracker4) = gen_imm_with_limiter(epoch4, memory_limiter).await;
2331 uploader.add_imm(instance_id1, (imm4.clone(), tracker4));
2332 assert_uploader_pending(&mut uploader).await;
2333
2334 finish_tx3_1.send(()).unwrap();
2341 assert_uploader_pending(&mut uploader).await;
2342 finish_tx1_4.send(()).unwrap();
2343 assert_uploader_pending(&mut uploader).await;
2344 finish_tx1_3.send(()).unwrap();
2345
2346 let sst = sst_collector.next(&mut uploader).await;
2347 assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload3), sst.imm_ids());
2348
2349 let sst = sst_collector.next(&mut uploader).await;
2350 assert_eq!(&get_payload_imm_ids(&epoch1_sync_payload), sst.imm_ids());
2351
2352 match sync_rx1.await {
2353 Ok(Ok(data)) => {
2354 assert_eq!(3, data.uploaded_ssts.len());
2355 assert_eq!(
2356 &get_payload_imm_ids(&epoch1_sync_payload),
2357 data.uploaded_ssts[0].imm_ids()
2358 );
2359 assert_eq!(
2360 &get_payload_imm_ids(&epoch1_spill_payload3),
2361 data.uploaded_ssts[1].imm_ids()
2362 );
2363 assert_eq!(
2364 &get_payload_imm_ids(&epoch1_spill_payload12),
2365 data.uploaded_ssts[2].imm_ids()
2366 );
2367 }
2368 _ => {
2369 unreachable!()
2370 }
2371 }
2372
2373 let (sync_tx2, sync_rx2) = oneshot::channel();
2381 uploader.start_single_epoch_sync(epoch2, sync_tx2, HashSet::from_iter([TEST_TABLE_ID]));
2382 uploader.local_seal_epoch_for_test(instance_id2, epoch3);
2383 let sst = sst_collector.next(&mut uploader).await;
2384 assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids());
2385
2386 match sync_rx2.await {
2387 Ok(Ok(data)) => {
2388 assert_eq!(data.uploaded_ssts.len(), 1);
2389 assert_eq!(
2390 &get_payload_imm_ids(&epoch2_spill_payload),
2391 data.uploaded_ssts[0].imm_ids()
2392 );
2393 }
2394 _ => {
2395 unreachable!("should be sync finish");
2396 }
2397 }
2398 assert_eq!(epoch2, uploader.test_max_synced_epoch());
2399
2400 uploader.local_seal_epoch_for_test(instance_id1, epoch4);
2408 uploader.local_seal_epoch_for_test(instance_id2, epoch4);
2409 let epoch4_sync_payload = HashMap::from_iter([(instance_id1, vec![imm4, imm3_3])]);
2410 let (await_start4_with_3_3, finish_tx4_with_3_3) =
2411 new_task_notifier(get_payload_imm_ids(&epoch4_sync_payload));
2412 let (sync_tx4, mut sync_rx4) = oneshot::channel();
2413 uploader.start_single_epoch_sync(epoch4, sync_tx4, HashSet::from_iter([TEST_TABLE_ID]));
2414 await_start4_with_3_3.await;
2415
2416 assert_uploader_pending(&mut uploader).await;
2424
2425 finish_tx3_2.send(()).unwrap();
2426 let sst = sst_collector.next(&mut uploader).await;
2427 assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload2), sst.imm_ids());
2428
2429 finish_tx4_with_3_3.send(()).unwrap();
2430 assert!(poll_fn(|cx| Poll::Ready(sync_rx4.poll_unpin(cx).is_pending())).await);
2431
2432 let sst = sst_collector.next(&mut uploader).await;
2433 assert_eq!(&get_payload_imm_ids(&epoch4_sync_payload), sst.imm_ids());
2434
2435 match sync_rx4.await {
2436 Ok(Ok(data)) => {
2437 assert_eq!(3, data.uploaded_ssts.len());
2438 assert_eq!(
2439 &get_payload_imm_ids(&epoch4_sync_payload),
2440 data.uploaded_ssts[0].imm_ids()
2441 );
2442 assert_eq!(
2443 &get_payload_imm_ids(&epoch3_spill_payload2),
2444 data.uploaded_ssts[1].imm_ids()
2445 );
2446 assert_eq!(
2447 &get_payload_imm_ids(&epoch3_spill_payload1),
2448 data.uploaded_ssts[2].imm_ids(),
2449 )
2450 }
2451 _ => {
2452 unreachable!("should be sync finish");
2453 }
2454 }
2455 assert_eq!(epoch4, uploader.test_max_synced_epoch());
2456
2457 }
2465
2466 #[tokio::test]
2467 async fn test_uploader_frequently_flush() {
2468 let config = StorageOpts {
2469 shared_buffer_capacity_mb: 10,
2470 shared_buffer_flush_ratio: 0.12,
2471 shared_buffer_min_batch_flush_size_mb: 1,
2473 ..Default::default()
2474 };
2475 let (buffer_tracker, mut uploader, _new_task_notifier) =
2476 prepare_uploader_order_test(&config, true);
2477
2478 let epoch1 = INITIAL_EPOCH.next_epoch();
2479 let epoch2 = epoch1.next_epoch();
2480 uploader.start_epochs_for_test([epoch1, epoch2]);
2481 let instance_id1 = 1;
2482 let instance_id2 = 2;
2483 let flush_threshold = buffer_tracker.flush_threshold();
2484 let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2485 let memory_limiter = memory_limiter.as_ref();
2486
2487 uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2488 uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2489
2490 let mut total_memory = 0;
2492 while total_memory < flush_threshold {
2493 let (imm, tracker) = gen_imm_with_limiter(epoch2, memory_limiter).await;
2494 total_memory += imm.size();
2495 if total_memory > flush_threshold {
2496 break;
2497 }
2498 uploader.add_imm(instance_id2, (imm, tracker));
2499 }
2500 let (imm, tracker) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2501 uploader.add_imm(instance_id1, (imm, tracker));
2502 assert!(uploader.may_flush_for_test());
2503
2504 for _ in 0..10 {
2505 let (imm, tracker) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2506 uploader.add_imm(instance_id1, (imm, tracker));
2507 assert!(!uploader.may_flush_for_test());
2508 }
2509 }
2510}