mod spiller;
mod task_manager;
pub(crate) mod test_utils;

use std::cmp::Ordering;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque, hash_map};
use std::fmt::{Debug, Display, Formatter};
use std::future::{Future, poll_fn};
use std::mem::{replace, swap, take};
use std::sync::Arc;
use std::task::{Context, Poll, ready};

use futures::FutureExt;
use itertools::Itertools;
use more_asserts::assert_gt;
use prometheus::{Histogram, HistogramTimer, IntGauge};
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::catalog::TableId;
use risingwave_common::metrics::UintGauge;
use risingwave_common::must_match;
use risingwave_hummock_sdk::table_watermark::{
    TableWatermarks, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::vector_index::VectorIndexAdd;
use risingwave_hummock_sdk::{HummockEpoch, HummockRawObjectId, LocalSstableInfo};
use task_manager::{TaskManager, UploadingTaskStatus};
use thiserror_ext::AsReport;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};

use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, send_sync_result};
use crate::hummock::event_handler::uploader::spiller::Spiller;
use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm;
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchId;
use crate::hummock::store::version::StagingSstableInfo;
use crate::hummock::utils::MemoryTracker;
use crate::hummock::{HummockError, HummockResult, ImmutableMemtable};
use crate::mem_table::ImmId;
use crate::monitor::HummockStateStoreMetrics;
use crate::store::SealCurrentEpochOptions;

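/// Takes all entries with key no greater than `epoch` out of `data`, leaving the remaining
/// entries in place.
///
/// A minimal sketch of the behavior, with hypothetical epoch values:
/// ```ignore
/// let mut data = BTreeMap::from([(1u64, "a"), (2, "b"), (3, "c")]);
/// let before = take_before_epoch(&mut data, 2);
/// assert_eq!(before.into_keys().collect::<Vec<_>>(), vec![1, 2]);
/// assert_eq!(data.into_keys().collect::<Vec<_>>(), vec![3]);
/// ```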
fn take_before_epoch<T>(
    data: &mut BTreeMap<HummockEpoch, T>,
    epoch: HummockEpoch,
) -> BTreeMap<HummockEpoch, T> {
    let mut before_epoch_data = data.split_off(&(epoch + 1));
    swap(&mut before_epoch_data, data);
    before_epoch_data
}

type UploadTaskInput = HashMap<LocalInstanceId, Vec<UploaderImm>>;
pub type UploadTaskPayload = HashMap<LocalInstanceId, Vec<ImmutableMemtable>>;

#[derive(Debug)]
pub struct UploadTaskOutput {
    pub new_value_ssts: Vec<LocalSstableInfo>,
    pub old_value_ssts: Vec<LocalSstableInfo>,
    pub wait_poll_timer: Option<HistogramTimer>,
}
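/// Callback that spawns the actual upload of a payload and returns a join handle to the
/// spawned task. A minimal sketch of such a closure, assuming a hypothetical
/// `upload_to_object_store` helper that builds and uploads the SSTs:
/// ```ignore
/// let spawn_upload_task: SpawnUploadTask = Arc::new(|payload, task_info| {
///     tokio::spawn(async move {
///         let new_value_ssts = upload_to_object_store(payload, &task_info).await?;
///         Ok(UploadTaskOutput {
///             new_value_ssts,
///             old_value_ssts: vec![],
///             wait_poll_timer: None,
///         })
///     })
/// });
/// ```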
pub type SpawnUploadTask = Arc<
    dyn Fn(UploadTaskPayload, UploadTaskInfo) -> JoinHandle<HummockResult<UploadTaskOutput>>
        + Send
        + Sync
        + 'static,
>;

#[derive(Clone)]
pub struct UploadTaskInfo {
    pub task_size: usize,
    pub epochs: Vec<HummockEpoch>,
    pub table_ids: HashSet<TableId>,
    pub imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
}

impl Display for UploadTaskInfo {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UploadTaskInfo")
            .field("task_size", &self.task_size)
            .field("epochs", &self.epochs)
            .field("len(imm_ids)", &self.imm_ids.len())
            .finish()
    }
}

impl Debug for UploadTaskInfo {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UploadTaskInfo")
            .field("task_size", &self.task_size)
            .field("epochs", &self.epochs)
            .field("imm_ids", &self.imm_ids)
            .finish()
    }
}

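/// Wrapper around an [`ImmutableMemtable`] that maintains the uploader metrics (total and
/// per-table imm size and count) and holds the imm's [`MemoryTracker`] quota: the gauges are
/// bumped on creation and reverted on drop.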
mod uploader_imm {
    use std::fmt::Formatter;
    use std::ops::Deref;

    use risingwave_common::metrics::{LabelGuardedIntGauge, UintGauge};

    #[cfg(test)]
    use crate::hummock::MemoryLimiter;
    use crate::hummock::event_handler::uploader::UploaderContext;
    use crate::hummock::utils::MemoryTracker;
    use crate::mem_table::ImmutableMemtable;

    pub(super) struct UploaderImm {
        inner: ImmutableMemtable,
        size_guard: UintGauge,
        per_table_size_guard: LabelGuardedIntGauge,
        per_table_count_guard: LabelGuardedIntGauge,
        _tracker: MemoryTracker,
    }

    impl UploaderImm {
        pub(super) fn new(
            imm: ImmutableMemtable,
            context: &UploaderContext,
            tracker: MemoryTracker,
        ) -> Self {
            let size = imm.size();
            let table_id_str = imm.table_id().to_string();
            let size_guard = context.stats.uploader_imm_size.clone();
            let per_table_size_guard = context
                .stats
                .uploader_per_table_imm_size
                .with_guarded_label_values(&[&table_id_str]);
            let per_table_count_guard = context
                .stats
                .uploader_per_table_imm_count
                .with_guarded_label_values(&[&table_id_str]);
            size_guard.add(size as _);
            per_table_size_guard.add(size as _);
            per_table_count_guard.add(1 as _);
            Self {
                inner: imm,
                size_guard,
                per_table_size_guard,
                per_table_count_guard,
                _tracker: tracker,
            }
        }

        #[cfg(test)]
        pub(super) fn for_test(imm: ImmutableMemtable) -> Self {
            Self {
                inner: imm,
                size_guard: UintGauge::new("test", "test").unwrap(),
                per_table_size_guard: LabelGuardedIntGauge::test_int_gauge::<1>(),
                per_table_count_guard: LabelGuardedIntGauge::test_int_gauge::<1>(),
                _tracker: MemoryLimiter::unlimit().try_require_memory(1).unwrap(),
            }
        }
    }

    impl std::fmt::Debug for UploaderImm {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            self.inner.fmt(f)
        }
    }

    impl Deref for UploaderImm {
        type Target = ImmutableMemtable;

        fn deref(&self) -> &Self::Target {
            &self.inner
        }
    }

    impl Drop for UploaderImm {
        fn drop(&mut self) {
            self.size_guard.sub(self.inner.size() as _);
            self.per_table_size_guard.sub(self.inner.size() as _);
            self.per_table_count_guard.dec();
        }
    }
}

#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct UploadingTaskId(usize);

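/// A spawned task that is uploading a set of imms as SSTs. The original input is kept so
/// that the upload can be respawned on failure, and the global task size and count gauges
/// are reverted when the task is dropped.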
struct UploadingTask {
    task_id: UploadingTaskId,
    input: UploadTaskInput,
    join_handle: JoinHandle<HummockResult<UploadTaskOutput>>,
    task_info: UploadTaskInfo,
    spawn_upload_task: SpawnUploadTask,
    task_size_guard: UintGauge,
    task_count_guard: IntGauge,
}

impl Drop for UploadingTask {
    fn drop(&mut self) {
        self.task_size_guard.sub(self.task_info.task_size as u64);
        self.task_count_guard.dec();
    }
}

impl Debug for UploadingTask {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UploadingTask")
            .field("input", &self.input)
            .field("task_info", &self.task_info)
            .finish()
    }
}

fn get_payload_imm_ids(
    payload: &UploadTaskPayload,
) -> HashMap<LocalInstanceId, Vec<SharedBufferBatchId>> {
    payload
        .iter()
        .map(|(instance_id, imms)| {
            (
                *instance_id,
                imms.iter().map(|imm| imm.batch_id()).collect_vec(),
            )
        })
        .collect()
}

impl UploadingTask {
    // Tasks exceeding this size (50 MiB) are logged at `info` level; smaller ones at `debug`.
    const LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE: usize = 50 * (1 << 20);

    fn input_to_payload(input: &UploadTaskInput) -> UploadTaskPayload {
        input
            .iter()
            .map(|(instance_id, imms)| {
                (
                    *instance_id,
                    imms.iter().map(|imm| (**imm).clone()).collect(),
                )
            })
            .collect()
    }

    fn new(task_id: UploadingTaskId, input: UploadTaskInput, context: &UploaderContext) -> Self {
        assert!(!input.is_empty());
        let mut epochs = input
            .iter()
            .flat_map(|(_, imms)| imms.iter().flat_map(|imm| imm.epochs().iter().cloned()))
            .sorted()
            .dedup()
            .collect_vec();

        // reverse so that newer epochs come first
        epochs.reverse();
        let payload = Self::input_to_payload(&input);
        let imm_ids = get_payload_imm_ids(&payload);
        let task_size = input
            .values()
            .map(|imms| imms.iter().map(|imm| imm.size()).sum::<usize>())
            .sum();
        let task_info = UploadTaskInfo {
            task_size,
            epochs,
            table_ids: payload
                .values()
                .flat_map(|imms| imms.iter().map(|imm| imm.table_id()))
                .collect(),
            imm_ids,
        };
        context
            .buffer_tracker
            .global_upload_task_size()
            .add(task_size as u64);
        if task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
            info!("start upload task: {:?}", task_info);
        } else {
            debug!("start upload task: {:?}", task_info);
        }
        let join_handle = (context.spawn_upload_task)(payload, task_info.clone());
        context.stats.uploader_uploading_task_count.inc();
        Self {
            task_id,
            input,
            join_handle,
            task_info,
            spawn_upload_task: context.spawn_upload_task.clone(),
            task_size_guard: context.buffer_tracker.global_upload_task_size().clone(),
            task_count_guard: context.stats.uploader_uploading_task_count.clone(),
        }
    }

    /// Poll the result of the uploading task.
    fn poll_result(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<HummockResult<Arc<StagingSstableInfo>>> {
        Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) {
            Ok(task_result) => task_result
                .inspect(|_| {
                    if self.task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
                        info!(task_info = ?self.task_info, "upload task finish");
                    } else {
                        debug!(task_info = ?self.task_info, "upload task finish");
                    }
                })
                .inspect_err(|e| error!(task_info = ?self.task_info, err = ?e.as_report(), "upload task failed"))
                .map(|output| {
                    Arc::new(StagingSstableInfo::new(
                        output.new_value_ssts,
                        output.old_value_ssts,
                        self.task_info.epochs.clone(),
                        self.task_info.imm_ids.clone(),
                        self.task_info.task_size,
                    ))
                }),

            Err(err) => Err(HummockError::other(format!(
                "fail to join upload join handle: {}",
                err.as_report()
            ))),
        })
    }

    /// Poll the uploading task until it succeeds. On failure the task is respawned with the
    /// same input and polled again.
    fn poll_ok_with_retry(&mut self, cx: &mut Context<'_>) -> Poll<Arc<StagingSstableInfo>> {
        loop {
            let result = ready!(self.poll_result(cx));
            match result {
                Ok(sstables) => return Poll::Ready(sstables),
                Err(e) => {
                    error!(
                        error = %e.as_report(),
                        task_info = ?self.task_info,
                        "a flush task failed, start retry",
                    );
                    self.join_handle = (self.spawn_upload_task)(
                        Self::input_to_payload(&self.input),
                        self.task_info.clone(),
                    );
                }
            }
        }
    }

    pub fn get_task_info(&self) -> &UploadTaskInfo {
        &self.task_info
    }
}

impl TableUnsyncData {
    fn add_table_watermarks(
        &mut self,
        epoch: HummockEpoch,
        table_watermarks: Vec<VnodeWatermark>,
        direction: WatermarkDirection,
        watermark_type: WatermarkSerdeType,
    ) {
        if table_watermarks.is_empty() {
            return;
        }
        let vnode_count = table_watermarks[0].vnode_count();
        for watermark in &table_watermarks {
            assert_eq!(vnode_count, watermark.vnode_count());
        }

        fn apply_new_vnodes(
            vnode_bitmap: &mut BitmapBuilder,
            vnode_watermarks: &Vec<VnodeWatermark>,
        ) {
            for vnode_watermark in vnode_watermarks {
                for vnode in vnode_watermark.vnode_bitmap().iter_ones() {
                    assert!(
                        !vnode_bitmap.is_set(vnode),
                        "vnode {} written with multiple table watermarks",
                        vnode
                    );
                    vnode_bitmap.set(vnode, true);
                }
            }
        }
        match &mut self.table_watermarks {
            Some((prev_direction, prev_watermarks, prev_watermark_type)) => {
                assert_eq!(
                    *prev_direction, direction,
                    "table id {} new watermark direction does not match the previous one",
                    self.table_id
                );
                assert_eq!(
                    *prev_watermark_type, watermark_type,
                    "table id {} new watermark_type does not match the previous one",
                    self.table_id
                );
                match prev_watermarks.entry(epoch) {
                    Entry::Occupied(mut entry) => {
                        let (prev_watermarks, vnode_bitmap) = entry.get_mut();
                        apply_new_vnodes(vnode_bitmap, &table_watermarks);
                        prev_watermarks.extend(table_watermarks);
                    }
                    Entry::Vacant(entry) => {
                        let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                        apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                        entry.insert((table_watermarks, vnode_bitmap));
                    }
                }
            }
            None => {
                let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                self.table_watermarks = Some((
                    direction,
                    BTreeMap::from_iter([(epoch, (table_watermarks, vnode_bitmap))]),
                    watermark_type,
                ));
            }
        }
    }
}

impl UploaderData {
    fn add_table_watermarks(
        all_table_watermarks: &mut HashMap<TableId, TableWatermarks>,
        table_id: TableId,
        direction: WatermarkDirection,
        watermarks: impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)>,
        watermark_type: WatermarkSerdeType,
    ) {
        let mut table_watermarks: Option<TableWatermarks> = None;
        for (epoch, watermarks) in watermarks {
            match &mut table_watermarks {
                Some(prev_watermarks) => {
                    assert_eq!(prev_watermarks.direction, direction);
                    assert_eq!(prev_watermarks.watermark_type, watermark_type);
                    prev_watermarks.add_new_epoch_watermarks(
                        epoch,
                        Arc::from(watermarks),
                        direction,
                        watermark_type,
                    );
                }
                None => {
                    table_watermarks = Some(TableWatermarks::single_epoch(
                        epoch,
                        watermarks,
                        direction,
                        watermark_type,
                    ));
                }
            }
        }
        if let Some(table_watermarks) = table_watermarks {
            assert!(
                all_table_watermarks
                    .insert(table_id, table_watermarks)
                    .is_none()
            );
        }
    }
}

struct LocalInstanceEpochData {
    epoch: HummockEpoch,
    // newer data comes first
    imms: VecDeque<UploaderImm>,
    has_spilled: bool,
}

impl LocalInstanceEpochData {
    fn new(epoch: HummockEpoch) -> Self {
        Self {
            epoch,
            imms: VecDeque::new(),
            has_spilled: false,
        }
    }

    fn epoch(&self) -> HummockEpoch {
        self.epoch
    }

    fn add_imm(&mut self, imm: UploaderImm) {
        assert_eq!(imm.max_epoch(), imm.min_epoch());
        assert_eq!(self.epoch, imm.min_epoch());
        if let Some(prev_imm) = self.imms.front() {
            assert_gt!(imm.batch_id(), prev_imm.batch_id());
        }
        self.imms.push_front(imm);
    }

    fn is_empty(&self) -> bool {
        self.imms.is_empty()
    }
}

struct LocalInstanceUnsyncData {
    table_id: TableId,
    instance_id: LocalInstanceId,
    // `None` means the instance has stopped advancing epochs
    current_epoch_data: Option<LocalInstanceEpochData>,
    // newer data comes first
    sealed_data: VecDeque<LocalInstanceEpochData>,
    // newer data comes first
    flushing_imms: VecDeque<SharedBufferBatchId>,
    is_destroyed: bool,
    is_stopped: bool,
}

impl LocalInstanceUnsyncData {
    fn new(table_id: TableId, instance_id: LocalInstanceId, init_epoch: HummockEpoch) -> Self {
        Self {
            table_id,
            instance_id,
            current_epoch_data: Some(LocalInstanceEpochData::new(init_epoch)),
            sealed_data: VecDeque::new(),
            flushing_imms: Default::default(),
            is_destroyed: false,
            is_stopped: false,
        }
    }

    fn add_imm(&mut self, imm: UploaderImm) {
        assert!(!self.is_destroyed);
        assert!(!self.is_stopped);
        assert_eq!(self.table_id, imm.table_id);
        self.current_epoch_data
            .as_mut()
            .expect("should be Some when adding new imm")
            .add_imm(imm);
    }

    fn local_seal_epoch(&mut self, next_epoch: HummockEpoch, stopped: bool) -> HummockEpoch {
        let data = self
            .current_epoch_data
            .as_mut()
            .expect("should be Some when sealing a new epoch");
        let current_epoch = data.epoch;
        debug!(
            instance_id = self.instance_id,
            next_epoch, current_epoch, stopped, "local seal epoch"
        );
        assert_gt!(next_epoch, current_epoch);
        assert!(!self.is_destroyed);
        assert!(!self.is_stopped);
        self.is_stopped = stopped;
        let epoch_data = replace(data, LocalInstanceEpochData::new(next_epoch));
        if !epoch_data.is_empty() {
            self.sealed_data.push_front(epoch_data);
        }
        current_epoch
    }

    // the imm ids must be passed from old to new, matching the flush order
    fn ack_flushed(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            assert_eq!(self.flushing_imms.pop_back().expect("should exist"), imm_id);
        }
    }

    fn spill(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        let imms = if let Some(oldest_sealed_epoch) = self.sealed_data.back() {
            match oldest_sealed_epoch.epoch.cmp(&epoch) {
                Ordering::Less => {
                    unreachable!(
                        "should not spill at this epoch because there \
                        is unspilled data in a previous epoch: prev epoch {}, spill epoch {}",
                        oldest_sealed_epoch.epoch, epoch
                    );
                }
                Ordering::Equal => {
                    let epoch_data = self.sealed_data.pop_back().unwrap();
                    assert_eq!(epoch, epoch_data.epoch);
                    epoch_data.imms
                }
                Ordering::Greater => VecDeque::new(),
            }
        } else {
            let Some(current_epoch_data) = &mut self.current_epoch_data else {
                return Vec::new();
            };
            match current_epoch_data.epoch.cmp(&epoch) {
                Ordering::Less => {
                    assert!(
                        current_epoch_data.imms.is_empty(),
                        "should not spill at this epoch because there \
                        is unspilled data in the current epoch: current epoch {}, spill epoch {}",
                        current_epoch_data.epoch,
                        epoch
                    );
                    VecDeque::new()
                }
                Ordering::Equal => {
                    if !current_epoch_data.imms.is_empty() {
                        current_epoch_data.has_spilled = true;
                        take(&mut current_epoch_data.imms)
                    } else {
                        VecDeque::new()
                    }
                }
                Ordering::Greater => VecDeque::new(),
            }
        };
        self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
        imms.into_iter().collect()
    }

    fn add_flushing_imm(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            if let Some(prev_imm_id) = self.flushing_imms.front() {
                assert_gt!(imm_id, *prev_imm_id);
            }
            self.flushing_imms.push_front(imm_id);
        }
    }

    /// Returns all imms of epochs no greater than `epoch`, with newer imms at the front.
    fn sync(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        let mut ret = Vec::new();
        while let Some(epoch_data) = self.sealed_data.back()
            && epoch_data.epoch() <= epoch
        {
            let imms = self.sealed_data.pop_back().expect("checked exist").imms;
            self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
            ret.extend(imms.into_iter().rev());
        }
        ret.reverse();
        if let Some(latest_epoch_data) = &self.current_epoch_data
            && latest_epoch_data.epoch <= epoch
        {
            assert!(self.sealed_data.is_empty());
            assert!(latest_epoch_data.is_empty());
            assert!(!latest_epoch_data.has_spilled);
            if cfg!(debug_assertions) {
                panic!(
                    "sync epoch exceeds latest epoch, and the current instance should have been archived, table_id = {}, latest_epoch_data = {}, epoch = {}",
                    self.table_id,
                    latest_epoch_data.epoch(),
                    epoch
                );
            }
            warn!(
                instance_id = self.instance_id,
                table_id = %self.table_id,
                "sync epoch exceeds latest epoch, and the current instance should have been archived"
            );
            self.current_epoch_data = None;
        }
        ret
    }

    fn assert_after_epoch(&self, epoch: HummockEpoch) {
        if let Some(oldest_sealed_data) = self.sealed_data.back() {
            assert!(!oldest_sealed_data.imms.is_empty());
            assert_gt!(oldest_sealed_data.epoch, epoch);
        } else if let Some(current_data) = &self.current_epoch_data
            && current_data.epoch <= epoch
        {
            assert!(current_data.imms.is_empty() && !current_data.has_spilled);
        }
    }

    fn is_finished(&self) -> bool {
        self.is_destroyed && self.sealed_data.is_empty()
    }
}

struct TableUnsyncData {
    table_id: TableId,
    instance_data: HashMap<LocalInstanceId, LocalInstanceUnsyncData>,
    #[expect(clippy::type_complexity)]
    table_watermarks: Option<(
        WatermarkDirection,
        BTreeMap<HummockEpoch, (Vec<VnodeWatermark>, BitmapBuilder)>,
        WatermarkSerdeType,
    )>,
    spill_tasks: BTreeMap<HummockEpoch, VecDeque<UploadingTaskId>>,
    unsync_epochs: BTreeMap<HummockEpoch, ()>,
    // newer epoch at the front
    syncing_epochs: VecDeque<HummockEpoch>,
    max_synced_epoch: Option<HummockEpoch>,
}

impl TableUnsyncData {
    fn new(table_id: TableId, committed_epoch: Option<HummockEpoch>) -> Self {
        Self {
            table_id,
            instance_data: Default::default(),
            table_watermarks: None,
            spill_tasks: Default::default(),
            unsync_epochs: Default::default(),
            syncing_epochs: Default::default(),
            max_synced_epoch: committed_epoch,
        }
    }

    fn new_epoch(&mut self, epoch: HummockEpoch) {
        debug!(table_id = ?self.table_id, epoch, "table new epoch");
        if let Some(latest_epoch) = self.max_epoch() {
            assert_gt!(epoch, latest_epoch);
        }
        self.unsync_epochs.insert(epoch, ());
    }

    #[expect(clippy::type_complexity)]
    fn sync(
        &mut self,
        epoch: HummockEpoch,
    ) -> (
        impl Iterator<Item = (LocalInstanceId, Vec<UploaderImm>)> + '_,
        Option<(
            WatermarkDirection,
            impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)> + use<>,
            WatermarkSerdeType,
        )>,
        impl Iterator<Item = UploadingTaskId> + use<>,
        BTreeMap<HummockEpoch, ()>,
    ) {
        if let Some(prev_epoch) = self.max_sync_epoch() {
            assert_gt!(epoch, prev_epoch)
        }
        let epochs = take_before_epoch(&mut self.unsync_epochs, epoch);
        assert_eq!(
            *epochs.last_key_value().expect("non-empty").0,
            epoch,
            "{epochs:?} {epoch} {:?}",
            self.table_id
        );
        self.syncing_epochs.push_front(epoch);
        (
            self.instance_data
                .iter_mut()
                .map(move |(instance_id, data)| (*instance_id, data.sync(epoch))),
            self.table_watermarks
                .as_mut()
                .map(|(direction, watermarks, watermark_type)| {
                    let watermarks = take_before_epoch(watermarks, epoch)
                        .into_iter()
                        .map(|(epoch, (watermarks, _))| (epoch, watermarks));
                    (*direction, watermarks, *watermark_type)
                }),
            take_before_epoch(&mut self.spill_tasks, epoch)
                .into_values()
                .flat_map(|tasks| tasks.into_iter()),
            epochs,
        )
    }

    fn ack_synced(&mut self, sync_epoch: HummockEpoch) {
        let min_sync_epoch = self.syncing_epochs.pop_back().expect("should exist");
        assert_eq!(sync_epoch, min_sync_epoch);
        self.max_synced_epoch = Some(sync_epoch);
    }

    fn ack_committed(&mut self, committed_epoch: HummockEpoch) {
        let synced_epoch_advanced = {
            if let Some(max_synced_epoch) = self.max_synced_epoch
                && max_synced_epoch >= committed_epoch
            {
                false
            } else {
                true
            }
        };
        if synced_epoch_advanced {
            self.max_synced_epoch = Some(committed_epoch);
            if let Some(min_syncing_epoch) = self.syncing_epochs.back() {
                assert_gt!(*min_syncing_epoch, committed_epoch);
            }
            self.assert_after_epoch(committed_epoch);
        }
    }

    fn assert_after_epoch(&self, epoch: HummockEpoch) {
        self.instance_data
            .values()
            .for_each(|instance_data| instance_data.assert_after_epoch(epoch));
        if let Some((_, watermarks, _)) = &self.table_watermarks
            && let Some((oldest_epoch, _)) = watermarks.first_key_value()
        {
            assert_gt!(*oldest_epoch, epoch);
        }
    }

    fn max_sync_epoch(&self) -> Option<HummockEpoch> {
        self.syncing_epochs
            .front()
            .cloned()
            .or(self.max_synced_epoch)
    }

    fn max_epoch(&self) -> Option<HummockEpoch> {
        self.unsync_epochs
            .last_key_value()
            .map(|(epoch, _)| *epoch)
            .or_else(|| self.max_sync_epoch())
    }

    fn is_empty(&self) -> bool {
        self.instance_data.is_empty()
            && self.syncing_epochs.is_empty()
            && self.unsync_epochs.is_empty()
    }
}

#[derive(Eq, Hash, PartialEq, Copy, Clone)]
struct UnsyncEpochId(HummockEpoch, TableId);

impl UnsyncEpochId {
    fn epoch(&self) -> HummockEpoch {
        self.0
    }
}

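/// Derives the canonical [`UnsyncEpochId`] for an epoch started on a group of tables: the
/// minimum table id of the group serves as the representative, so every table of the group
/// maps to the same id. Returns `None` when the table set is empty.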
fn get_unsync_epoch_id(epoch: HummockEpoch, table_ids: &HashSet<TableId>) -> Option<UnsyncEpochId> {
    table_ids
        .iter()
        .min()
        .map(|table_id| UnsyncEpochId(epoch, *table_id))
}

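/// Data that has not been synced yet.
///
/// `table_data` holds the unsync data of each table, including the imms of each of its
/// local instances; `instance_table_id` maps each local instance back to its table.
/// `unsync_epochs` records, for each started epoch, the set of tables the epoch was started
/// on, and `spilled_data` keeps the output of spill tasks that finished before their epochs
/// were synced.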
#[derive(Default)]
struct UnsyncData {
    table_data: HashMap<TableId, TableUnsyncData>,
    instance_table_id: HashMap<LocalInstanceId, TableId>,
    unsync_epochs: HashMap<UnsyncEpochId, HashSet<TableId>>,
    spilled_data: HashMap<UploadingTaskId, (Arc<StagingSstableInfo>, HashSet<TableId>)>,
}

impl UnsyncData {
    fn init_instance(
        &mut self,
        table_id: TableId,
        instance_id: LocalInstanceId,
        init_epoch: HummockEpoch,
    ) {
        debug!(
            table_id = %table_id,
            instance_id, init_epoch, "init epoch"
        );
        let table_data = self
            .table_data
            .get_mut(&table_id)
            .unwrap_or_else(|| panic!("should exist. {table_id:?}"));
        assert!(
            table_data
                .instance_data
                .insert(
                    instance_id,
                    LocalInstanceUnsyncData::new(table_id, instance_id, init_epoch)
                )
                .is_none()
        );
        assert!(
            self.instance_table_id
                .insert(instance_id, table_id)
                .is_none()
        );
        assert!(table_data.unsync_epochs.contains_key(&init_epoch));
    }

    fn instance_data(
        &mut self,
        instance_id: LocalInstanceId,
    ) -> Option<&mut LocalInstanceUnsyncData> {
        self.instance_table_id
            .get_mut(&instance_id)
            .cloned()
            .map(move |table_id| {
                self.table_data
                    .get_mut(&table_id)
                    .expect("should exist")
                    .instance_data
                    .get_mut(&instance_id)
                    .expect("should exist")
            })
    }

    fn add_imm(&mut self, instance_id: LocalInstanceId, imm: UploaderImm) {
        self.instance_data(instance_id)
            .expect("should exist")
            .add_imm(imm);
    }

    fn local_seal_epoch(
        &mut self,
        instance_id: LocalInstanceId,
        next_epoch: HummockEpoch,
        opts: SealCurrentEpochOptions,
    ) {
        let table_id = self.instance_table_id[&instance_id];
        let table_data = self.table_data.get_mut(&table_id).expect("should exist");
        let instance_data = table_data
            .instance_data
            .get_mut(&instance_id)
            .expect("should exist");
        // If `next_epoch` was never started on this table, the instance is being stopped
        // and no further data will be written on it.
        let stopped = !table_data.unsync_epochs.contains_key(&next_epoch);
        let epoch = instance_data.local_seal_epoch(next_epoch, stopped);
        if let Some((direction, table_watermarks, watermark_type)) = opts.table_watermarks {
            table_data.add_table_watermarks(epoch, table_watermarks, direction, watermark_type);
        }
    }

    fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
        if let Some(table_id) = self.instance_table_id.get(&instance_id) {
            debug!(instance_id, "destroy instance");
            let table_data = self.table_data.get_mut(table_id).expect("should exist");
            let instance_data = table_data
                .instance_data
                .get_mut(&instance_id)
                .expect("should exist");
            assert!(
                !instance_data.is_destroyed,
                "cannot destroy an instance twice"
            );
            instance_data.is_destroyed = true;
        }
    }

    fn clear_tables(&mut self, table_ids: &HashSet<TableId>, task_manager: &mut TaskManager) {
        for table_id in table_ids {
            if let Some(table_unsync_data) = self.table_data.remove(table_id) {
                for task_id in table_unsync_data.spill_tasks.into_values().flatten() {
                    if let Some(task_status) = task_manager.abort_task(task_id) {
                        must_match!(task_status, UploadingTaskStatus::Spilling(spill_table_ids) => {
                            assert!(spill_table_ids.is_subset(table_ids));
                        });
                    }
                    if let Some((_, spill_table_ids)) = self.spilled_data.remove(&task_id) {
                        assert!(spill_table_ids.is_subset(table_ids));
                    }
                }
                assert!(
                    table_unsync_data
                        .instance_data
                        .values()
                        .all(|instance| instance.is_destroyed),
                    "all instances should have been destroyed when dropping the read version"
                );
                for instance_id in table_unsync_data.instance_data.keys() {
                    assert_eq!(
                        *table_id,
                        self.instance_table_id
                            .remove(instance_id)
                            .expect("should exist")
                    );
                }
            }
        }
        debug_assert!(
            self.spilled_data
                .values()
                .all(|(_, spill_table_ids)| spill_table_ids.is_disjoint(table_ids))
        );
        self.unsync_epochs.retain(|_, unsync_epoch_table_ids| {
            if !unsync_epoch_table_ids.is_disjoint(table_ids) {
                assert!(unsync_epoch_table_ids.is_subset(table_ids));
                false
            } else {
                true
            }
        });
        assert!(
            self.instance_table_id
                .values()
                .all(|table_id| !table_ids.contains(table_id))
        );
    }
}

impl UploaderData {
    fn sync(
        &mut self,
        context: &UploaderContext,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) {
        let mut all_table_watermarks = HashMap::new();
        let mut uploading_tasks = HashSet::new();
        let mut spilled_tasks = BTreeSet::new();
        let mut all_table_ids = HashSet::new();
        let mut vector_index_adds = HashMap::new();

        let mut flush_payload = HashMap::new();

        for (epoch, table_ids) in &sync_table_epochs {
            let epoch = *epoch;
            for table_id in table_ids {
                assert!(
                    all_table_ids.insert(*table_id),
                    "duplicate sync table epoch: {:?} {:?}",
                    all_table_ids,
                    sync_table_epochs
                );
            }
            if let Some(UnsyncEpochId(_, min_table_id)) = get_unsync_epoch_id(epoch, table_ids) {
                let min_table_id_data = self
                    .unsync_data
                    .table_data
                    .get_mut(&min_table_id)
                    .expect("should exist");
                // operate on a clone here: each table removes its own epochs inside
                // `table_data.sync` below
                let epochs = take_before_epoch(&mut min_table_id_data.unsync_epochs.clone(), epoch);
                for epoch in epochs.keys() {
                    assert_eq!(
                        &self
                            .unsync_data
                            .unsync_epochs
                            .remove(&UnsyncEpochId(*epoch, min_table_id))
                            .expect("should exist"),
                        table_ids
                    );
                }
                for table_id in table_ids {
                    let table_data = self
                        .unsync_data
                        .table_data
                        .get_mut(table_id)
                        .expect("should exist");
                    let (unflushed_payload, table_watermarks, task_ids, table_unsync_epochs) =
                        table_data.sync(epoch);
                    assert_eq!(table_unsync_epochs, epochs, "invalid table_id {table_id}");
                    for (instance_id, payload) in unflushed_payload {
                        if !payload.is_empty() {
                            flush_payload.insert(instance_id, payload);
                        }
                    }
                    table_data.instance_data.retain(|instance_id, data| {
                        // remove instances that are destroyed and have no remaining data
                        if data.is_finished() {
                            assert_eq!(
                                self.unsync_data.instance_table_id.remove(instance_id),
                                Some(*table_id)
                            );
                            false
                        } else {
                            true
                        }
                    });
                    if let Some((direction, watermarks, watermark_type)) = table_watermarks {
                        Self::add_table_watermarks(
                            &mut all_table_watermarks,
                            *table_id,
                            direction,
                            watermarks,
                            watermark_type,
                        );
                    }
                    for task_id in task_ids {
                        if self.unsync_data.spilled_data.contains_key(&task_id) {
                            spilled_tasks.insert(task_id);
                        } else {
                            uploading_tasks.insert(task_id);
                        }
                    }

                    if let hash_map::Entry::Occupied(mut entry) =
                        self.unsync_vector_index_data.entry(*table_id)
                    {
                        let data = entry.get_mut();
                        let adds = take_before_epoch(&mut data.sealed_epoch_data, epoch)
                            .into_values()
                            .flatten()
                            .collect_vec();
                        if data.is_dropped && data.sealed_epoch_data.is_empty() {
                            entry.remove();
                        }
                        if !adds.is_empty() {
                            vector_index_adds
                                .try_insert(*table_id, adds)
                                .expect("non-duplicate");
                        }
                    }
                }
            }
        }

        let sync_id = {
            let sync_id = self.next_sync_id;
            self.next_sync_id += 1;
            SyncId(sync_id)
        };

        if let Some(extra_flush_task_id) = self.task_manager.sync(
            context,
            sync_id,
            flush_payload,
            uploading_tasks.iter().cloned(),
            &all_table_ids,
        ) {
            uploading_tasks.insert(extra_flush_task_id);
        }

        // iterate from newer to older spill tasks so that `uploaded` has newer data at the front
        let uploaded = spilled_tasks
            .iter()
            .rev()
            .map(|task_id| {
                let (sst, spill_table_ids) = self
                    .unsync_data
                    .spilled_data
                    .remove(task_id)
                    .expect("should exist");
                assert!(
                    spill_table_ids.is_subset(&all_table_ids),
                    "spilled table ids {:?} not a subset of sync table ids {:?}",
                    spill_table_ids,
                    all_table_ids
                );
                sst
            })
            .collect();

        self.syncing_data.insert(
            sync_id,
            SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: uploading_tasks,
                uploaded,
                table_watermarks: all_table_watermarks,
                vector_index_adds,
                sync_result_sender,
            },
        );

        self.check_upload_task_consistency();
    }
}

impl UnsyncData {
    fn ack_flushed(&mut self, sstable_info: &StagingSstableInfo) {
        for (instance_id, imm_ids) in sstable_info.imm_ids() {
            if let Some(instance_data) = self.instance_data(*instance_id) {
                // take `rev` so that imm ids are acked from old to new
                instance_data.ack_flushed(imm_ids.iter().rev().cloned());
            }
        }
    }
}

struct SyncingData {
    sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    remaining_uploading_tasks: HashSet<UploadingTaskId>,
    // newer data at the front
    uploaded: VecDeque<Arc<StagingSstableInfo>>,
    table_watermarks: HashMap<TableId, TableWatermarks>,
    vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
    sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
}

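/// The data of a finished sync, sent back through the `sync_result_sender` of the
/// corresponding [`SyncingData`].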
#[derive(Debug)]
pub struct SyncedData {
    pub uploaded_ssts: VecDeque<Arc<StagingSstableInfo>>,
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
    pub vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
}

struct UploaderContext {
    pinned_version: PinnedVersion,
    spawn_upload_task: SpawnUploadTask,
    buffer_tracker: BufferTracker,

    stats: Arc<HummockStateStoreMetrics>,
}

impl UploaderContext {
    fn new(
        pinned_version: PinnedVersion,
        spawn_upload_task: SpawnUploadTask,
        buffer_tracker: BufferTracker,
        stats: Arc<HummockStateStoreMetrics>,
    ) -> Self {
        UploaderContext {
            pinned_version,
            spawn_upload_task,
            buffer_tracker,
            stats,
        }
    }
}

#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct SyncId(usize);

struct UnsyncVectorIndexData {
    sealed_epoch_data: BTreeMap<HummockEpoch, Option<VectorIndexAdd>>,
    curr_epoch: u64,
    is_dropped: bool,
}

#[derive(Default)]
struct UploaderData {
    unsync_data: UnsyncData,
    unsync_vector_index_data: HashMap<TableId, UnsyncVectorIndexData>,

    syncing_data: BTreeMap<SyncId, SyncingData>,

    task_manager: TaskManager,
    next_sync_id: usize,
}

impl UploaderData {
    fn abort(self, err: impl Fn() -> HummockError) {
        self.task_manager.abort_all_tasks();
        for syncing_data in self.syncing_data.into_values() {
            send_sync_result(syncing_data.sync_result_sender, Err(err()));
        }
    }

    fn clear_tables(&mut self, table_ids: HashSet<TableId>) {
        if table_ids.is_empty() {
            return;
        }
        self.unsync_data
            .clear_tables(&table_ids, &mut self.task_manager);
        self.syncing_data.retain(|sync_id, syncing_data| {
            if syncing_data
                .sync_table_epochs
                .iter()
                .any(|(_, sync_table_ids)| !sync_table_ids.is_disjoint(&table_ids))
            {
                assert!(
                    syncing_data
                        .sync_table_epochs
                        .iter()
                        .all(|(_, sync_table_ids)| sync_table_ids.is_subset(&table_ids))
                );
                for task_id in &syncing_data.remaining_uploading_tasks {
                    match self
                        .task_manager
                        .abort_task(*task_id)
                        .expect("should exist")
                    {
                        UploadingTaskStatus::Spilling(spill_table_ids) => {
                            assert!(spill_table_ids.is_subset(&table_ids));
                        }
                        UploadingTaskStatus::Sync(task_sync_id) => {
                            assert_eq!(sync_id, &task_sync_id);
                        }
                    }
                }
                false
            } else {
                true
            }
        });

        self.check_upload_task_consistency();
    }

    fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        self.unsync_data
            .spilled_data
            .values()
            .map(|(s, _)| s)
            .chain(self.syncing_data.values().flat_map(|s| s.uploaded.iter()))
            .filter_map(|s| {
                s.sstable_infos()
                    .iter()
                    .chain(s.old_value_sstable_infos())
                    .map(|s| s.sst_info.object_id)
                    .min()
            })
            .min()
            .map(|object_id| object_id.as_raw())
    }
}

struct ErrState {
    failed_sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    reason: String,
}

enum UploaderState {
    Working(UploaderData),
    Err(ErrState),
}

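/// An uploader for hummock data.
///
/// Data goes through three sequential stages: unsync (still written and sealed by local
/// instances, kept in [`UnsyncData`]), syncing (covered by an ongoing sync request, kept in
/// [`SyncingData`]), and synced (the sync result has been sent back through the result
/// sender).
///
/// Data is mostly stored in `VecDeque`s, where the order encodes the data sequence: entries
/// at the front are newer.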
pub struct HummockUploader {
    state: UploaderState,

    context: UploaderContext,
}

impl HummockUploader {
    pub(super) fn new(
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        pinned_version: PinnedVersion,
        spawn_upload_task: SpawnUploadTask,
        buffer_tracker: BufferTracker,
    ) -> Self {
        Self {
            state: UploaderState::Working(UploaderData::default()),
            context: UploaderContext::new(
                pinned_version,
                spawn_upload_task,
                buffer_tracker,
                state_store_metrics,
            ),
        }
    }

    pub(super) fn buffer_tracker(&self) -> &BufferTracker {
        &self.context.buffer_tracker
    }

    pub(super) fn hummock_version(&self) -> &PinnedVersion {
        &self.context.pinned_version
    }

    pub(super) fn add_imms(
        &mut self,
        instance_id: LocalInstanceId,
        imms: Vec<(ImmutableMemtable, MemoryTracker)>,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        for (imm, tracker) in imms {
            let imm = UploaderImm::new(imm, &self.context, tracker);
            data.unsync_data.add_imm(instance_id, imm);
        }
    }

    pub(super) fn init_instance(
        &mut self,
        instance_id: LocalInstanceId,
        table_id: TableId,
        init_epoch: HummockEpoch,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data
            .init_instance(table_id, instance_id, init_epoch);
    }

    pub(super) fn local_seal_epoch(
        &mut self,
        instance_id: LocalInstanceId,
        next_epoch: HummockEpoch,
        opts: SealCurrentEpochOptions,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data
            .local_seal_epoch(instance_id, next_epoch, opts);
    }

    pub(super) fn register_vector_writer(&mut self, table_id: TableId, init_epoch: HummockEpoch) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        assert!(
            data.unsync_vector_index_data
                .try_insert(
                    table_id,
                    UnsyncVectorIndexData {
                        sealed_epoch_data: Default::default(),
                        curr_epoch: init_epoch,
                        is_dropped: false,
                    }
                )
                .is_ok(),
            "duplicate vector writer on {}",
            table_id
        )
    }

    pub(super) fn vector_writer_seal_epoch(
        &mut self,
        table_id: TableId,
        next_epoch: HummockEpoch,
        add: Option<VectorIndexAdd>,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        let data = data
            .unsync_vector_index_data
            .get_mut(&table_id)
            .expect("should exist");
        assert!(!data.is_dropped);
        assert!(
            data.curr_epoch < next_epoch,
            "next epoch {} should be greater than current epoch {}",
            next_epoch,
            data.curr_epoch
        );
        data.sealed_epoch_data
            .try_insert(data.curr_epoch, add)
            .expect("non-duplicate");
        data.curr_epoch = next_epoch;
    }

    pub(super) fn drop_vector_writer(&mut self, table_id: TableId) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        let hash_map::Entry::Occupied(mut entry) = data.unsync_vector_index_data.entry(table_id)
        else {
            panic!("vector writer {} should exist", table_id);
        };
        let data = entry.get_mut();
        data.is_dropped = true;
        if data.sealed_epoch_data.is_empty() {
            entry.remove();
        }
    }

    pub(super) fn start_epoch(&mut self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        debug!(epoch, ?table_ids, "start epoch");
        for table_id in &table_ids {
            let table_data = data
                .unsync_data
                .table_data
                .entry(*table_id)
                .or_insert_with(|| {
                    TableUnsyncData::new(
                        *table_id,
                        self.context.pinned_version.table_committed_epoch(*table_id),
                    )
                });
            table_data.new_epoch(epoch);
        }
        if let Some(unsync_epoch_id) = get_unsync_epoch_id(epoch, &table_ids) {
            assert!(
                data.unsync_data
                    .unsync_epochs
                    .insert(unsync_epoch_id, table_ids)
                    .is_none()
            );
        }
    }

    pub(super) fn start_sync_epoch(
        &mut self,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) {
        let data = match &mut self.state {
            UploaderState::Working(data) => data,
            UploaderState::Err(ErrState {
                failed_sync_table_epochs,
                reason,
            }) => {
                let result = Err(HummockError::other(format!(
                    "previous sync epoch {:?} failed due to [{}]",
                    failed_sync_table_epochs, reason
                )));
                send_sync_result(sync_result_sender, result);
                return;
            }
        };
        debug!(?sync_table_epochs, "start sync epoch");

        data.sync(&self.context, sync_result_sender, sync_table_epochs);

        data.may_notify_sync_task(&self.context);

        self.context
            .stats
            .uploader_syncing_epoch_count
            .set(data.syncing_data.len() as _);
    }

    pub(crate) fn update_pinned_version(&mut self, pinned_version: PinnedVersion) {
        if let UploaderState::Working(data) = &mut self.state {
            // advance the max synced epoch of each table to the newly committed epoch
            for (table_id, info) in pinned_version.state_table_info.info() {
                if let Some(table_data) = data.unsync_data.table_data.get_mut(table_id) {
                    table_data.ack_committed(info.committed_epoch);
                }
            }
        }
        self.context.pinned_version = pinned_version;
    }

    pub(crate) fn may_flush(&mut self, spiller_latency: &Histogram) -> bool {
        let UploaderState::Working(data) = &mut self.state else {
            return false;
        };
        if self.context.buffer_tracker.need_flush() {
            let timer = spiller_latency.start_timer();
            let mut spiller = Spiller::new(&mut data.unsync_data);
            let mut curr_batch_flush_size = 0;
            while self
                .context
                .buffer_tracker
                .need_more_flush(curr_batch_flush_size)
                && let Some((epoch, payload, spilled_table_ids)) = spiller.next_spilled_payload()
            {
                assert!(!payload.is_empty());
                {
                    let (task_id, task_size, spilled_table_ids) =
                        data.task_manager
                            .spill(&self.context, spilled_table_ids, payload);
                    for table_id in spilled_table_ids {
                        spiller
                            .unsync_data()
                            .table_data
                            .get_mut(table_id)
                            .expect("should exist")
                            .spill_tasks
                            .entry(epoch)
                            .or_default()
                            .push_front(task_id);
                    }
                    curr_batch_flush_size += task_size;
                }
            }
            timer.observe_duration();
            data.check_upload_task_consistency();
            curr_batch_flush_size > 0
        } else {
            false
        }
    }

    pub(crate) fn clear(&mut self, table_ids: Option<HashSet<TableId>>) {
        if let Some(table_ids) = table_ids {
            if let UploaderState::Working(data) = &mut self.state {
                data.clear_tables(table_ids);
            }
        } else {
            if let UploaderState::Working(data) = replace(
                &mut self.state,
                UploaderState::Working(UploaderData::default()),
            ) {
                data.abort(|| HummockError::other("uploader is reset"));
            }

            self.context.stats.uploader_syncing_epoch_count.set(0);
        }
    }

    pub(crate) fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data.may_destroy_instance(instance_id);
    }

    pub(crate) fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        if let UploaderState::Working(ref u) = self.state {
            u.min_uncommitted_object_id()
        } else {
            None
        }
    }
}

impl UploaderData {
    fn may_notify_sync_task(&mut self, context: &UploaderContext) {
        while let Some((_, syncing_data)) = self.syncing_data.first_key_value()
            && syncing_data.remaining_uploading_tasks.is_empty()
        {
            let (_, syncing_data) = self.syncing_data.pop_first().expect("non-empty");
            let SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: _,
                uploaded,
                table_watermarks,
                vector_index_adds,
                sync_result_sender,
            } = syncing_data;
            context
                .stats
                .uploader_syncing_epoch_count
                .set(self.syncing_data.len() as _);

            for (sync_epoch, table_ids) in sync_table_epochs {
                for table_id in table_ids {
                    if let Some(table_data) = self.unsync_data.table_data.get_mut(&table_id) {
                        table_data.ack_synced(sync_epoch);
                        if table_data.is_empty() {
                            self.unsync_data.table_data.remove(&table_id);
                        }
                    }
                }
            }

            send_sync_result(
                sync_result_sender,
                Ok(SyncedData {
                    uploaded_ssts: uploaded,
                    table_watermarks,
                    vector_index_adds,
                }),
            )
        }
    }

    fn check_upload_task_consistency(&self) {
        #[cfg(debug_assertions)]
        {
            let mut spill_task_table_id_from_data: HashMap<_, HashSet<_>> = HashMap::new();
            for table_data in self.unsync_data.table_data.values() {
                for task_id in table_data
                    .spill_tasks
                    .iter()
                    .flat_map(|(_, tasks)| tasks.iter())
                {
                    assert!(
                        spill_task_table_id_from_data
                            .entry(*task_id)
                            .or_default()
                            .insert(table_data.table_id)
                    );
                }
            }
            let syncing_task_id_from_data: HashMap<_, HashSet<_>> = self
                .syncing_data
                .iter()
                .filter_map(|(sync_id, data)| {
                    if data.remaining_uploading_tasks.is_empty() {
                        None
                    } else {
                        Some((*sync_id, data.remaining_uploading_tasks.clone()))
                    }
                })
                .collect();

            let mut spill_task_table_id_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
            for (task_id, (_, table_ids)) in &self.unsync_data.spilled_data {
                spill_task_table_id_from_manager.insert(*task_id, table_ids.clone());
            }
            let mut syncing_task_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
            for (task_id, status) in self.task_manager.tasks() {
                match status {
                    UploadingTaskStatus::Spilling(table_ids) => {
                        assert!(
                            spill_task_table_id_from_manager
                                .insert(task_id, table_ids.clone())
                                .is_none()
                        );
                    }
                    UploadingTaskStatus::Sync(sync_id) => {
                        assert!(
                            syncing_task_from_manager
                                .entry(*sync_id)
                                .or_default()
                                .insert(task_id)
                        );
                    }
                }
            }
            assert_eq!(
                spill_task_table_id_from_data,
                spill_task_table_id_from_manager
            );
            assert_eq!(syncing_task_id_from_data, syncing_task_from_manager);
        }
    }
}

impl HummockUploader {
    pub(super) fn next_uploaded_ssts(
        &mut self,
    ) -> impl Future<Output = Vec<Arc<StagingSstableInfo>>> + '_ {
        poll_fn(|cx| {
            let UploaderState::Working(data) = &mut self.state else {
                return Poll::Pending;
            };

            const MAX_BATCH_SIZE: usize = 16;

            let mut ssts = None;
            while let Poll::Ready(Some((task_id, status, result))) =
                data.task_manager.poll_task_result(cx)
            {
                match result {
                    Ok(sst) => {
                        data.unsync_data.ack_flushed(&sst);
                        match status {
                            UploadingTaskStatus::Sync(sync_id) => {
                                let syncing_data =
                                    data.syncing_data.get_mut(&sync_id).expect("should exist");
                                syncing_data.uploaded.push_front(sst.clone());
                                assert!(syncing_data.remaining_uploading_tasks.remove(&task_id));
                                data.may_notify_sync_task(&self.context);
                            }
                            UploadingTaskStatus::Spilling(table_ids) => {
                                data.unsync_data
                                    .spilled_data
                                    .insert(task_id, (sst.clone(), table_ids));
                            }
                        }
                        data.check_upload_task_consistency();
                        let ssts = ssts.get_or_insert_with(|| Vec::with_capacity(MAX_BATCH_SIZE));
                        ssts.push(sst);
                        if ssts.len() >= MAX_BATCH_SIZE {
                            break;
                        }
                    }
                    Err((sync_id, e)) => {
                        let syncing_data =
                            data.syncing_data.remove(&sync_id).expect("should exist");
                        let failed_epochs = syncing_data.sync_table_epochs.clone();
                        let data = must_match!(replace(
                            &mut self.state,
                            UploaderState::Err(ErrState {
                                failed_sync_table_epochs: syncing_data.sync_table_epochs,
                                reason: e.as_report().to_string(),
                            }),
                        ), UploaderState::Working(data) => data);

                        let _ = syncing_data
                            .sync_result_sender
                            .send(Err(HummockError::other(format!(
                                "failed to sync: {:?}",
                                e.as_report()
                            ))));

                        data.abort(|| {
                            HummockError::other(format!(
                                "previous epoch {:?} failed to sync",
                                failed_epochs
                            ))
                        });
                        return Poll::Pending;
                    }
                }
            }
            if let Some(ssts) = ssts {
                Poll::Ready(ssts)
            } else {
                Poll::Pending
            }
        })
    }
}
1777
1778#[cfg(test)]
1779pub(crate) mod tests {
1780 use std::collections::{HashMap, HashSet};
1781 use std::future::{Future, poll_fn};
1782 use std::pin::pin;
1783 use std::sync::Arc;
1784 use std::sync::atomic::AtomicUsize;
1785 use std::sync::atomic::Ordering::SeqCst;
1786 use std::task::Poll;
1787
1788 use futures::FutureExt;
1789 use risingwave_common::catalog::TableId;
1790 use risingwave_common::metrics::LabelGuardedHistogram;
1791 use risingwave_common::util::epoch::EpochExt;
1792 use risingwave_hummock_sdk::HummockEpoch;
1793 use risingwave_hummock_sdk::vector_index::{
1794 FlatIndexAdd, VectorFileInfo, VectorIndexAdd, VectorStoreInfoDelta,
1795 };
1796 use tokio::sync::oneshot;
1797
1798 use super::test_utils::*;
1799 use crate::hummock::event_handler::uploader::{
1800 HummockUploader, SyncedData, UploadingTask, get_payload_imm_ids,
1801 };
1802 use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID};
1803 use crate::hummock::utils::MemoryTracker;
1804 use crate::hummock::{HummockError, HummockResult};
1805 use crate::mem_table::ImmutableMemtable;
1806 use crate::opts::StorageOpts;
1807
1808 impl HummockUploader {
1809 pub(super) fn add_imm(
1810 &mut self,
1811 instance_id: LocalInstanceId,
1812 imm: (ImmutableMemtable, MemoryTracker),
1813 ) {
1814 self.add_imms(instance_id, vec![imm]);
1815 }
1816
1817 pub(super) fn start_single_epoch_sync(
1818 &mut self,
1819 epoch: HummockEpoch,
1820 sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
1821 table_ids: HashSet<TableId>,
1822 ) {
1823 self.start_sync_epoch(sync_result_sender, vec![(epoch, table_ids)]);
1824 }
1825
1826 pub(super) fn may_flush_for_test(&mut self) -> bool {
1827 self.may_flush(&LabelGuardedHistogram::test_histogram::<1>())
1828 }
1829 }
1830
1831 #[tokio::test]
1832 pub async fn test_uploading_task_future() {
1833 let uploader_context = test_uploader_context(dummy_success_upload_future);
1834
1835 let imm = gen_imm(INITIAL_EPOCH);
1836 let imm_size = imm.size();
1837 let imm_ids = get_imm_ids(vec![&imm]);
1838 let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
1839 assert_eq!(imm_size, task.task_info.task_size);
1840 assert_eq!(imm_ids, task.task_info.imm_ids);
1841 assert_eq!(vec![INITIAL_EPOCH], task.task_info.epochs);
1842 let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
1843 assert_eq!(
1844 output.sstable_infos(),
1845 &dummy_success_upload_output().new_value_ssts
1846 );
1847 assert_eq!(imm_size, output.imm_size());
1848 assert_eq!(&imm_ids, output.imm_ids());
1849 assert_eq!(&vec![INITIAL_EPOCH], output.epochs());
1850
1851 let uploader_context = test_uploader_context(dummy_fail_upload_future);
1852 let imm = gen_imm(INITIAL_EPOCH);
1853 let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
1854 let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
1855 }
1856
1857 #[tokio::test]
1858 pub async fn test_uploading_task_poll_result() {
1859 let uploader_context = test_uploader_context(dummy_success_upload_future);
1860 let mut task = UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH)], &uploader_context);
1861 let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
1862 assert_eq!(
1863 output.sstable_infos(),
1864 &dummy_success_upload_output().new_value_ssts
1865 );
1866
1867 let uploader_context = test_uploader_context(dummy_fail_upload_future);
1868 let mut task = UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH)], &uploader_context);
1869 let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
1870 }
1871
1872 #[tokio::test]
1873 async fn test_uploading_task_poll_ok_with_retry() {
1874 let run_count = Arc::new(AtomicUsize::new(0));
1875 let fail_num = 10;
1876 let run_count_clone = run_count.clone();
1877 let uploader_context = test_uploader_context(move |payload, info| {
1878 let run_count = run_count.clone();
1879 async move {
1880 let ret = if run_count.load(SeqCst) < fail_num {
1882 Err(HummockError::other("fail"))
1883 } else {
1884 dummy_success_upload_future(payload, info).await
1885 };
1886 run_count.fetch_add(1, SeqCst);
1887 ret
1888 }
1889 });
1890 let mut task = UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH)], &uploader_context);
1891 let output = poll_fn(|cx| task.poll_ok_with_retry(cx)).await;
1892 assert_eq!(fail_num + 1, run_count_clone.load(SeqCst));
1893 assert_eq!(
1894 output.sstable_infos(),
1895 &dummy_success_upload_output().new_value_ssts
1896 );
1897 }
1898
1899 #[tokio::test]
1900 async fn test_uploader_basic() {
1901 let mut uploader = test_uploader(dummy_success_upload_future);
1902 let mut sst_collector = UploadedSstCollector::default();
1903 let epoch1 = INITIAL_EPOCH.next_epoch();
1904 const VECTOR_INDEX_TABLE_ID: TableId = TableId::new(234);
1905 uploader.start_epoch(
1906 epoch1,
1907 HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
1908 );
1909 let (imm, tracker) = gen_imm_with_unlimit(epoch1);
1910 uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
1911 uploader.add_imm(TEST_LOCAL_INSTANCE_ID, (imm.clone(), tracker));
1912 uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
1913
1914 uploader.register_vector_writer(VECTOR_INDEX_TABLE_ID, epoch1);
1915 let vector_info_file = VectorFileInfo {
1916 object_id: 1.into(),
1917 vector_count: 1,
1918 file_size: 0,
1919 start_vector_id: 0,
1920 meta_offset: 20,
1921 };
1922 let vector_index_add = VectorIndexAdd::Flat(FlatIndexAdd {
1923 vector_store_info_delta: VectorStoreInfoDelta {
1924 next_vector_id: 1,
1925 added_vector_files: vec![vector_info_file.clone()],
1926 },
1927 });
1928 uploader.vector_writer_seal_epoch(
1929 VECTOR_INDEX_TABLE_ID,
1930 epoch1.next_epoch(),
1931 Some(vector_index_add.clone()),
1932 );
1933
1934 let (sync_tx, sync_rx) = oneshot::channel();
1935 uploader.start_single_epoch_sync(
1936 epoch1,
1937 sync_tx,
1938 HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
1939 );
1940 assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
1941 assert_eq!(1, uploader.data().syncing_data.len());
1942 let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
1943 assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
1944 assert!(syncing_data.uploaded.is_empty());
1945 assert!(!syncing_data.remaining_uploading_tasks.is_empty());
1946
1947 let staging_sst = sst_collector.next(&mut uploader).await;
1948 assert_eq!(&vec![epoch1], staging_sst.epochs());
1949 assert_eq!(
1950 &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
1951 staging_sst.imm_ids()
1952 );
1953 assert_eq!(
1954 &dummy_success_upload_output().new_value_ssts,
1955 staging_sst.sstable_infos()
1956 );
1957
1958 match sync_rx.await {
1959 Ok(Ok(data)) => {
1960 let SyncedData {
1961 uploaded_ssts,
1962 table_watermarks,
1963 vector_index_adds,
1964 } = data;
1965 assert_eq!(1, uploaded_ssts.len());
1966 let staging_sst = &uploaded_ssts[0];
1967 assert_eq!(&vec![epoch1], staging_sst.epochs());
1968 assert_eq!(
1969 &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
1970 staging_sst.imm_ids()
1971 );
1972 assert_eq!(
1973 &dummy_success_upload_output().new_value_ssts,
1974 staging_sst.sstable_infos()
1975 );
1976 assert!(table_watermarks.is_empty());
1977 assert_eq!(vector_index_adds.len(), 1);
1978 let (table_id, vector_index_adds) = vector_index_adds.into_iter().next().unwrap();
1979 assert_eq!(table_id, VECTOR_INDEX_TABLE_ID);
1980 assert_eq!(vector_index_adds.len(), 1);
1981 let synced_vector_index_add = vector_index_adds[0].clone();
1982 assert_eq!(vector_index_add, synced_vector_index_add);
1983 }
1984 _ => unreachable!(),
1985 };
1986 assert_eq!(epoch1, uploader.test_max_synced_epoch());
1987
1988 let new_pinned_version = uploader
1989 .context
1990 .pinned_version
1991 .new_pin_version(test_hummock_version(epoch1))
1992 .unwrap();
1993 uploader.update_pinned_version(new_pinned_version);
1994 assert_eq!(
1995 epoch1,
1996 uploader
1997 .context
1998 .pinned_version
1999 .table_committed_epoch(TEST_TABLE_ID)
2000 .unwrap()
2001 );
2002 }
2003
    #[tokio::test]
    async fn test_uploader_destroy_instance_before_sync() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let mut sst_collector = UploadedSstCollector::default();
        let epoch1 = INITIAL_EPOCH.next_epoch();
        uploader.start_epochs_for_test([epoch1]);
        let (imm, tracker) = gen_imm_with_unlimit(epoch1);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, (imm.clone(), tracker));
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.may_destroy_instance(TEST_LOCAL_INSTANCE_ID);

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
        assert_eq!(1, uploader.data().syncing_data.len());
        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
        assert!(syncing_data.uploaded.is_empty());
        assert!(!syncing_data.remaining_uploading_tasks.is_empty());

        let staging_sst = sst_collector.next(&mut uploader).await;
        assert_eq!(&vec![epoch1], staging_sst.epochs());
        assert_eq!(
            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
            staging_sst.imm_ids()
        );
        assert_eq!(
            &dummy_success_upload_output().new_value_ssts,
            staging_sst.sstable_infos()
        );

        match sync_rx.await {
            Ok(Ok(data)) => {
                let SyncedData {
                    uploaded_ssts,
                    table_watermarks,
                    ..
                } = data;
                assert_eq!(1, uploaded_ssts.len());
                let staging_sst = &uploaded_ssts[0];
                assert_eq!(&vec![epoch1], staging_sst.epochs());
                assert_eq!(
                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
                    staging_sst.imm_ids()
                );
                assert_eq!(
                    &dummy_success_upload_output().new_value_ssts,
                    staging_sst.sstable_infos()
                );
                assert!(table_watermarks.is_empty());
            }
            _ => unreachable!(),
        };
        assert!(
            !uploader
                .data()
                .unsync_data
                .table_data
                .contains_key(&TEST_TABLE_ID)
        );
    }

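    // Syncing an epoch in which nothing was written should finish with no uploaded ssts.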
    #[tokio::test]
    async fn test_empty_uploader_sync() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_epochs_for_test([epoch1]);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        assert_uploader_pending(&mut uploader).await;

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.uploaded_ssts.is_empty());
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert!(uploader.data().syncing_data.is_empty());
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }

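    // epoch1 is sealed with no imm of its own (the only imm belongs to epoch2), so
    // syncing epoch1 uploads nothing.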
    #[tokio::test]
    async fn test_uploader_empty_epoch() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        uploader.start_epochs_for_test([epoch1, epoch2]);
        let (imm, tracker) = gen_imm_with_unlimit(epoch2);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, (imm, tracker));

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        assert_uploader_pending(&mut uploader).await;

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.uploaded_ssts.is_empty());
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert!(uploader.data().syncing_data.is_empty());
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }

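    // Polling an uploader that has no pending upload task should stay pending.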
    #[tokio::test]
    async fn test_uploader_poll_empty() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let fut = uploader.next_uploaded_ssts();
        let mut fut = pin!(fut);
        assert!(poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx).is_pending())).await);
    }

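    // With no local data, updating the pinned version directly advances both the max
    // synced and max syncing epochs to the newly committed epoch; once a sync is in
    // flight, the max syncing epoch stays pinned at the sync epoch instead.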
    #[tokio::test]
    async fn test_uploader_empty_advance_mce() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let mut sst_collector = UploadedSstCollector::default();
        let initial_pinned_version = uploader.context.pinned_version.clone();
        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        let epoch3 = epoch2.next_epoch();
        let epoch4 = epoch3.next_epoch();
        let epoch5 = epoch4.next_epoch();
        let epoch6 = epoch5.next_epoch();
        let version1 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        let version2 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch2))
            .unwrap();
        let version3 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch3))
            .unwrap();
        let version4 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch4))
            .unwrap();
        let version5 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch5))
            .unwrap();

        uploader.start_epochs_for_test([epoch6]);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch6);

        uploader.update_pinned_version(version1);
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        let (imm, tracker) = gen_imm_with_unlimit(epoch6);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, (imm.clone(), tracker));
        uploader.update_pinned_version(version2);
        assert_eq!(epoch2, uploader.test_max_synced_epoch());
        assert_eq!(epoch2, uploader.test_max_syncing_epoch());

        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch6);
        uploader.update_pinned_version(version3);
        assert_eq!(epoch3, uploader.test_max_synced_epoch());
        assert_eq!(epoch3, uploader.test_max_syncing_epoch());

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch6, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
        uploader.update_pinned_version(version4);
        assert_eq!(epoch4, uploader.test_max_synced_epoch());
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());

        let sst = sst_collector.next(&mut uploader).await;
        assert_eq!(&get_imm_ids([&imm]), sst.imm_ids());

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.table_watermarks.is_empty());
                assert_eq!(1, data.uploaded_ssts.len());
                assert_eq!(&get_imm_ids([&imm]), data.uploaded_ssts[0].imm_ids());
            }
            _ => unreachable!(),
        }

        uploader.update_pinned_version(version5);
        assert_eq!(epoch6, uploader.test_max_synced_epoch());
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
    }

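    // Upload tasks may finish out of order, but their outputs must be yielded in the
    // order in which the tasks were spawned.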
    #[tokio::test]
    async fn test_uploader_finish_in_order() {
        let config = StorageOpts {
            shared_buffer_capacity_mb: 1024 * 1024,
            shared_buffer_flush_ratio: 0.0,
            ..Default::default()
        };
        let (buffer_tracker, mut uploader, new_task_notifier) =
            prepare_uploader_order_test(&config, false);
        let mut sst_collector = UploadedSstCollector::default();

        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        let epoch3 = epoch2.next_epoch();
        let epoch4 = epoch3.next_epoch();
        uploader.start_epochs_for_test([epoch1, epoch2, epoch3, epoch4]);
        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
        let memory_limiter = memory_limiter.as_ref();

        let instance_id1 = 1;
        let instance_id2 = 2;

        uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
        uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);

        let (imm2, tracker2) = gen_imm_with_limiter(epoch2, memory_limiter).await;
        uploader.add_imm(instance_id2, (imm2.clone(), tracker2));
        let (imm1_1, tracker1_1) = gen_imm_with_limiter(epoch1, memory_limiter).await;
        uploader.add_imm(instance_id1, (imm1_1.clone(), tracker1_1));
        let (imm1_2, tracker1_2) = gen_imm_with_limiter(epoch1, memory_limiter).await;
        uploader.add_imm(instance_id1, (imm1_2.clone(), tracker1_2));

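        // Flush the imms of epoch1 and epoch2 as two separate spill tasks.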
        let epoch1_spill_payload12 =
            HashMap::from_iter([(instance_id1, vec![imm1_2.clone(), imm1_1.clone()])]);
        let epoch2_spill_payload = HashMap::from_iter([(instance_id2, vec![imm2.clone()])]);
        let (await_start1, finish_tx1) =
            new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload12));
        let (await_start2, finish_tx2) =
            new_task_notifier(get_payload_imm_ids(&epoch2_spill_payload));
        uploader.may_flush_for_test();
        await_start1.await;
        await_start2.await;

        assert_uploader_pending(&mut uploader).await;

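        // Finishing the epoch2 task first yields nothing: the earlier-spawned epoch1
        // task must be emitted before it.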
        finish_tx2.send(()).unwrap();
        assert_uploader_pending(&mut uploader).await;

        finish_tx1.send(()).unwrap();
        let sst = sst_collector.next(&mut uploader).await;
        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload12), sst.imm_ids());
        assert_eq!(&vec![epoch1], sst.epochs());

        let sst = sst_collector.next(&mut uploader).await;
        assert_eq!(&get_payload_imm_ids(&epoch2_spill_payload), sst.imm_ids());
        assert_eq!(&vec![epoch2], sst.epochs());

        let (imm1_3, tracker1_3) = gen_imm_with_limiter(epoch1, memory_limiter).await;
        uploader.add_imm(instance_id1, (imm1_3.clone(), tracker1_3));
        let epoch1_spill_payload3 = HashMap::from_iter([(instance_id1, vec![imm1_3.clone()])]);
        let (await_start1_3, finish_tx1_3) =
            new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload3));
        uploader.may_flush_for_test();
        await_start1_3.await;
        let (imm1_4, tracker1_4) = gen_imm_with_limiter(epoch1, memory_limiter).await;
        uploader.add_imm(instance_id1, (imm1_4.clone(), tracker1_4));
        let epoch1_sync_payload = HashMap::from_iter([(instance_id1, vec![imm1_4.clone()])]);
        let (await_start1_4, finish_tx1_4) =
            new_task_notifier(get_payload_imm_ids(&epoch1_sync_payload));
        uploader.local_seal_epoch_for_test(instance_id1, epoch1);
        let (sync_tx1, sync_rx1) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx1, HashSet::from_iter([TEST_TABLE_ID]));
        await_start1_4.await;

        uploader.local_seal_epoch_for_test(instance_id1, epoch2);
        uploader.local_seal_epoch_for_test(instance_id2, epoch2);

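        // Spill two more imms of epoch3, one from each instance, as separate tasks.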
        let (imm3_1, tracker3_1) = gen_imm_with_limiter(epoch3, memory_limiter).await;
        let epoch3_spill_payload1 = HashMap::from_iter([(instance_id1, vec![imm3_1.clone()])]);
        uploader.add_imm(instance_id1, (imm3_1.clone(), tracker3_1));
        let (await_start3_1, finish_tx3_1) =
            new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload1));
        uploader.may_flush_for_test();
        await_start3_1.await;
        let (imm3_2, tracker3_2) = gen_imm_with_limiter(epoch3, memory_limiter).await;
        let epoch3_spill_payload2 = HashMap::from_iter([(instance_id2, vec![imm3_2.clone()])]);
        uploader.add_imm(instance_id2, (imm3_2.clone(), tracker3_2));
        let (await_start3_2, finish_tx3_2) =
            new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload2));
        uploader.may_flush_for_test();
        await_start3_2.await;
        let (imm3_3, tracker3_3) = gen_imm_with_limiter(epoch3, memory_limiter).await;
        uploader.add_imm(instance_id1, (imm3_3.clone(), tracker3_3));

        uploader.local_seal_epoch_for_test(instance_id1, epoch3);
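        // imm3_3 stays unflushed when epoch3 is sealed; it will later be bundled with
        // imm4 into the epoch4 sync task.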
        let (imm4, tracker4) = gen_imm_with_limiter(epoch4, memory_limiter).await;
        uploader.add_imm(instance_id1, (imm4.clone(), tracker4));
        assert_uploader_pending(&mut uploader).await;

        finish_tx3_1.send(()).unwrap();
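        // The epoch3 task has finished, but the earlier-spawned epoch1 tasks (for imm1_3
        // and imm1_4) are still running, so nothing can be emitted yet.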
        assert_uploader_pending(&mut uploader).await;
        finish_tx1_4.send(()).unwrap();
        assert_uploader_pending(&mut uploader).await;
        finish_tx1_3.send(()).unwrap();

        let sst = sst_collector.next(&mut uploader).await;
        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload3), sst.imm_ids());

        let sst = sst_collector.next(&mut uploader).await;
        assert_eq!(&get_payload_imm_ids(&epoch1_sync_payload), sst.imm_ids());

        match sync_rx1.await {
            Ok(Ok(data)) => {
                assert_eq!(3, data.uploaded_ssts.len());
                assert_eq!(
                    &get_payload_imm_ids(&epoch1_sync_payload),
                    data.uploaded_ssts[0].imm_ids()
                );
                assert_eq!(
                    &get_payload_imm_ids(&epoch1_spill_payload3),
                    data.uploaded_ssts[1].imm_ids()
                );
                assert_eq!(
                    &get_payload_imm_ids(&epoch1_spill_payload12),
                    data.uploaded_ssts[2].imm_ids()
                );
            }
            _ => {
                unreachable!()
            }
        }

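        // Sync epoch2: its only data (the epoch2 spill) has already been uploaded, so
        // the sync result contains exactly that SST.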
        let (sync_tx2, sync_rx2) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch2, sync_tx2, HashSet::from_iter([TEST_TABLE_ID]));
        uploader.local_seal_epoch_for_test(instance_id2, epoch3);
        let sst = sst_collector.next(&mut uploader).await;
        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids());

        match sync_rx2.await {
            Ok(Ok(data)) => {
                assert_eq!(data.uploaded_ssts.len(), 1);
                assert_eq!(
                    &get_payload_imm_ids(&epoch2_spill_payload),
                    data.uploaded_ssts[0].imm_ids()
                );
            }
            _ => {
                unreachable!("should be sync finish");
            }
        }
        assert_eq!(epoch2, uploader.test_max_synced_epoch());

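        // Seal epoch4 on both instances and sync it; imm4 and the still-unflushed imm3_3
        // are uploaded together in a single sync task.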
        uploader.local_seal_epoch_for_test(instance_id1, epoch4);
        uploader.local_seal_epoch_for_test(instance_id2, epoch4);
        let epoch4_sync_payload = HashMap::from_iter([(instance_id1, vec![imm4, imm3_3])]);
        let (await_start4_with_3_3, finish_tx4_with_3_3) =
            new_task_notifier(get_payload_imm_ids(&epoch4_sync_payload));
        let (sync_tx4, mut sync_rx4) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch4, sync_tx4, HashSet::from_iter([TEST_TABLE_ID]));
        await_start4_with_3_3.await;

        assert_uploader_pending(&mut uploader).await;

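        // Finish the remaining tasks; outputs are again emitted in spawn order: the
        // epoch3 spill of imm3_2 first, then the epoch4 sync task.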
        finish_tx3_2.send(()).unwrap();
        let sst = sst_collector.next(&mut uploader).await;
        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload2), sst.imm_ids());

        finish_tx4_with_3_3.send(()).unwrap();
        assert!(poll_fn(|cx| Poll::Ready(sync_rx4.poll_unpin(cx).is_pending())).await);

        let sst = sst_collector.next(&mut uploader).await;
        assert_eq!(&get_payload_imm_ids(&epoch4_sync_payload), sst.imm_ids());

        match sync_rx4.await {
            Ok(Ok(data)) => {
                assert_eq!(3, data.uploaded_ssts.len());
                assert_eq!(
                    &get_payload_imm_ids(&epoch4_sync_payload),
                    data.uploaded_ssts[0].imm_ids()
                );
                assert_eq!(
                    &get_payload_imm_ids(&epoch3_spill_payload2),
                    data.uploaded_ssts[1].imm_ids()
                );
                assert_eq!(
                    &get_payload_imm_ids(&epoch3_spill_payload1),
                    data.uploaded_ssts[2].imm_ids(),
                )
            }
            _ => {
                unreachable!("should be sync finish");
            }
        }
        assert_eq!(epoch4, uploader.test_max_synced_epoch());
    }

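    // After one flush is triggered, adding more small imms must not trigger a flush per
    // add: the remaining unflushed data stays below the min batch flush size.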
    #[tokio::test]
    async fn test_uploader_frequently_flush() {
        let config = StorageOpts {
            shared_buffer_capacity_mb: 10,
            shared_buffer_flush_ratio: 0.12,
            shared_buffer_min_batch_flush_size_mb: 1,
            ..Default::default()
        };
        let (buffer_tracker, mut uploader, _new_task_notifier) =
            prepare_uploader_order_test(&config, true);

        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        uploader.start_epochs_for_test([epoch1, epoch2]);
        let instance_id1 = 1;
        let instance_id2 = 2;
        let flush_threshold = buffer_tracker.flush_threshold();
        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
        let memory_limiter = memory_limiter.as_ref();

        uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
        uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);

        let mut total_memory = 0;
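        // Fill instance 2 with epoch2 imms until just below the flush threshold.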
        while total_memory < flush_threshold {
            let (imm, tracker) = gen_imm_with_limiter(epoch2, memory_limiter).await;
            total_memory += imm.size();
            if total_memory > flush_threshold {
                break;
            }
            uploader.add_imm(instance_id2, (imm, tracker));
        }
        let (imm, tracker) = gen_imm_with_limiter(epoch1, memory_limiter).await;
        uploader.add_imm(instance_id1, (imm, tracker));
        assert!(uploader.may_flush_for_test());

        for _ in 0..10 {
            let (imm, tracker) = gen_imm_with_limiter(epoch1, memory_limiter).await;
            uploader.add_imm(instance_id1, (imm, tracker));
            assert!(!uploader.may_flush_for_test());
        }
    }
}