risingwave_storage/hummock/event_handler/uploader/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod spiller;
16mod task_manager;
17pub(crate) mod test_utils;
18
19use std::cmp::Ordering;
20use std::collections::btree_map::Entry;
21use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque, hash_map};
22use std::fmt::{Debug, Display, Formatter};
23use std::future::{Future, poll_fn};
24use std::mem::{replace, swap, take};
25use std::sync::Arc;
26use std::task::{Context, Poll, ready};
27
28use futures::FutureExt;
29use itertools::Itertools;
30use more_asserts::assert_gt;
31use prometheus::{Histogram, HistogramTimer, IntGauge};
32use risingwave_common::bitmap::BitmapBuilder;
33use risingwave_common::catalog::TableId;
34use risingwave_common::metrics::UintGauge;
35use risingwave_common::must_match;
36use risingwave_hummock_sdk::table_watermark::{
37    TableWatermarks, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
38};
39use risingwave_hummock_sdk::vector_index::VectorIndexAdd;
40use risingwave_hummock_sdk::{HummockEpoch, HummockRawObjectId, LocalSstableInfo};
41use task_manager::{TaskManager, UploadingTaskStatus};
42use thiserror_ext::AsReport;
43use tokio::sync::oneshot;
44use tokio::task::JoinHandle;
45use tracing::{debug, error, info, warn};
46
47use crate::hummock::event_handler::LocalInstanceId;
48use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, send_sync_result};
49use crate::hummock::event_handler::uploader::spiller::Spiller;
50use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm;
51use crate::hummock::local_version::pinned_version::PinnedVersion;
52use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchId;
53use crate::hummock::store::version::StagingSstableInfo;
54use crate::hummock::utils::MemoryTracker;
55use crate::hummock::{HummockError, HummockResult, ImmutableMemtable};
56use crate::mem_table::ImmId;
57use crate::monitor::HummockStateStoreMetrics;
58use crate::store::SealCurrentEpochOptions;
59
60/// Take epoch data inclusively before `epoch` out from `data`
61fn take_before_epoch<T>(
62    data: &mut BTreeMap<HummockEpoch, T>,
63    epoch: HummockEpoch,
64) -> BTreeMap<HummockEpoch, T> {
65    let mut before_epoch_data = data.split_off(&(epoch + 1));
66    swap(&mut before_epoch_data, data);
67    before_epoch_data
68}
69
// Imms to upload keyed by local instance id; wrapped in `UploaderImm` for metric accounting.
type UploadTaskInput = HashMap<LocalInstanceId, Vec<UploaderImm>>;
/// Raw imm payload handed to the spawned upload task, keyed by local instance id.
pub type UploadTaskPayload = HashMap<LocalInstanceId, Vec<ImmutableMemtable>>;

/// Output of a successful upload task.
#[derive(Debug)]
pub struct UploadTaskOutput {
    /// SSTs built from the new-value data of the payload.
    pub new_value_ssts: Vec<LocalSstableInfo>,
    /// SSTs built from the old-value data of the payload.
    /// NOTE(review): presumably produced only for tables with old-value log — confirm with the upload implementation.
    pub old_value_ssts: Vec<LocalSstableInfo>,
    /// NOTE(review): name suggests this times how long the finished result waits before being polled — confirm at the producer side.
    pub wait_poll_timer: Option<HistogramTimer>,
}
/// Factory closure that spawns an upload task for the given payload and task
/// info, returning the handle of the tokio task performing the upload.
/// Stored in `UploadingTask` so a failed upload can be re-spawned.
pub type SpawnUploadTask = Arc<
    dyn Fn(UploadTaskPayload, UploadTaskInfo) -> JoinHandle<HummockResult<UploadTaskOutput>>
        + Send
        + Sync
        + 'static,
>;
85
/// Metadata of one upload task; cloned into the spawned task and kept so the
/// task can be retried and its result turned into a `StagingSstableInfo`.
#[derive(Clone)]
pub struct UploadTaskInfo {
    /// Total byte size of all imms in the payload.
    pub task_size: usize,
    /// Distinct epochs covered by the payload, newer epochs first.
    pub epochs: Vec<HummockEpoch>,
    /// Ids of all tables contributing data to this task.
    pub table_ids: HashSet<TableId>,
    /// Ids of the payload imms, keyed by their local instance.
    pub imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
}
93
94impl Display for UploadTaskInfo {
95    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
96        f.debug_struct("UploadTaskInfo")
97            .field("task_size", &self.task_size)
98            .field("epochs", &self.epochs)
99            .field("len(imm_ids)", &self.imm_ids.len())
100            .finish()
101    }
102}
103
104impl Debug for UploadTaskInfo {
105    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct("UploadTaskInfo")
107            .field("task_size", &self.task_size)
108            .field("epochs", &self.epochs)
109            .field("imm_ids", &self.imm_ids)
110            .finish()
111    }
112}
113
114mod uploader_imm {
115    use std::fmt::Formatter;
116    use std::ops::Deref;
117
118    use risingwave_common::metrics::{LabelGuardedIntGauge, UintGauge};
119
120    #[cfg(test)]
121    use crate::hummock::MemoryLimiter;
122    use crate::hummock::event_handler::uploader::UploaderContext;
123    use crate::hummock::utils::MemoryTracker;
124    use crate::mem_table::ImmutableMemtable;
125
126    pub(super) struct UploaderImm {
127        inner: ImmutableMemtable,
128        size_guard: UintGauge,
129        per_table_size_guard: LabelGuardedIntGauge,
130        per_table_count_guard: LabelGuardedIntGauge,
131        _tracker: MemoryTracker,
132    }
133
134    impl UploaderImm {
135        pub(super) fn new(
136            imm: ImmutableMemtable,
137            context: &UploaderContext,
138            tracker: MemoryTracker,
139        ) -> Self {
140            let size = imm.size();
141            let table_id_str = imm.table_id().to_string();
142            let size_guard = context.stats.uploader_imm_size.clone();
143            let per_table_size_guard = context
144                .stats
145                .uploader_per_table_imm_size
146                .with_guarded_label_values(&[&table_id_str]);
147            let per_table_count_guard = context
148                .stats
149                .uploader_per_table_imm_count
150                .with_guarded_label_values(&[&table_id_str]);
151            size_guard.add(size as _);
152            per_table_size_guard.add(size as _);
153            per_table_count_guard.add(1 as _);
154            Self {
155                inner: imm,
156                size_guard,
157                per_table_size_guard,
158                per_table_count_guard,
159                _tracker: tracker,
160            }
161        }
162
163        #[cfg(test)]
164        pub(super) fn for_test(imm: ImmutableMemtable) -> Self {
165            Self {
166                inner: imm,
167                size_guard: UintGauge::new("test", "test").unwrap(),
168                per_table_size_guard: LabelGuardedIntGauge::test_int_gauge::<1>(),
169                per_table_count_guard: LabelGuardedIntGauge::test_int_gauge::<1>(),
170                _tracker: MemoryLimiter::unlimit().try_require_memory(1).unwrap(),
171            }
172        }
173    }
174
175    impl std::fmt::Debug for UploaderImm {
176        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
177            self.inner.fmt(f)
178        }
179    }
180
181    impl Deref for UploaderImm {
182        type Target = ImmutableMemtable;
183
184        fn deref(&self) -> &Self::Target {
185            &self.inner
186        }
187    }
188
189    impl Drop for UploaderImm {
190        fn drop(&mut self) {
191            self.size_guard.sub(self.inner.size() as _);
192            self.per_table_size_guard.sub(self.inner.size() as _);
193            self.per_table_count_guard.dec();
194        }
195    }
196}
197
/// Identifier of an `UploadingTask`, assigned by the task manager.
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct UploadingTaskId(usize);
200
/// A wrapper for an uploading task that compacts and uploads the imm payload. Task context are
/// stored so that when the task fails, it can be re-tried.
struct UploadingTask {
    task_id: UploadingTaskId,
    // newer data at the front
    input: UploadTaskInput,
    // handle of the spawned upload; replaced with a fresh spawn on retry
    join_handle: JoinHandle<HummockResult<UploadTaskOutput>>,
    task_info: UploadTaskInfo,
    // kept so the same input can be re-spawned when the task fails
    spawn_upload_task: SpawnUploadTask,
    // metric guards; the task's size/count contribution is released in `Drop`
    task_size_guard: UintGauge,
    task_count_guard: IntGauge,
}
213
214impl Drop for UploadingTask {
215    fn drop(&mut self) {
216        self.task_size_guard.sub(self.task_info.task_size as u64);
217        self.task_count_guard.dec();
218    }
219}
220
221impl Debug for UploadingTask {
222    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
223        f.debug_struct("UploadingTask")
224            .field("input", &self.input)
225            .field("task_info", &self.task_info)
226            .finish()
227    }
228}
229
230fn get_payload_imm_ids(
231    payload: &UploadTaskPayload,
232) -> HashMap<LocalInstanceId, Vec<SharedBufferBatchId>> {
233    payload
234        .iter()
235        .map(|(instance_id, imms)| {
236            (
237                *instance_id,
238                imms.iter().map(|imm| imm.batch_id()).collect_vec(),
239            )
240        })
241        .collect()
242}
243
impl UploadingTask {
    // INFO logs will be enabled for task with size exceeding 50MB.
    const LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE: usize = 50 * (1 << 20);

    /// Clone the raw imms out of the metric-guarded input so they can be
    /// handed to the spawned upload task.
    fn input_to_payload(input: &UploadTaskInput) -> UploadTaskPayload {
        input
            .iter()
            .map(|(instance_id, imms)| {
                (
                    *instance_id,
                    imms.iter().map(|imm| (**imm).clone()).collect(),
                )
            })
            .collect()
    }

    /// Build the task metadata from `input`, account the task size in the
    /// buffer tracker and metrics, and spawn the upload through
    /// `context.spawn_upload_task`.
    ///
    /// Panics if `input` is empty.
    fn new(task_id: UploadingTaskId, input: UploadTaskInput, context: &UploaderContext) -> Self {
        assert!(!input.is_empty());
        // distinct epochs covered by all input imms, sorted ascending
        let mut epochs = input
            .iter()
            .flat_map(|(_, imms)| imms.iter().flat_map(|imm| imm.epochs().iter().cloned()))
            .sorted()
            .dedup()
            .collect_vec();

        // reverse to make newer epochs comes first
        epochs.reverse();
        let payload = Self::input_to_payload(&input);
        let imm_ids = get_payload_imm_ids(&payload);
        let task_size = input
            .values()
            .map(|imms| imms.iter().map(|imm| imm.size()).sum::<usize>())
            .sum();
        let task_info = UploadTaskInfo {
            task_size,
            epochs,
            table_ids: payload
                .values()
                .flat_map(|imms| imms.iter().map(|imm| imm.table_id()))
                .collect(),
            imm_ids,
        };
        // accounted size is released by `task_size_guard` in `Drop`
        context
            .buffer_tracker
            .global_upload_task_size()
            .add(task_size as u64);
        if task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
            info!("start upload task: {:?}", task_info);
        } else {
            debug!("start upload task: {:?}", task_info);
        }
        let join_handle = (context.spawn_upload_task)(payload, task_info.clone());
        context.stats.uploader_uploading_task_count.inc();
        Self {
            task_id,
            input,
            join_handle,
            task_info,
            spawn_upload_task: context.spawn_upload_task.clone(),
            task_size_guard: context.buffer_tracker.global_upload_task_size().clone(),
            task_count_guard: context.stats.uploader_uploading_task_count.clone(),
        }
    }

    /// Poll the result of the uploading task
    ///
    /// On success the output is wrapped into a `StagingSstableInfo` together
    /// with the task's epochs/imm ids. A join error (panicked/cancelled task)
    /// is converted into a `HummockError`.
    fn poll_result(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<HummockResult<Arc<StagingSstableInfo>>> {
        Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) {
            Ok(task_result) => task_result
                .inspect(|_| {
                    if self.task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
                        info!(task_info = ?self.task_info, "upload task finish");
                    } else {
                        debug!(task_info = ?self.task_info, "upload task finish");
                    }
                })
                .inspect_err(|e| error!(task_info = ?self.task_info, err = ?e.as_report(), "upload task failed"))
                .map(|output| {
                    Arc::new(StagingSstableInfo::new(
                        output.new_value_ssts,
                        output.old_value_ssts,
                        self.task_info.epochs.clone(),
                        self.task_info.imm_ids.clone(),
                        self.task_info.task_size,
                    ))
                }),

            Err(err) => Err(HummockError::other(format!(
                "fail to join upload join handle: {}",
                err.as_report()
            ))),
        })
    }

    /// Poll the uploading task until it succeeds. If it fails, we will retry it.
    fn poll_ok_with_retry(&mut self, cx: &mut Context<'_>) -> Poll<Arc<StagingSstableInfo>> {
        loop {
            let result = ready!(self.poll_result(cx));
            match result {
                Ok(sstables) => return Poll::Ready(sstables),
                Err(e) => {
                    error!(
                        error = %e.as_report(),
                        task_info = ?self.task_info,
                        "a flush task failed, start retry",
                    );
                    // re-spawn from the retained input with the same task info
                    self.join_handle = (self.spawn_upload_task)(
                        Self::input_to_payload(&self.input),
                        self.task_info.clone(),
                    );
                    // It is important not to return Poll::pending here immediately, because the new
                    // join_handle is not polled yet, and will not awake the current task when
                    // succeed. It will be polled in the next loop iteration.
                }
            }
        }
    }

    /// Metadata of this task (size, epochs, table ids, imm ids).
    pub fn get_task_info(&self) -> &UploadTaskInfo {
        &self.task_info
    }
}
368
impl TableUnsyncData {
    /// Record `table_watermarks` written at `epoch` for this table.
    ///
    /// No-op when `table_watermarks` is empty. Panics when the watermarks
    /// disagree on vnode count, when a vnode receives more than one watermark
    /// within the same epoch, or when `direction`/`watermark_type` differ
    /// from previously recorded watermarks of this table.
    fn add_table_watermarks(
        &mut self,
        epoch: HummockEpoch,
        table_watermarks: Vec<VnodeWatermark>,
        direction: WatermarkDirection,
        watermark_type: WatermarkSerdeType,
    ) {
        if table_watermarks.is_empty() {
            return;
        }
        // all watermarks of one call must share the same vnode count
        let vnode_count = table_watermarks[0].vnode_count();
        for watermark in &table_watermarks {
            assert_eq!(vnode_count, watermark.vnode_count());
        }

        // Mark the vnodes covered by `vnode_watermarks` in `vnode_bitmap`,
        // asserting that no vnode is written twice within the epoch.
        fn apply_new_vnodes(
            vnode_bitmap: &mut BitmapBuilder,
            vnode_watermarks: &Vec<VnodeWatermark>,
        ) {
            for vnode_watermark in vnode_watermarks {
                for vnode in vnode_watermark.vnode_bitmap().iter_ones() {
                    assert!(
                        !vnode_bitmap.is_set(vnode),
                        "vnode {} write multiple table watermarks",
                        vnode
                    );
                    vnode_bitmap.set(vnode, true);
                }
            }
        }
        match &mut self.table_watermarks {
            Some((prev_direction, prev_watermarks, prev_watermark_type)) => {
                assert_eq!(
                    *prev_direction, direction,
                    "table id {} new watermark direction not match with previous",
                    self.table_id
                );
                assert_eq!(
                    *prev_watermark_type, watermark_type,
                    "table id {} new watermark watermark_type not match with previous",
                    self.table_id
                );
                match prev_watermarks.entry(epoch) {
                    // same epoch already has watermarks: merge, checking vnode overlap
                    Entry::Occupied(mut entry) => {
                        let (prev_watermarks, vnode_bitmap) = entry.get_mut();
                        apply_new_vnodes(vnode_bitmap, &table_watermarks);
                        prev_watermarks.extend(table_watermarks);
                    }
                    // first watermarks of this epoch
                    Entry::Vacant(entry) => {
                        let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                        apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                        entry.insert((table_watermarks, vnode_bitmap));
                    }
                }
            }
            // first watermarks of this table: establish direction and serde type
            None => {
                let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                self.table_watermarks = Some((
                    direction,
                    BTreeMap::from_iter([(epoch, (table_watermarks, vnode_bitmap))]),
                    watermark_type,
                ));
            }
        }
    }
}
437
impl UploaderData {
    /// Fold per-epoch `watermarks` of `table_id` into a single `TableWatermarks`
    /// and insert it into `all_table_watermarks`.
    ///
    /// Panics if `table_id` already has an entry in `all_table_watermarks`,
    /// or if `direction`/`watermark_type` are inconsistent across epochs.
    /// Does nothing when the `watermarks` iterator is empty.
    fn add_table_watermarks(
        all_table_watermarks: &mut HashMap<TableId, TableWatermarks>,
        table_id: TableId,
        direction: WatermarkDirection,
        watermarks: impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)>,
        watermark_type: WatermarkSerdeType,
    ) {
        // built from the first epoch, then extended epoch by epoch
        let mut table_watermarks: Option<TableWatermarks> = None;
        for (epoch, watermarks) in watermarks {
            match &mut table_watermarks {
                Some(prev_watermarks) => {
                    assert_eq!(prev_watermarks.direction, direction);
                    assert_eq!(prev_watermarks.watermark_type, watermark_type);
                    prev_watermarks.add_new_epoch_watermarks(
                        epoch,
                        Arc::from(watermarks),
                        direction,
                        watermark_type,
                    );
                }
                None => {
                    table_watermarks = Some(TableWatermarks::single_epoch(
                        epoch,
                        watermarks,
                        direction,
                        watermark_type,
                    ));
                }
            }
        }
        if let Some(table_watermarks) = table_watermarks {
            assert!(
                all_table_watermarks
                    .insert(table_id, table_watermarks)
                    .is_none()
            );
        }
    }
}
478
/// Imms written by a single local instance within a single epoch.
struct LocalInstanceEpochData {
    epoch: HummockEpoch,
    // newer data comes first.
    imms: VecDeque<UploaderImm>,
    // whether imms of this epoch have been taken out by a spill
    has_spilled: bool,
}
485
486impl LocalInstanceEpochData {
487    fn new(epoch: HummockEpoch) -> Self {
488        Self {
489            epoch,
490            imms: VecDeque::new(),
491            has_spilled: false,
492        }
493    }
494
495    fn epoch(&self) -> HummockEpoch {
496        self.epoch
497    }
498
499    fn add_imm(&mut self, imm: UploaderImm) {
500        assert_eq!(imm.max_epoch(), imm.min_epoch());
501        assert_eq!(self.epoch, imm.min_epoch());
502        if let Some(prev_imm) = self.imms.front() {
503            assert_gt!(imm.batch_id(), prev_imm.batch_id());
504        }
505        self.imms.push_front(imm);
506    }
507
508    fn is_empty(&self) -> bool {
509        self.imms.is_empty()
510    }
511}
512
/// Unsynced data of one local instance: imms of the current epoch, imms of
/// sealed-but-unsynced epochs, and the ids of imms currently being flushed.
struct LocalInstanceUnsyncData {
    table_id: TableId,
    instance_id: LocalInstanceId,
    // None means that the current instance should have stopped advancing
    current_epoch_data: Option<LocalInstanceEpochData>,
    // newer data comes first.
    sealed_data: VecDeque<LocalInstanceEpochData>,
    // newer data comes first
    flushing_imms: VecDeque<SharedBufferBatchId>,
    // set when the instance is destroyed; see `is_finished`
    is_destroyed: bool,
    // set when the instance sealed its final epoch; no further imms or seals allowed
    is_stopped: bool,
}
525
impl LocalInstanceUnsyncData {
    /// Create unsync state for a newly initialized local instance whose
    /// current epoch starts at `init_epoch`.
    fn new(table_id: TableId, instance_id: LocalInstanceId, init_epoch: HummockEpoch) -> Self {
        Self {
            table_id,
            instance_id,
            current_epoch_data: Some(LocalInstanceEpochData::new(init_epoch)),
            sealed_data: VecDeque::new(),
            flushing_imms: Default::default(),
            is_destroyed: false,
            is_stopped: false,
        }
    }

    /// Add an imm to the current epoch.
    ///
    /// Panics if the instance is destroyed/stopped, the imm belongs to another
    /// table, or the current epoch has already been archived.
    fn add_imm(&mut self, imm: UploaderImm) {
        assert!(!self.is_destroyed);
        assert!(!self.is_stopped);
        assert_eq!(self.table_id, imm.table_id);
        self.current_epoch_data
            .as_mut()
            .expect("should be Some when adding new imm")
            .add_imm(imm);
    }

    /// Seal the current epoch and start `next_epoch`, returning the epoch that
    /// was sealed. Sealed epochs with data are queued for sync; empty ones are
    /// discarded. `stopped` marks this seal as the instance's last.
    fn local_seal_epoch(&mut self, next_epoch: HummockEpoch, stopped: bool) -> HummockEpoch {
        let data = self
            .current_epoch_data
            .as_mut()
            .expect("should be Some when seal new epoch");
        let current_epoch = data.epoch;
        debug!(
            instance_id = self.instance_id,
            next_epoch, current_epoch, stopped, "local seal epoch"
        );
        assert_gt!(next_epoch, current_epoch);
        assert!(!self.is_destroyed);
        assert!(!self.is_stopped);
        self.is_stopped = stopped;
        let epoch_data = replace(data, LocalInstanceEpochData::new(next_epoch));
        if !epoch_data.is_empty() {
            self.sealed_data.push_front(epoch_data);
        }
        current_epoch
    }

    // imm_ids from old to new, which means in ascending order
    /// Acknowledge that the given imms finished flushing; they must match the
    /// oldest pending entries of `flushing_imms` exactly.
    fn ack_flushed(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            assert_eq!(self.flushing_imms.pop_back().expect("should exist"), imm_id);
        }
    }

    /// Take out the imms of exactly `epoch` for spilling (newer data first),
    /// registering them as flushing. Returns empty when this instance has no
    /// data at `epoch`. Panics if an older epoch still has unspilled data.
    fn spill(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        let imms = if let Some(oldest_sealed_epoch) = self.sealed_data.back() {
            match oldest_sealed_epoch.epoch.cmp(&epoch) {
                Ordering::Less => {
                    unreachable!(
                        "should not spill at this epoch because there \
                    is unspilled data in previous epoch: prev epoch {}, spill epoch {}",
                        oldest_sealed_epoch.epoch, epoch
                    );
                }
                Ordering::Equal => {
                    let epoch_data = self.sealed_data.pop_back().unwrap();
                    assert_eq!(epoch, epoch_data.epoch);
                    epoch_data.imms
                }
                Ordering::Greater => VecDeque::new(),
            }
        } else {
            // no sealed data: the spill can only target the current epoch
            let Some(current_epoch_data) = &mut self.current_epoch_data else {
                return Vec::new();
            };
            match current_epoch_data.epoch.cmp(&epoch) {
                Ordering::Less => {
                    assert!(
                        current_epoch_data.imms.is_empty(),
                        "should not spill at this epoch because there \
                    is unspilled data in current epoch epoch {}, spill epoch {}",
                        current_epoch_data.epoch,
                        epoch
                    );
                    VecDeque::new()
                }
                Ordering::Equal => {
                    if !current_epoch_data.imms.is_empty() {
                        current_epoch_data.has_spilled = true;
                        take(&mut current_epoch_data.imms)
                    } else {
                        VecDeque::new()
                    }
                }
                Ordering::Greater => VecDeque::new(),
            }
        };
        // register in ascending batch-id order (imms store newer first)
        self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
        imms.into_iter().collect()
    }

    /// Record imm ids that entered a flush/upload; ids must arrive in strictly
    /// ascending order relative to what is already tracked.
    fn add_flushing_imm(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            if let Some(prev_imm_id) = self.flushing_imms.front() {
                assert_gt!(imm_id, *prev_imm_id);
            }
            self.flushing_imms.push_front(imm_id);
        }
    }

    // start syncing the imm inclusively before the `epoch`
    // returning data with newer data coming first
    fn sync(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        // firstly added from old to new
        let mut ret = Vec::new();
        while let Some(epoch_data) = self.sealed_data.back()
            && epoch_data.epoch() <= epoch
        {
            let imms = self.sealed_data.pop_back().expect("checked exist").imms;
            self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
            ret.extend(imms.into_iter().rev());
        }
        // reverse so that newer data comes first
        ret.reverse();
        if let Some(latest_epoch_data) = &self.current_epoch_data
            && latest_epoch_data.epoch <= epoch
        {
            // syncing past the current epoch is only legal when the current
            // epoch is completely empty; panic in debug builds, archive the
            // instance (set current_epoch_data to None) in release builds
            assert!(self.sealed_data.is_empty());
            assert!(latest_epoch_data.is_empty());
            assert!(!latest_epoch_data.has_spilled);
            if cfg!(debug_assertions) {
                panic!(
                    "sync epoch exceeds latest epoch, and the current instance should have been archived, table_id = {}, latest_epoch_data = {}, epoch = {}",
                    self.table_id,
                    latest_epoch_data.epoch(),
                    epoch
                );
            }
            warn!(
                instance_id = self.instance_id,
                table_id = %self.table_id,
                "sync epoch exceeds latest epoch, and the current instance should have be archived"
            );
            self.current_epoch_data = None;
        }
        ret
    }

    /// Assert that no unsynced data at or before `epoch` remains.
    fn assert_after_epoch(&self, epoch: HummockEpoch) {
        if let Some(oldest_sealed_data) = self.sealed_data.back() {
            assert!(!oldest_sealed_data.imms.is_empty());
            assert_gt!(oldest_sealed_data.epoch, epoch);
        } else if let Some(current_data) = &self.current_epoch_data
            && current_data.epoch <= epoch
        {
            assert!(current_data.imms.is_empty() && !current_data.has_spilled);
        }
    }

    /// The instance can be dropped once destroyed and all sealed data is drained.
    fn is_finished(&self) -> bool {
        self.is_destroyed && self.sealed_data.is_empty()
    }
}
686
/// Unsynced state of one table: per-instance imms, table watermarks, spill
/// tasks, and the epoch bookkeeping that drives sync.
struct TableUnsyncData {
    table_id: TableId,
    instance_data: HashMap<LocalInstanceId, LocalInstanceUnsyncData>,
    // per-epoch watermarks with the bitmap of vnodes they cover, plus the
    // direction and serde type shared by all watermarks of this table
    #[expect(clippy::type_complexity)]
    table_watermarks: Option<(
        WatermarkDirection,
        BTreeMap<HummockEpoch, (Vec<VnodeWatermark>, BitmapBuilder)>,
        WatermarkSerdeType,
    )>,
    // ids of spill tasks issued per epoch
    spill_tasks: BTreeMap<HummockEpoch, VecDeque<UploadingTaskId>>,
    // epochs not yet passed to `sync`; used as an ordered set (value unused)
    unsync_epochs: BTreeMap<HummockEpoch, ()>,
    // newer epoch at the front
    syncing_epochs: VecDeque<HummockEpoch>,
    max_synced_epoch: Option<HummockEpoch>,
}
702
impl TableUnsyncData {
    /// Create unsync state for a table; `committed_epoch` (if known) seeds the
    /// max synced epoch.
    fn new(table_id: TableId, committed_epoch: Option<HummockEpoch>) -> Self {
        Self {
            table_id,
            instance_data: Default::default(),
            table_watermarks: None,
            spill_tasks: Default::default(),
            unsync_epochs: Default::default(),
            syncing_epochs: Default::default(),
            max_synced_epoch: committed_epoch,
        }
    }

    /// Register a new unsync epoch; must be greater than any epoch seen so far.
    fn new_epoch(&mut self, epoch: HummockEpoch) {
        debug!(table_id = ?self.table_id, epoch, "table new epoch");
        if let Some(latest_epoch) = self.max_epoch() {
            assert_gt!(epoch, latest_epoch);
        }
        self.unsync_epochs.insert(epoch, ());
    }

    /// Start syncing all data inclusively before `epoch`.
    ///
    /// Moves the affected epochs from unsync to syncing and returns:
    /// per-instance imms to upload, the table watermarks taken for these
    /// epochs (if any), the ids of spill tasks covering these epochs, and the
    /// epochs themselves. Panics if `epoch` is not an unsync epoch or does not
    /// advance past the previous sync.
    #[expect(clippy::type_complexity)]
    fn sync(
        &mut self,
        epoch: HummockEpoch,
    ) -> (
        impl Iterator<Item = (LocalInstanceId, Vec<UploaderImm>)> + '_,
        Option<(
            WatermarkDirection,
            impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)> + use<>,
            WatermarkSerdeType,
        )>,
        impl Iterator<Item = UploadingTaskId> + use<>,
        BTreeMap<HummockEpoch, ()>,
    ) {
        if let Some(prev_epoch) = self.max_sync_epoch() {
            assert_gt!(epoch, prev_epoch)
        }
        let epochs = take_before_epoch(&mut self.unsync_epochs, epoch);
        // the sync target must itself be a registered unsync epoch
        assert_eq!(
            *epochs.last_key_value().expect("non-empty").0,
            epoch,
            "{epochs:?} {epoch} {:?}",
            self.table_id
        );
        self.syncing_epochs.push_front(epoch);
        (
            self.instance_data
                .iter_mut()
                .map(move |(instance_id, data)| (*instance_id, data.sync(epoch))),
            self.table_watermarks
                .as_mut()
                .map(|(direction, watermarks, watermark_type)| {
                    // the vnode bitmaps are only needed for write-time checks; drop them
                    let watermarks = take_before_epoch(watermarks, epoch)
                        .into_iter()
                        .map(|(epoch, (watermarks, _))| (epoch, watermarks));
                    (*direction, watermarks, *watermark_type)
                }),
            take_before_epoch(&mut self.spill_tasks, epoch)
                .into_values()
                .flat_map(|tasks| tasks.into_iter()),
            epochs,
        )
    }

    /// Mark the oldest syncing epoch as synced; `sync_epoch` must match it.
    fn ack_synced(&mut self, sync_epoch: HummockEpoch) {
        let min_sync_epoch = self.syncing_epochs.pop_back().expect("should exist");
        assert_eq!(sync_epoch, min_sync_epoch);
        self.max_synced_epoch = Some(sync_epoch);
    }

    /// Observe that `committed_epoch` is committed, advancing the max synced
    /// epoch if it moved past it, and asserting no stale unsynced data remains.
    fn ack_committed(&mut self, committed_epoch: HummockEpoch) {
        let synced_epoch_advanced = {
            if let Some(max_synced_epoch) = self.max_synced_epoch
                && max_synced_epoch >= committed_epoch
            {
                false
            } else {
                true
            }
        };
        if synced_epoch_advanced {
            self.max_synced_epoch = Some(committed_epoch);
            if let Some(min_syncing_epoch) = self.syncing_epochs.back() {
                assert_gt!(*min_syncing_epoch, committed_epoch);
            }
            self.assert_after_epoch(committed_epoch);
        }
    }

    /// Assert that no unsynced data (imms or watermarks) at or before `epoch` remains.
    fn assert_after_epoch(&self, epoch: HummockEpoch) {
        self.instance_data
            .values()
            .for_each(|instance_data| instance_data.assert_after_epoch(epoch));
        if let Some((_, watermarks, _)) = &self.table_watermarks
            && let Some((oldest_epoch, _)) = watermarks.first_key_value()
        {
            assert_gt!(*oldest_epoch, epoch);
        }
    }

    /// Latest epoch that has entered sync: the newest syncing epoch, falling
    /// back to the max synced epoch.
    fn max_sync_epoch(&self) -> Option<HummockEpoch> {
        self.syncing_epochs
            .front()
            .cloned()
            .or(self.max_synced_epoch)
    }

    /// Latest epoch known to this table: the newest unsync epoch, falling back
    /// to `max_sync_epoch`.
    fn max_epoch(&self) -> Option<HummockEpoch> {
        self.unsync_epochs
            .last_key_value()
            .map(|(epoch, _)| *epoch)
            .or_else(|| self.max_sync_epoch())
    }

    /// True when the table has no instances and no unsync/syncing epochs left.
    fn is_empty(&self) -> bool {
        self.instance_data.is_empty()
            && self.syncing_epochs.is_empty()
            && self.unsync_epochs.is_empty()
    }
}
824
/// Identifies a group of tables sharing an unsynced epoch: the epoch paired
/// with the smallest table id of the group (see `get_unsync_epoch_id`).
#[derive(Eq, Hash, PartialEq, Copy, Clone)]
struct UnsyncEpochId(HummockEpoch, TableId);

impl UnsyncEpochId {
    fn epoch(&self) -> HummockEpoch {
        self.0
    }
}
833
834fn get_unsync_epoch_id(epoch: HummockEpoch, table_ids: &HashSet<TableId>) -> Option<UnsyncEpochId> {
835    table_ids
836        .iter()
837        .min()
838        .map(|table_id| UnsyncEpochId(epoch, *table_id))
839}
840
#[derive(Default)]
/// Unsync data, can be either imm or spilled sst, and some aggregated epoch information.
///
/// `instance_data` holds the imm of each individual local instance, and data are first added here.
/// The aggregated epoch information (table watermarks, etc.) and the spilled sst will be added to `epoch_data`.
struct UnsyncData {
    // per-table unsync state, keyed by table id
    table_data: HashMap<TableId, TableUnsyncData>,
    // An index as a mapping from instance id to its table id
    instance_table_id: HashMap<LocalInstanceId, TableId>,
    // groups of tables that started an epoch together, keyed by (epoch, min table id)
    unsync_epochs: HashMap<UnsyncEpochId, HashSet<TableId>>,
    // spilled ssts not yet taken by a sync; removed when a sync picks them up
    // or the owning tables are cleared
    spilled_data: HashMap<UploadingTaskId, (Arc<StagingSstableInfo>, HashSet<TableId>)>,
}
853
impl UnsyncData {
    /// Register a new local instance of `table_id` starting at `init_epoch`.
    ///
    /// Panics when the table is unknown, the instance already exists, or
    /// `init_epoch` has not been started on the table.
    fn init_instance(
        &mut self,
        table_id: TableId,
        instance_id: LocalInstanceId,
        init_epoch: HummockEpoch,
    ) {
        debug!(
            table_id = %table_id,
            instance_id, init_epoch, "init epoch"
        );
        let table_data = self
            .table_data
            .get_mut(&table_id)
            .unwrap_or_else(|| panic!("should exist. {table_id:?}"));
        assert!(
            table_data
                .instance_data
                .insert(
                    instance_id,
                    LocalInstanceUnsyncData::new(table_id, instance_id, init_epoch)
                )
                .is_none()
        );
        assert!(
            self.instance_table_id
                .insert(instance_id, table_id)
                .is_none()
        );
        assert!(table_data.unsync_epochs.contains_key(&init_epoch));
    }

    /// Look up the unsync data of a local instance through the
    /// `instance_table_id` index. Returns `None` when the instance is unknown
    /// (e.g. already removed).
    fn instance_data(
        &mut self,
        instance_id: LocalInstanceId,
    ) -> Option<&mut LocalInstanceUnsyncData> {
        self.instance_table_id
            .get_mut(&instance_id)
            .cloned()
            .map(move |table_id| {
                self.table_data
                    .get_mut(&table_id)
                    .expect("should exist")
                    .instance_data
                    .get_mut(&instance_id)
                    .expect("should exist")
            })
    }

    /// Append an imm to the given instance. Panics when the instance is
    /// unknown.
    fn add_imm(&mut self, instance_id: LocalInstanceId, imm: UploaderImm) {
        self.instance_data(instance_id)
            .expect("should exist")
            .add_imm(imm);
    }

    /// Seal the current epoch of an instance and advance to `next_epoch`,
    /// recording any table watermarks carried in `opts` on the sealed epoch.
    fn local_seal_epoch(
        &mut self,
        instance_id: LocalInstanceId,
        next_epoch: HummockEpoch,
        opts: SealCurrentEpochOptions,
    ) {
        let table_id = self.instance_table_id[&instance_id];
        let table_data = self.table_data.get_mut(&table_id).expect("should exist");
        let instance_data = table_data
            .instance_data
            .get_mut(&instance_id)
            .expect("should exist");
        // When drop/cancel a streaming job, for the barrier to stop actor, the
        // local instance will call `local_seal_epoch`, but the `next_epoch` won't be
        // called `start_epoch` because we have stopped writing on it.
        let stopped = !table_data.unsync_epochs.contains_key(&next_epoch);
        let epoch = instance_data.local_seal_epoch(next_epoch, stopped);
        if let Some((direction, table_watermarks, watermark_type)) = opts.table_watermarks {
            table_data.add_table_watermarks(epoch, table_watermarks, direction, watermark_type);
        }
    }

    /// Mark an instance as destroyed. The instance data is not removed here;
    /// it is cleaned up later (e.g. when its data gets synced, or in
    /// `clear_tables`).
    fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
        if let Some(table_id) = self.instance_table_id.get(&instance_id) {
            debug!(instance_id, "destroy instance");
            let table_data = self.table_data.get_mut(table_id).expect("should exist");
            let instance_data = table_data
                .instance_data
                .get_mut(&instance_id)
                .expect("should exist");
            assert!(
                !instance_data.is_destroyed,
                "cannot destroy an instance for twice"
            );
            instance_data.is_destroyed = true;
        }
    }

    /// Drop all unsync state of `table_ids`: abort their spill tasks, discard
    /// their spilled ssts, and remove their entries from every index.
    ///
    /// Asserts that spill tasks/epoch groups never straddle the boundary of
    /// `table_ids`, and that all instances were destroyed beforehand.
    fn clear_tables(&mut self, table_ids: &HashSet<TableId>, task_manager: &mut TaskManager) {
        for table_id in table_ids {
            if let Some(table_unsync_data) = self.table_data.remove(table_id) {
                for task_id in table_unsync_data.spill_tasks.into_values().flatten() {
                    // a task id may be in the task manager (still running) or
                    // in `spilled_data` (already finished)
                    if let Some(task_status) = task_manager.abort_task(task_id) {
                        must_match!(task_status, UploadingTaskStatus::Spilling(spill_table_ids) => {
                            assert!(spill_table_ids.is_subset(table_ids));
                        });
                    }
                    if let Some((_, spill_table_ids)) = self.spilled_data.remove(&task_id) {
                        assert!(spill_table_ids.is_subset(table_ids));
                    }
                }
                assert!(
                    table_unsync_data
                        .instance_data
                        .values()
                        .all(|instance| instance.is_destroyed),
                    "should be clear when dropping the read version instance"
                );
                // drop the instance-id index entries of the removed table
                for instance_id in table_unsync_data.instance_data.keys() {
                    assert_eq!(
                        *table_id,
                        self.instance_table_id
                            .remove(instance_id)
                            .expect("should exist")
                    );
                }
            }
        }
        // no remaining spilled sst may reference a cleared table
        debug_assert!(
            self.spilled_data
                .values()
                .all(|(_, spill_table_ids)| spill_table_ids.is_disjoint(table_ids))
        );
        // remove epoch groups owned entirely by the cleared tables
        self.unsync_epochs.retain(|_, unsync_epoch_table_ids| {
            if !unsync_epoch_table_ids.is_disjoint(table_ids) {
                assert!(unsync_epoch_table_ids.is_subset(table_ids));
                false
            } else {
                true
            }
        });
        assert!(
            self.instance_table_id
                .values()
                .all(|table_id| !table_ids.contains(table_id))
        );
    }
}
997
impl UploaderData {
    /// Start syncing the given `(epoch, table_ids)` pairs.
    ///
    /// For every synced table this collects: unflushed imms (flushed together
    /// as one extra upload task), previously spilled ssts, still-running
    /// upload tasks, table watermarks, and vector index adds. A `SyncingData`
    /// entry is registered under a fresh `SyncId`; `sync_result_sender` is
    /// resolved once all remaining upload tasks of that entry finish.
    fn sync(
        &mut self,
        context: &UploaderContext,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) {
        let mut all_table_watermarks = HashMap::new();
        let mut uploading_tasks = HashSet::new();
        let mut spilled_tasks = BTreeSet::new();
        let mut all_table_ids = HashSet::new();
        let mut vector_index_adds = HashMap::new();

        // unflushed imms per instance, to be submitted as one flush task below
        let mut flush_payload = HashMap::new();

        for (epoch, table_ids) in &sync_table_epochs {
            let epoch = *epoch;
            // a table may appear in at most one (epoch, table_ids) pair
            for table_id in table_ids {
                assert!(
                    all_table_ids.insert(*table_id),
                    "duplicate sync table epoch: {:?} {:?}",
                    all_table_ids,
                    sync_table_epochs
                );
            }
            if let Some(UnsyncEpochId(_, min_table_id)) = get_unsync_epoch_id(epoch, table_ids) {
                let min_table_id_data = self
                    .unsync_data
                    .table_data
                    .get_mut(&min_table_id)
                    .expect("should exist");
                // NOTE(review): operates on a clone, so the representative
                // table's `unsync_epochs` is not drained here; each
                // `table_data.sync(epoch)` below returns its own epoch set,
                // which is asserted to equal `epochs` — confirm this clone is
                // intentional.
                let epochs = take_before_epoch(&mut min_table_id_data.unsync_epochs.clone(), epoch);
                // drop the group index entries of the synced epochs
                for epoch in epochs.keys() {
                    assert_eq!(
                        &self
                            .unsync_data
                            .unsync_epochs
                            .remove(&UnsyncEpochId(*epoch, min_table_id))
                            .expect("should exist"),
                        table_ids
                    );
                }
                for table_id in table_ids {
                    let table_data = self
                        .unsync_data
                        .table_data
                        .get_mut(table_id)
                        .expect("should exist");
                    let (unflushed_payload, table_watermarks, task_ids, table_unsync_epochs) =
                        table_data.sync(epoch);
                    // every table of the group must have synced the same epochs
                    assert_eq!(table_unsync_epochs, epochs, "invalid table_id {table_id}");
                    for (instance_id, payload) in unflushed_payload {
                        if !payload.is_empty() {
                            flush_payload.insert(instance_id, payload);
                        }
                    }
                    table_data.instance_data.retain(|instance_id, data| {
                        // remove the finished instances
                        if data.is_finished() {
                            assert_eq!(
                                self.unsync_data.instance_table_id.remove(instance_id),
                                Some(*table_id)
                            );
                            false
                        } else {
                            true
                        }
                    });
                    if let Some((direction, watermarks, watermark_type)) = table_watermarks {
                        Self::add_table_watermarks(
                            &mut all_table_watermarks,
                            *table_id,
                            direction,
                            watermarks,
                            watermark_type,
                        );
                    }
                    // split the table's tasks into already-spilled (sst ready)
                    // and still-uploading ones
                    for task_id in task_ids {
                        if self.unsync_data.spilled_data.contains_key(&task_id) {
                            spilled_tasks.insert(task_id);
                        } else {
                            uploading_tasks.insert(task_id);
                        }
                    }

                    // collect vector index adds sealed up to `epoch`
                    if let hash_map::Entry::Occupied(mut entry) =
                        self.unsync_vector_index_data.entry(*table_id)
                    {
                        let data = entry.get_mut();
                        let adds = take_before_epoch(&mut data.sealed_epoch_data, epoch)
                            .into_values()
                            .flatten()
                            .collect_vec();
                        // a dropped writer's entry is kept only while it still
                        // holds sealed data
                        if data.is_dropped && data.sealed_epoch_data.is_empty() {
                            entry.remove();
                        }
                        if !adds.is_empty() {
                            vector_index_adds
                                .try_insert(*table_id, adds)
                                .expect("non-duplicate");
                        }
                    }
                }
            }
        }

        // allocate a fresh, monotonically increasing sync id
        let sync_id = {
            let sync_id = self.next_sync_id;
            self.next_sync_id += 1;
            SyncId(sync_id)
        };

        // flush the remaining unflushed imms as one extra task of this sync
        if let Some(extra_flush_task_id) = self.task_manager.sync(
            context,
            sync_id,
            flush_payload,
            uploading_tasks.iter().cloned(),
            &all_table_ids,
        ) {
            uploading_tasks.insert(extra_flush_task_id);
        }

        // iter from large task_id to small one so that newer data at the front
        let uploaded = spilled_tasks
            .iter()
            .rev()
            .map(|task_id| {
                let (sst, spill_table_ids) = self
                    .unsync_data
                    .spilled_data
                    .remove(task_id)
                    .expect("should exist");
                assert!(
                    spill_table_ids.is_subset(&all_table_ids),
                    "spilled tabled ids {:?} not a subset of sync table id {:?}",
                    spill_table_ids,
                    all_table_ids
                );
                sst
            })
            .collect();

        self.syncing_data.insert(
            sync_id,
            SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: uploading_tasks,
                uploaded,
                table_watermarks: all_table_watermarks,
                vector_index_adds,
                sync_result_sender,
            },
        );

        self.check_upload_task_consistency();
    }
}
1155
1156impl UnsyncData {
1157    fn ack_flushed(&mut self, sstable_info: &StagingSstableInfo) {
1158        for (instance_id, imm_ids) in sstable_info.imm_ids() {
1159            if let Some(instance_data) = self.instance_data(*instance_id) {
1160                // take `rev` to let old imm id goes first
1161                instance_data.ack_flushed(imm_ids.iter().rev().cloned());
1162            }
1163        }
1164    }
1165}
1166
/// Bookkeeping of one in-flight sync request, kept from `UploaderData::sync`
/// until all its upload tasks finish and the result is sent.
struct SyncingData {
    sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    // upload tasks that must finish before this sync can complete
    remaining_uploading_tasks: HashSet<UploadingTaskId>,
    // newer data at the front
    uploaded: VecDeque<Arc<StagingSstableInfo>>,
    table_watermarks: HashMap<TableId, TableWatermarks>,
    vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
    // resolved with `SyncedData` on success, or an error on failure/abort
    sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
}
1176
#[derive(Debug)]
/// The output of a completed sync, delivered through the sync result sender.
pub struct SyncedData {
    pub uploaded_ssts: VecDeque<Arc<StagingSstableInfo>>,
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
    pub vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
}
1183
/// Dependencies shared by all uploader operations.
struct UploaderContext {
    pinned_version: PinnedVersion,
    /// When called, it will spawn a task to flush the imm into sst and return the join handle.
    spawn_upload_task: SpawnUploadTask,
    buffer_tracker: BufferTracker,

    stats: Arc<HummockStateStoreMetrics>,
}
1192
1193impl UploaderContext {
1194    fn new(
1195        pinned_version: PinnedVersion,
1196        spawn_upload_task: SpawnUploadTask,
1197        buffer_tracker: BufferTracker,
1198        stats: Arc<HummockStateStoreMetrics>,
1199    ) -> Self {
1200        UploaderContext {
1201            pinned_version,
1202            spawn_upload_task,
1203            buffer_tracker,
1204            stats,
1205        }
1206    }
1207}
1208
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
/// Monotonically increasing id assigned to each sync request
/// (see `UploaderData::next_sync_id`).
struct SyncId(usize);
1211
/// Per-table state of a vector index writer that has not been synced yet.
struct UnsyncVectorIndexData {
    // vector index adds of sealed epochs, keyed by epoch, awaiting sync
    sealed_epoch_data: BTreeMap<HummockEpoch, Option<VectorIndexAdd>>,
    // the current (not yet sealed) epoch of the writer
    curr_epoch: u64,
    // set when the writer is dropped; the entry is kept until
    // `sealed_epoch_data` is drained by a sync
    is_dropped: bool,
}
1217
#[derive(Default)]
/// Mutable state of a working uploader: unsync data plus in-flight syncs and
/// their upload tasks.
struct UploaderData {
    unsync_data: UnsyncData,
    unsync_vector_index_data: HashMap<TableId, UnsyncVectorIndexData>,

    // in-flight sync requests keyed by their monotonically increasing id
    syncing_data: BTreeMap<SyncId, SyncingData>,

    task_manager: TaskManager,
    // id to assign to the next sync request
    next_sync_id: usize,
}
1228
impl UploaderData {
    /// Tear down the uploader data: abort every upload task and fail every
    /// pending sync with the error produced by `err`.
    fn abort(self, err: impl Fn() -> HummockError) {
        self.task_manager.abort_all_tasks();
        for syncing_data in self.syncing_data.into_values() {
            send_sync_result(syncing_data.sync_result_sender, Err(err()));
        }
    }

    /// Drop all state (unsync and syncing) that belongs to `table_ids`,
    /// aborting the affected upload tasks.
    ///
    /// Asserts that a syncing entry either involves none of `table_ids`, or
    /// only tables within `table_ids`.
    fn clear_tables(&mut self, table_ids: HashSet<TableId>) {
        if table_ids.is_empty() {
            return;
        }
        self.unsync_data
            .clear_tables(&table_ids, &mut self.task_manager);
        self.syncing_data.retain(|sync_id, syncing_data| {
            if syncing_data
                .sync_table_epochs
                .iter()
                .any(|(_, sync_table_ids)| !sync_table_ids.is_disjoint(&table_ids))
            {
                // a partially-overlapping sync would be unrecoverable, so it
                // must be fully covered by the cleared tables
                assert!(
                    syncing_data
                        .sync_table_epochs
                        .iter()
                        .all(|(_, sync_table_ids)| sync_table_ids.is_subset(&table_ids))
                );
                for task_id in &syncing_data.remaining_uploading_tasks {
                    match self
                        .task_manager
                        .abort_task(*task_id)
                        .expect("should exist")
                    {
                        UploadingTaskStatus::Spilling(spill_table_ids) => {
                            assert!(spill_table_ids.is_subset(&table_ids));
                        }
                        UploadingTaskStatus::Sync(task_sync_id) => {
                            assert_eq!(sync_id, &task_sync_id);
                        }
                    }
                }
                false
            } else {
                true
            }
        });

        self.check_upload_task_consistency();
    }

    /// The smallest object id among all uploaded-but-uncommitted ssts
    /// (spilled ssts plus ssts held by in-flight syncs), or `None` when there
    /// is none.
    fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        self.unsync_data
            .spilled_data
            .values()
            .map(|(s, _)| s)
            .chain(self.syncing_data.values().flat_map(|s| s.uploaded.iter()))
            .filter_map(|s| {
                s.sstable_infos()
                    .iter()
                    .chain(s.old_value_sstable_infos())
                    .map(|s| s.sst_info.object_id)
                    .min()
            })
            .min()
            .map(|object_id| object_id.as_raw())
    }
}
1295
/// Record of a failed sync; later sync requests are rejected with this
/// information (see `start_sync_epoch`).
struct ErrState {
    failed_sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    reason: String,
}
1300
/// Overall uploader state: normal operation, or a sticky error state in which
/// most operations become no-ops (see `ErrState`).
enum UploaderState {
    Working(UploaderData),
    Err(ErrState),
}
1305
/// An uploader for hummock data.
///
/// Data have 3 sequential stages: unsync (inside each local instance, data can be unsealed, sealed), syncing, synced.
///
/// The 3 stages are divided by 2 marginal epochs: `max_syncing_epoch`,
/// `max_synced_epoch` in each `TableUnSyncData`. Epochs satisfy the following inequality.
///
/// (epochs of `synced_data`) <= `max_synced_epoch` < (epochs of `syncing_data`) <=
/// `max_syncing_epoch` < (epochs of `unsync_data`)
///
/// Data are mostly stored in `VecDeque`, and the order stored in the `VecDeque` indicates the data
/// order. Data at the front represents ***newer*** data.
pub struct HummockUploader {
    // current state; most operations are no-ops unless `Working`
    state: UploaderState,

    // shared dependencies (pinned version, task spawner, buffer tracker, metrics)
    context: UploaderContext,
}
1323
impl HummockUploader {
    /// Create an uploader in the `Working` state with empty data.
    pub(super) fn new(
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        pinned_version: PinnedVersion,
        spawn_upload_task: SpawnUploadTask,
        buffer_tracker: BufferTracker,
    ) -> Self {
        Self {
            state: UploaderState::Working(UploaderData::default()),
            context: UploaderContext::new(
                pinned_version,
                spawn_upload_task,
                buffer_tracker,
                state_store_metrics,
            ),
        }
    }

    pub(super) fn buffer_tracker(&self) -> &BufferTracker {
        &self.context.buffer_tracker
    }

    pub(super) fn hummock_version(&self) -> &PinnedVersion {
        &self.context.pinned_version
    }

    /// Add imms to an instance's unsync data. No-op when not `Working`.
    pub(super) fn add_imms(
        &mut self,
        instance_id: LocalInstanceId,
        imms: Vec<(ImmutableMemtable, MemoryTracker)>,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        for (imm, tracker) in imms {
            let imm = UploaderImm::new(imm, &self.context, tracker);
            data.unsync_data.add_imm(instance_id, imm);
        }
    }

    /// Register a new local instance starting at `init_epoch`. No-op when not
    /// `Working`.
    pub(super) fn init_instance(
        &mut self,
        instance_id: LocalInstanceId,
        table_id: TableId,
        init_epoch: HummockEpoch,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data
            .init_instance(table_id, instance_id, init_epoch);
    }

    /// Seal an instance's current epoch and advance it to `next_epoch`. No-op
    /// when not `Working`.
    pub(super) fn local_seal_epoch(
        &mut self,
        instance_id: LocalInstanceId,
        next_epoch: HummockEpoch,
        opts: SealCurrentEpochOptions,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data
            .local_seal_epoch(instance_id, next_epoch, opts);
    }

    /// Register a vector index writer for `table_id`. Panics on a duplicate
    /// writer. No-op when not `Working`.
    pub(super) fn register_vector_writer(&mut self, table_id: TableId, init_epoch: HummockEpoch) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        assert!(
            data.unsync_vector_index_data
                .try_insert(
                    table_id,
                    UnsyncVectorIndexData {
                        sealed_epoch_data: Default::default(),
                        curr_epoch: init_epoch,
                        is_dropped: false,
                    }
                )
                .is_ok(),
            "duplicate vector writer on {}",
            table_id
        )
    }

    /// Seal the current epoch of a vector writer with an optional add, and
    /// advance to `next_epoch`. No-op when not `Working`.
    pub(super) fn vector_writer_seal_epoch(
        &mut self,
        table_id: TableId,
        next_epoch: HummockEpoch,
        add: Option<VectorIndexAdd>,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        let data = data
            .unsync_vector_index_data
            .get_mut(&table_id)
            .expect("should exist");
        assert!(!data.is_dropped);
        assert!(
            data.curr_epoch < next_epoch,
            "next epoch {} should be greater than current epoch {}",
            next_epoch,
            data.curr_epoch
        );
        data.sealed_epoch_data
            .try_insert(data.curr_epoch, add)
            .expect("non-duplicate");
        data.curr_epoch = next_epoch;
    }

    /// Drop a vector writer; its entry is removed immediately only when it has
    /// no sealed data left to sync. No-op when not `Working`.
    pub(super) fn drop_vector_writer(&mut self, table_id: TableId) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        let hash_map::Entry::Occupied(mut entry) = data.unsync_vector_index_data.entry(table_id)
        else {
            panic!("vector writer {} should exist", table_id);
        };
        let data = entry.get_mut();
        data.is_dropped = true;
        if data.sealed_epoch_data.is_empty() {
            entry.remove();
        }
    }

    /// Start `epoch` on `table_ids`, creating the per-table data on first use
    /// and registering the epoch group. No-op when not `Working`.
    pub(super) fn start_epoch(&mut self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        debug!(epoch, ?table_ids, "start epoch");
        for table_id in &table_ids {
            let table_data = data
                .unsync_data
                .table_data
                .entry(*table_id)
                .or_insert_with(|| {
                    TableUnsyncData::new(
                        *table_id,
                        self.context.pinned_version.table_committed_epoch(*table_id),
                    )
                });
            table_data.new_epoch(epoch);
        }
        if let Some(unsync_epoch_id) = get_unsync_epoch_id(epoch, &table_ids) {
            assert!(
                data.unsync_data
                    .unsync_epochs
                    .insert(unsync_epoch_id, table_ids)
                    .is_none()
            );
        }
    }

    /// Start syncing the given table epochs. In the `Err` state the request is
    /// rejected immediately through `sync_result_sender`.
    pub(super) fn start_sync_epoch(
        &mut self,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) {
        let data = match &mut self.state {
            UploaderState::Working(data) => data,
            UploaderState::Err(ErrState {
                failed_sync_table_epochs,
                reason,
            }) => {
                let result = Err(HummockError::other(format!(
                    "previous sync epoch {:?} failed due to [{}]",
                    failed_sync_table_epochs, reason
                )));
                send_sync_result(sync_result_sender, result);
                return;
            }
        };
        debug!(?sync_table_epochs, "start sync epoch");

        data.sync(&self.context, sync_result_sender, sync_table_epochs);

        // the sync may already be complete (no remaining upload task)
        data.may_notify_sync_task(&self.context);

        self.context
            .stats
            .uploader_syncing_epoch_count
            .set(data.syncing_data.len() as _);
    }

    /// Install a new pinned version, acking each table's committed epoch to
    /// its unsync data.
    pub(crate) fn update_pinned_version(&mut self, pinned_version: PinnedVersion) {
        if let UploaderState::Working(data) = &mut self.state {
            // TODO: may only `ack_committed` on table whose `committed_epoch` is changed.
            for (table_id, info) in pinned_version.state_table_info.info() {
                if let Some(table_data) = data.unsync_data.table_data.get_mut(table_id) {
                    table_data.ack_committed(info.committed_epoch);
                }
            }
        }
        self.context.pinned_version = pinned_version;
    }

    /// When the buffer tracker demands flushing, spill unsync data (older
    /// epochs first) until enough is in flight. Returns whether any data was
    /// spilled in this call.
    pub(crate) fn may_flush(&mut self, spiller_latency: &Histogram) -> bool {
        let UploaderState::Working(data) = &mut self.state else {
            return false;
        };
        if self.context.buffer_tracker.need_flush() {
            let timer = spiller_latency.start_timer();
            let mut spiller = Spiller::new(&mut data.unsync_data);
            let mut curr_batch_flush_size = 0;
            // iterate from older epoch to newer epoch
            while self
                .context
                .buffer_tracker
                .need_more_flush(curr_batch_flush_size)
                && let Some((epoch, payload, spilled_table_ids)) = spiller.next_spilled_payload()
            {
                assert!(!payload.is_empty());
                {
                    let (task_id, task_size, spilled_table_ids) =
                        data.task_manager
                            .spill(&self.context, spilled_table_ids, payload);
                    // record the spill task on every involved table so that a
                    // later sync or clear can find it
                    for table_id in spilled_table_ids {
                        spiller
                            .unsync_data()
                            .table_data
                            .get_mut(table_id)
                            .expect("should exist")
                            .spill_tasks
                            .entry(epoch)
                            .or_default()
                            .push_front(task_id);
                    }
                    curr_batch_flush_size += task_size;
                }
            }
            timer.observe_duration();
            data.check_upload_task_consistency();
            curr_batch_flush_size > 0
        } else {
            false
        }
    }

    /// Clear the given tables, or — when `table_ids` is `None` — reset the
    /// whole uploader: abort all tasks, fail all pending syncs, and return to
    /// an empty `Working` state.
    pub(crate) fn clear(&mut self, table_ids: Option<HashSet<TableId>>) {
        if let Some(table_ids) = table_ids {
            if let UploaderState::Working(data) = &mut self.state {
                data.clear_tables(table_ids);
            }
        } else {
            if let UploaderState::Working(data) = replace(
                &mut self.state,
                UploaderState::Working(UploaderData::default()),
            ) {
                data.abort(|| HummockError::other("uploader is reset"));
            }

            self.context.stats.uploader_syncing_epoch_count.set(0);
        }
    }

    /// Mark an instance as destroyed (see `UnsyncData::may_destroy_instance`).
    /// No-op when not `Working`.
    pub(crate) fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data.may_destroy_instance(instance_id);
    }

    /// The smallest uncommitted object id currently held by the uploader, or
    /// `None` when not `Working` or nothing is uncommitted.
    pub(crate) fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        if let UploaderState::Working(ref u) = self.state {
            u.min_uncommitted_object_id()
        } else {
            None
        }
    }
}
1596
1597impl UploaderData {
    /// Pop completed syncs in `SyncId` order (oldest first) as long as the
    /// front entry has no remaining upload task: ack the synced epochs on each
    /// table (removing tables that become empty) and send the sync result to
    /// the awaiting receiver.
    fn may_notify_sync_task(&mut self, context: &UploaderContext) {
        while let Some((_, syncing_data)) = self.syncing_data.first_key_value()
            && syncing_data.remaining_uploading_tasks.is_empty()
        {
            let (_, syncing_data) = self.syncing_data.pop_first().expect("non-empty");
            let SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: _,
                uploaded,
                table_watermarks,
                vector_index_adds,
                sync_result_sender,
            } = syncing_data;
            context
                .stats
                .uploader_syncing_epoch_count
                .set(self.syncing_data.len() as _);

            for (sync_epoch, table_ids) in sync_table_epochs {
                for table_id in table_ids {
                    if let Some(table_data) = self.unsync_data.table_data.get_mut(&table_id) {
                        table_data.ack_synced(sync_epoch);
                        // drop the table entry once it holds no more data
                        if table_data.is_empty() {
                            self.unsync_data.table_data.remove(&table_id);
                        }
                    }
                }
            }

            send_sync_result(
                sync_result_sender,
                Ok(SyncedData {
                    uploaded_ssts: uploaded,
                    table_watermarks,
                    vector_index_adds,
                }),
            )
        }
    }
1637
1638    fn check_upload_task_consistency(&self) {
1639        #[cfg(debug_assertions)]
1640        {
1641            let mut spill_task_table_id_from_data: HashMap<_, HashSet<_>> = HashMap::new();
1642            for table_data in self.unsync_data.table_data.values() {
1643                for task_id in table_data
1644                    .spill_tasks
1645                    .iter()
1646                    .flat_map(|(_, tasks)| tasks.iter())
1647                {
1648                    assert!(
1649                        spill_task_table_id_from_data
1650                            .entry(*task_id)
1651                            .or_default()
1652                            .insert(table_data.table_id)
1653                    );
1654                }
1655            }
1656            let syncing_task_id_from_data: HashMap<_, HashSet<_>> = self
1657                .syncing_data
1658                .iter()
1659                .filter_map(|(sync_id, data)| {
1660                    if data.remaining_uploading_tasks.is_empty() {
1661                        None
1662                    } else {
1663                        Some((*sync_id, data.remaining_uploading_tasks.clone()))
1664                    }
1665                })
1666                .collect();
1667
1668            let mut spill_task_table_id_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
1669            for (task_id, (_, table_ids)) in &self.unsync_data.spilled_data {
1670                spill_task_table_id_from_manager.insert(*task_id, table_ids.clone());
1671            }
1672            let mut syncing_task_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
1673            for (task_id, status) in self.task_manager.tasks() {
1674                match status {
1675                    UploadingTaskStatus::Spilling(table_ids) => {
1676                        assert!(
1677                            spill_task_table_id_from_manager
1678                                .insert(task_id, table_ids.clone())
1679                                .is_none()
1680                        );
1681                    }
1682                    UploadingTaskStatus::Sync(sync_id) => {
1683                        assert!(
1684                            syncing_task_from_manager
1685                                .entry(*sync_id)
1686                                .or_default()
1687                                .insert(task_id)
1688                        );
1689                    }
1690                }
1691            }
1692            assert_eq!(
1693                spill_task_table_id_from_data,
1694                spill_task_table_id_from_manager
1695            );
1696            assert_eq!(syncing_task_id_from_data, syncing_task_from_manager);
1697        }
1698    }
1699}
1700
impl HummockUploader {
    /// Returns a future that resolves to the next batch (at most 16) of
    /// successfully uploaded ssts.
    ///
    /// While the uploader is in the `Working` state, each ready task result
    /// is drained from the task manager:
    /// - on success, the sst is acked in the unsync data and recorded either
    ///   under its syncing epoch (possibly completing that sync) or as
    ///   spilled data;
    /// - on failure, the uploader is switched to the `Err` state, the failed
    ///   sync's result sender receives an error, remaining work is aborted,
    ///   and `Poll::Pending` is returned (later polls also stay pending,
    ///   since the state is no longer `Working`).
    pub(super) fn next_uploaded_ssts(
        &mut self,
    ) -> impl Future<Output = Vec<Arc<StagingSstableInfo>>> + '_ {
        poll_fn(|cx| {
            // Results are only produced while the uploader is working.
            let UploaderState::Working(data) = &mut self.state else {
                return Poll::Pending;
            };

            // Cap how many ssts a single wakeup may return.
            const MAX_BATCH_SIZE: usize = 16;

            let mut ssts = None;
            while let Poll::Ready(Some((task_id, status, result))) =
                data.task_manager.poll_task_result(cx)
            {
                match result {
                    Ok(sst) => {
                        // Release bookkeeping for the imms covered by this sst.
                        data.unsync_data.ack_flushed(&sst);
                        match status {
                            UploadingTaskStatus::Sync(sync_id) => {
                                let syncing_data =
                                    data.syncing_data.get_mut(&sync_id).expect("should exist");
                                // Prepend so later-finished ssts come first in
                                // `uploaded`.
                                syncing_data.uploaded.push_front(sst.clone());
                                assert!(syncing_data.remaining_uploading_tasks.remove(&task_id));
                                // This may have been the sync's last task.
                                data.may_notify_sync_task(&self.context);
                            }
                            UploadingTaskStatus::Spilling(table_ids) => {
                                data.unsync_data
                                    .spilled_data
                                    .insert(task_id, (sst.clone(), table_ids));
                            }
                        }
                        data.check_upload_task_consistency();
                        let ssts = ssts.get_or_insert_with(|| Vec::with_capacity(MAX_BATCH_SIZE));
                        ssts.push(sst);
                        if ssts.len() >= MAX_BATCH_SIZE {
                            break;
                        }
                    }
                    Err((sync_id, e)) => {
                        // A sync task failed: remove its syncing entry and put
                        // the uploader into the terminal `Err` state.
                        let syncing_data =
                            data.syncing_data.remove(&sync_id).expect("should exist");
                        let failed_epochs = syncing_data.sync_table_epochs.clone();
                        // Swap the state out; `data` is rebound to the old
                        // `Working` payload so it can be aborted below.
                        let data = must_match!(replace(
                            &mut self.state,
                            UploaderState::Err(ErrState {
                                failed_sync_table_epochs: syncing_data.sync_table_epochs,
                                reason: e.as_report().to_string(),
                            }),
                        ), UploaderState::Working(data) => data);

                        // Notify the failed sync; the receiver may be dropped.
                        let _ = syncing_data
                            .sync_result_sender
                            .send(Err(HummockError::other(format!(
                                "failed to sync: {:?}",
                                e.as_report()
                            ))));

                        // Fail all other pending work with a related error.
                        data.abort(|| {
                            HummockError::other(format!(
                                "previous epoch {:?} failed to sync",
                                failed_epochs
                            ))
                        });
                        return Poll::Pending;
                    }
                }
            }
            if let Some(ssts) = ssts {
                Poll::Ready(ssts)
            } else {
                Poll::Pending
            }
        })
    }
}
1777
1778#[cfg(test)]
1779pub(crate) mod tests {
1780    use std::collections::{HashMap, HashSet};
1781    use std::future::{Future, poll_fn};
1782    use std::pin::pin;
1783    use std::sync::Arc;
1784    use std::sync::atomic::AtomicUsize;
1785    use std::sync::atomic::Ordering::SeqCst;
1786    use std::task::Poll;
1787
1788    use futures::FutureExt;
1789    use risingwave_common::catalog::TableId;
1790    use risingwave_common::metrics::LabelGuardedHistogram;
1791    use risingwave_common::util::epoch::EpochExt;
1792    use risingwave_hummock_sdk::HummockEpoch;
1793    use risingwave_hummock_sdk::vector_index::{
1794        FlatIndexAdd, VectorFileInfo, VectorIndexAdd, VectorStoreInfoDelta,
1795    };
1796    use tokio::sync::oneshot;
1797
1798    use super::test_utils::*;
1799    use crate::hummock::event_handler::uploader::{
1800        HummockUploader, SyncedData, UploadingTask, get_payload_imm_ids,
1801    };
1802    use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID};
1803    use crate::hummock::utils::MemoryTracker;
1804    use crate::hummock::{HummockError, HummockResult};
1805    use crate::mem_table::ImmutableMemtable;
1806    use crate::opts::StorageOpts;
1807
    impl HummockUploader {
        /// Test helper: add a single imm by wrapping it into a one-element
        /// batch for `add_imms`.
        pub(super) fn add_imm(
            &mut self,
            instance_id: LocalInstanceId,
            imm: (ImmutableMemtable, MemoryTracker),
        ) {
            self.add_imms(instance_id, vec![imm]);
        }

        /// Test helper: start syncing a single `(epoch, table_ids)` pair via
        /// `start_sync_epoch`.
        pub(super) fn start_single_epoch_sync(
            &mut self,
            epoch: HummockEpoch,
            sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
            table_ids: HashSet<TableId>,
        ) {
            self.start_sync_epoch(sync_result_sender, vec![(epoch, table_ids)]);
        }

        /// Test helper: call `may_flush` with a throwaway test histogram.
        pub(super) fn may_flush_for_test(&mut self) -> bool {
            self.may_flush(&LabelGuardedHistogram::test_histogram::<1>())
        }
    }
1830
1831    #[tokio::test]
1832    pub async fn test_uploading_task_future() {
1833        let uploader_context = test_uploader_context(dummy_success_upload_future);
1834
1835        let imm = gen_imm(INITIAL_EPOCH);
1836        let imm_size = imm.size();
1837        let imm_ids = get_imm_ids(vec![&imm]);
1838        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
1839        assert_eq!(imm_size, task.task_info.task_size);
1840        assert_eq!(imm_ids, task.task_info.imm_ids);
1841        assert_eq!(vec![INITIAL_EPOCH], task.task_info.epochs);
1842        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
1843        assert_eq!(
1844            output.sstable_infos(),
1845            &dummy_success_upload_output().new_value_ssts
1846        );
1847        assert_eq!(imm_size, output.imm_size());
1848        assert_eq!(&imm_ids, output.imm_ids());
1849        assert_eq!(&vec![INITIAL_EPOCH], output.epochs());
1850
1851        let uploader_context = test_uploader_context(dummy_fail_upload_future);
1852        let imm = gen_imm(INITIAL_EPOCH);
1853        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
1854        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
1855    }
1856
1857    #[tokio::test]
1858    pub async fn test_uploading_task_poll_result() {
1859        let uploader_context = test_uploader_context(dummy_success_upload_future);
1860        let mut task = UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH)], &uploader_context);
1861        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
1862        assert_eq!(
1863            output.sstable_infos(),
1864            &dummy_success_upload_output().new_value_ssts
1865        );
1866
1867        let uploader_context = test_uploader_context(dummy_fail_upload_future);
1868        let mut task = UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH)], &uploader_context);
1869        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
1870    }
1871
1872    #[tokio::test]
1873    async fn test_uploading_task_poll_ok_with_retry() {
1874        let run_count = Arc::new(AtomicUsize::new(0));
1875        let fail_num = 10;
1876        let run_count_clone = run_count.clone();
1877        let uploader_context = test_uploader_context(move |payload, info| {
1878            let run_count = run_count.clone();
1879            async move {
1880                // fail in the first `fail_num` run, and success at the end
1881                let ret = if run_count.load(SeqCst) < fail_num {
1882                    Err(HummockError::other("fail"))
1883                } else {
1884                    dummy_success_upload_future(payload, info).await
1885                };
1886                run_count.fetch_add(1, SeqCst);
1887                ret
1888            }
1889        });
1890        let mut task = UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH)], &uploader_context);
1891        let output = poll_fn(|cx| task.poll_ok_with_retry(cx)).await;
1892        assert_eq!(fail_num + 1, run_count_clone.load(SeqCst));
1893        assert_eq!(
1894            output.sstable_infos(),
1895            &dummy_success_upload_output().new_value_ssts
1896        );
1897    }
1898
    #[tokio::test]
    async fn test_uploader_basic() {
        // End-to-end happy path: write one imm plus a vector index add for a
        // single epoch, sync the epoch, and verify both the uploaded sst and
        // the vector index add appear in the sync result.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let mut sst_collector = UploadedSstCollector::default();
        let epoch1 = INITIAL_EPOCH.next_epoch();
        const VECTOR_INDEX_TABLE_ID: TableId = TableId::new(234);
        uploader.start_epoch(
            epoch1,
            HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
        );
        // Write one imm to TEST_TABLE_ID and seal epoch1 for the instance.
        let (imm, tracker) = gen_imm_with_unlimit(epoch1);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, (imm.clone(), tracker));
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);

        // Seal a vector index add for the same epoch on a separate table.
        uploader.register_vector_writer(VECTOR_INDEX_TABLE_ID, epoch1);
        let vector_info_file = VectorFileInfo {
            object_id: 1.into(),
            vector_count: 1,
            file_size: 0,
            start_vector_id: 0,
            meta_offset: 20,
        };
        let vector_index_add = VectorIndexAdd::Flat(FlatIndexAdd {
            vector_store_info_delta: VectorStoreInfoDelta {
                next_vector_id: 1,
                added_vector_files: vec![vector_info_file.clone()],
            },
        });
        uploader.vector_writer_seal_epoch(
            VECTOR_INDEX_TABLE_ID,
            epoch1.next_epoch(),
            Some(vector_index_add.clone()),
        );

        // Start syncing epoch1: it becomes the max syncing epoch, with an
        // in-flight upload task and nothing uploaded yet.
        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(
            epoch1,
            sync_tx,
            HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
        );
        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
        assert_eq!(1, uploader.data().syncing_data.len());
        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
        assert!(syncing_data.uploaded.is_empty());
        assert!(!syncing_data.remaining_uploading_tasks.is_empty());

        // The upload task finishes with the dummy output covering the imm.
        let staging_sst = sst_collector.next(&mut uploader).await;
        assert_eq!(&vec![epoch1], staging_sst.epochs());
        assert_eq!(
            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
            staging_sst.imm_ids()
        );
        assert_eq!(
            &dummy_success_upload_output().new_value_ssts,
            staging_sst.sstable_infos()
        );

        // The sync result carries the uploaded sst and the vector index add.
        match sync_rx.await {
            Ok(Ok(data)) => {
                let SyncedData {
                    uploaded_ssts,
                    table_watermarks,
                    vector_index_adds,
                } = data;
                assert_eq!(1, uploaded_ssts.len());
                let staging_sst = &uploaded_ssts[0];
                assert_eq!(&vec![epoch1], staging_sst.epochs());
                assert_eq!(
                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
                    staging_sst.imm_ids()
                );
                assert_eq!(
                    &dummy_success_upload_output().new_value_ssts,
                    staging_sst.sstable_infos()
                );
                assert!(table_watermarks.is_empty());
                assert_eq!(vector_index_adds.len(), 1);
                let (table_id, vector_index_adds) = vector_index_adds.into_iter().next().unwrap();
                assert_eq!(table_id, VECTOR_INDEX_TABLE_ID);
                assert_eq!(vector_index_adds.len(), 1);
                let synced_vector_index_add = vector_index_adds[0].clone();
                assert_eq!(vector_index_add, synced_vector_index_add);
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());

        // Committing epoch1 via a new pinned version updates the table
        // committed epoch.
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
2003
    #[tokio::test]
    async fn test_uploader_destroy_instance_before_sync() {
        // Destroying the instance before sync must not lose its data: the imm
        // is still uploaded and returned in the sync result, and the table's
        // unsync data is cleaned up afterwards.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let mut sst_collector = UploadedSstCollector::default();
        let epoch1 = INITIAL_EPOCH.next_epoch();
        uploader.start_epochs_for_test([epoch1]);
        let (imm, tracker) = gen_imm_with_unlimit(epoch1);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, (imm.clone(), tracker));
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        // Destroy the instance before the sync is started.
        uploader.may_destroy_instance(TEST_LOCAL_INSTANCE_ID);

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
        assert_eq!(1, uploader.data().syncing_data.len());
        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
        assert!(syncing_data.uploaded.is_empty());
        assert!(!syncing_data.remaining_uploading_tasks.is_empty());

        // The destroyed instance's imm is still uploaded.
        let staging_sst = sst_collector.next(&mut uploader).await;
        assert_eq!(&vec![epoch1], staging_sst.epochs());
        assert_eq!(
            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
            staging_sst.imm_ids()
        );
        assert_eq!(
            &dummy_success_upload_output().new_value_ssts,
            staging_sst.sstable_infos()
        );

        // The sync result still includes the destroyed instance's sst.
        match sync_rx.await {
            Ok(Ok(data)) => {
                let SyncedData {
                    uploaded_ssts,
                    table_watermarks,
                    ..
                } = data;
                assert_eq!(1, uploaded_ssts.len());
                let staging_sst = &uploaded_ssts[0];
                assert_eq!(&vec![epoch1], staging_sst.epochs());
                assert_eq!(
                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
                    staging_sst.imm_ids()
                );
                assert_eq!(
                    &dummy_success_upload_output().new_value_ssts,
                    staging_sst.sstable_infos()
                );
                assert!(table_watermarks.is_empty());
            }
            _ => unreachable!(),
        };
        // After the sync the table has no unsync data left.
        assert!(
            !uploader
                .data()
                .unsync_data
                .table_data
                .contains_key(&TEST_TABLE_ID)
        );
    }
2066
    #[tokio::test]
    async fn test_empty_uploader_sync() {
        // Syncing an epoch that received no data completes with an empty
        // result and produces no upload task.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_epochs_for_test([epoch1]);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        // Nothing to upload, so the uploader yields no ssts.
        assert_uploader_pending(&mut uploader).await;

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.uploaded_ssts.is_empty());
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        // Committing epoch1 clears the syncing data and updates the table
        // committed epoch.
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert!(uploader.data().syncing_data.is_empty());
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
2104
    #[tokio::test]
    async fn test_uploader_empty_epoch() {
        // Syncing an empty epoch while a later epoch holds data must complete
        // with an empty result and must not touch the later epoch's imm.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        uploader.start_epochs_for_test([epoch1, epoch2]);
        let (imm, tracker) = gen_imm_with_unlimit(epoch2);
        // epoch1 is empty while epoch2 is not. Going to seal empty epoch1.
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, (imm, tracker));

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        // epoch2's imm must not be uploaded by epoch1's sync.
        assert_uploader_pending(&mut uploader).await;

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.uploaded_ssts.is_empty());
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        // Committing epoch1 clears the syncing data and updates the table
        // committed epoch.
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert!(uploader.data().syncing_data.is_empty());
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
2146
2147    #[tokio::test]
2148    async fn test_uploader_poll_empty() {
2149        let mut uploader = test_uploader(dummy_success_upload_future);
2150        let fut = uploader.next_uploaded_ssts();
2151        let mut fut = pin!(fut);
2152        assert!(poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx).is_pending())).await);
2153    }
2154
    #[tokio::test]
    async fn test_uploader_empty_advance_mce() {
        // Updating the pinned version advances the max synced/syncing epochs
        // for epochs with no local data, while an ongoing sync of a later
        // epoch (epoch6) keeps the syncing epoch ahead of the committed one.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let mut sst_collector = UploadedSstCollector::default();
        let initial_pinned_version = uploader.context.pinned_version.clone();
        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        let epoch3 = epoch2.next_epoch();
        let epoch4 = epoch3.next_epoch();
        let epoch5 = epoch4.next_epoch();
        let epoch6 = epoch5.next_epoch();
        // Pre-build pinned versions committing epoch1..=epoch5 respectively.
        let version1 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        let version2 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch2))
            .unwrap();
        let version3 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch3))
            .unwrap();
        let version4 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch4))
            .unwrap();
        let version5 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch5))
            .unwrap();

        // Only epoch6 carries local data.
        uploader.start_epochs_for_test([epoch6]);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch6);

        // Each version update advances both max epochs to the committed one.
        uploader.update_pinned_version(version1);
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        let (imm, tracker) = gen_imm_with_unlimit(epoch6);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, (imm.clone(), tracker));
        uploader.update_pinned_version(version2);
        assert_eq!(epoch2, uploader.test_max_synced_epoch());
        assert_eq!(epoch2, uploader.test_max_syncing_epoch());

        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch6);
        uploader.update_pinned_version(version3);
        assert_eq!(epoch3, uploader.test_max_synced_epoch());
        assert_eq!(epoch3, uploader.test_max_syncing_epoch());

        // Start syncing epoch6: syncing epoch jumps ahead and stays at epoch6
        // even as version updates advance the synced epoch.
        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch6, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
        uploader.update_pinned_version(version4);
        assert_eq!(epoch4, uploader.test_max_synced_epoch());
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());

        // epoch6's imm is uploaded and shows up in the sync result.
        let sst = sst_collector.next(&mut uploader).await;
        assert_eq!(&get_imm_ids([&imm]), sst.imm_ids());

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.table_watermarks.is_empty());
                assert_eq!(1, data.uploaded_ssts.len());
                assert_eq!(&get_imm_ids([&imm]), data.uploaded_ssts[0].imm_ids());
            }
            _ => unreachable!(),
        }

        // After epoch6 syncs, a version committing only epoch5 does not pull
        // the max epochs back below epoch6.
        uploader.update_pinned_version(version5);
        assert_eq!(epoch6, uploader.test_max_synced_epoch());
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
    }
2223
2224    #[tokio::test]
2225    async fn test_uploader_finish_in_order() {
2226        let config = StorageOpts {
2227            shared_buffer_capacity_mb: 1024 * 1024,
2228            shared_buffer_flush_ratio: 0.0,
2229            ..Default::default()
2230        };
2231        let (buffer_tracker, mut uploader, new_task_notifier) =
2232            prepare_uploader_order_test(&config, false);
2233        let mut sst_collector = UploadedSstCollector::default();
2234
2235        let epoch1 = INITIAL_EPOCH.next_epoch();
2236        let epoch2 = epoch1.next_epoch();
2237        let epoch3 = epoch2.next_epoch();
2238        let epoch4 = epoch3.next_epoch();
2239        uploader.start_epochs_for_test([epoch1, epoch2, epoch3, epoch4]);
2240        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2241        let memory_limiter = memory_limiter.as_ref();
2242
2243        let instance_id1 = 1;
2244        let instance_id2 = 2;
2245
2246        uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2247        uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2248
2249        // imm2 contains data in newer epoch, but added first
2250        let (imm2, tracker2) = gen_imm_with_limiter(epoch2, memory_limiter).await;
2251        uploader.add_imm(instance_id2, (imm2.clone(), tracker2));
2252        let (imm1_1, tracker1_1) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2253        uploader.add_imm(instance_id1, (imm1_1.clone(), tracker1_1));
2254        let (imm1_2, tracker1_2) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2255        uploader.add_imm(instance_id1, (imm1_2.clone(), tracker1_2));
2256
2257        // imm1 will be spilled first
2258        let epoch1_spill_payload12 =
2259            HashMap::from_iter([(instance_id1, vec![imm1_2.clone(), imm1_1.clone()])]);
2260        let epoch2_spill_payload = HashMap::from_iter([(instance_id2, vec![imm2.clone()])]);
2261        let (await_start1, finish_tx1) =
2262            new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload12));
2263        let (await_start2, finish_tx2) =
2264            new_task_notifier(get_payload_imm_ids(&epoch2_spill_payload));
2265        uploader.may_flush_for_test();
2266        await_start1.await;
2267        await_start2.await;
2268
2269        assert_uploader_pending(&mut uploader).await;
2270
2271        finish_tx2.send(()).unwrap();
2272        assert_uploader_pending(&mut uploader).await;
2273
2274        finish_tx1.send(()).unwrap();
2275        let sst = sst_collector.next(&mut uploader).await;
2276        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload12), sst.imm_ids());
2277        assert_eq!(&vec![epoch1], sst.epochs());
2278
2279        let sst = sst_collector.next(&mut uploader).await;
2280        assert_eq!(&get_payload_imm_ids(&epoch2_spill_payload), sst.imm_ids());
2281        assert_eq!(&vec![epoch2], sst.epochs());
2282
2283        let (imm1_3, tracker1_3) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2284        uploader.add_imm(instance_id1, (imm1_3.clone(), tracker1_3));
2285        let epoch1_spill_payload3 = HashMap::from_iter([(instance_id1, vec![imm1_3.clone()])]);
2286        let (await_start1_3, finish_tx1_3) =
2287            new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload3));
2288        uploader.may_flush_for_test();
2289        await_start1_3.await;
2290        let (imm1_4, tracker1_4) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2291        uploader.add_imm(instance_id1, (imm1_4.clone(), tracker1_4));
2292        let epoch1_sync_payload = HashMap::from_iter([(instance_id1, vec![imm1_4.clone()])]);
2293        let (await_start1_4, finish_tx1_4) =
2294            new_task_notifier(get_payload_imm_ids(&epoch1_sync_payload));
2295        uploader.local_seal_epoch_for_test(instance_id1, epoch1);
2296        let (sync_tx1, sync_rx1) = oneshot::channel();
2297        uploader.start_single_epoch_sync(epoch1, sync_tx1, HashSet::from_iter([TEST_TABLE_ID]));
2298        await_start1_4.await;
2299
2300        uploader.local_seal_epoch_for_test(instance_id1, epoch2);
2301        uploader.local_seal_epoch_for_test(instance_id2, epoch2);
2302
2303        // current uploader state:
2304        // unsealed: empty
2305        // sealed: epoch2: uploaded sst([imm2])
2306        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])
2307
2308        let (imm3_1, tracker3_1) = gen_imm_with_limiter(epoch3, memory_limiter).await;
2309        let epoch3_spill_payload1 = HashMap::from_iter([(instance_id1, vec![imm3_1.clone()])]);
2310        uploader.add_imm(instance_id1, (imm3_1.clone(), tracker3_1));
2311        let (await_start3_1, finish_tx3_1) =
2312            new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload1));
2313        uploader.may_flush_for_test();
2314        await_start3_1.await;
2315        let (imm3_2, tracker3_2) = gen_imm_with_limiter(epoch3, memory_limiter).await;
2316        let epoch3_spill_payload2 = HashMap::from_iter([(instance_id2, vec![imm3_2.clone()])]);
2317        uploader.add_imm(instance_id2, (imm3_2.clone(), tracker3_2));
2318        let (await_start3_2, finish_tx3_2) =
2319            new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload2));
2320        uploader.may_flush_for_test();
2321        await_start3_2.await;
2322        let (imm3_3, tracker3_3) = gen_imm_with_limiter(epoch3, memory_limiter).await;
2323        uploader.add_imm(instance_id1, (imm3_3.clone(), tracker3_3));
2324
2325        // current uploader state:
2326        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
2327        // sealed: uploaded sst([imm2])
2328        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])
2329
2330        uploader.local_seal_epoch_for_test(instance_id1, epoch3);
2331        let (imm4, tracker4) = gen_imm_with_limiter(epoch4, memory_limiter).await;
2332        uploader.add_imm(instance_id1, (imm4.clone(), tracker4));
2333        assert_uploader_pending(&mut uploader).await;
2334
2335        // current uploader state:
2336        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
2337        //           epoch4: imm: imm4
2338        // sealed: uploaded sst([imm2])
2339        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])
2340
2341        finish_tx3_1.send(()).unwrap();
2342        assert_uploader_pending(&mut uploader).await;
2343        finish_tx1_4.send(()).unwrap();
2344        assert_uploader_pending(&mut uploader).await;
2345        finish_tx1_3.send(()).unwrap();
2346
2347        let sst = sst_collector.next(&mut uploader).await;
2348        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload3), sst.imm_ids());
2349
2350        let sst = sst_collector.next(&mut uploader).await;
2351        assert_eq!(&get_payload_imm_ids(&epoch1_sync_payload), sst.imm_ids());
2352
2353        match sync_rx1.await {
2354            Ok(Ok(data)) => {
2355                assert_eq!(3, data.uploaded_ssts.len());
2356                assert_eq!(
2357                    &get_payload_imm_ids(&epoch1_sync_payload),
2358                    data.uploaded_ssts[0].imm_ids()
2359                );
2360                assert_eq!(
2361                    &get_payload_imm_ids(&epoch1_spill_payload3),
2362                    data.uploaded_ssts[1].imm_ids()
2363                );
2364                assert_eq!(
2365                    &get_payload_imm_ids(&epoch1_spill_payload12),
2366                    data.uploaded_ssts[2].imm_ids()
2367                );
2368            }
2369            _ => {
2370                unreachable!()
2371            }
2372        }
2373
2374        // current uploader state:
2375        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
2376        //           epoch4: imm: imm4
2377        // sealed: uploaded sst([imm2])
2378        // syncing: empty
2379        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2380
2381        let (sync_tx2, sync_rx2) = oneshot::channel();
2382        uploader.start_single_epoch_sync(epoch2, sync_tx2, HashSet::from_iter([TEST_TABLE_ID]));
2383        uploader.local_seal_epoch_for_test(instance_id2, epoch3);
2384        let sst = sst_collector.next(&mut uploader).await;
2385        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids());
2386
2387        match sync_rx2.await {
2388            Ok(Ok(data)) => {
2389                assert_eq!(data.uploaded_ssts.len(), 1);
2390                assert_eq!(
2391                    &get_payload_imm_ids(&epoch2_spill_payload),
2392                    data.uploaded_ssts[0].imm_ids()
2393                );
2394            }
2395            _ => {
2396                unreachable!("should be sync finish");
2397            }
2398        }
2399        assert_eq!(epoch2, uploader.test_max_synced_epoch());
2400
2401        // current uploader state:
2402        // unsealed: epoch4: imm: imm4
2403        // sealed: imm: imm3_3, uploading: [imm3_2], uploaded: sst([imm3_1])
2404        // syncing: empty
2405        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2406        //         epoch2: sst([imm2])
2407
2408        uploader.local_seal_epoch_for_test(instance_id1, epoch4);
2409        uploader.local_seal_epoch_for_test(instance_id2, epoch4);
2410        let epoch4_sync_payload = HashMap::from_iter([(instance_id1, vec![imm4, imm3_3])]);
2411        let (await_start4_with_3_3, finish_tx4_with_3_3) =
2412            new_task_notifier(get_payload_imm_ids(&epoch4_sync_payload));
2413        let (sync_tx4, mut sync_rx4) = oneshot::channel();
2414        uploader.start_single_epoch_sync(epoch4, sync_tx4, HashSet::from_iter([TEST_TABLE_ID]));
2415        await_start4_with_3_3.await;
2416
2417        // current uploader state:
2418        // unsealed: empty
2419        // sealed: empty
2420        // syncing: epoch4: uploading: [imm4, imm3_3], [imm3_2], uploaded: sst([imm3_1])
2421        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2422        //         epoch2: sst([imm2])
2423
2424        assert_uploader_pending(&mut uploader).await;
2425
2426        finish_tx3_2.send(()).unwrap();
2427        let sst = sst_collector.next(&mut uploader).await;
2428        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload2), sst.imm_ids());
2429
2430        finish_tx4_with_3_3.send(()).unwrap();
2431        assert!(poll_fn(|cx| Poll::Ready(sync_rx4.poll_unpin(cx).is_pending())).await);
2432
2433        let sst = sst_collector.next(&mut uploader).await;
2434        assert_eq!(&get_payload_imm_ids(&epoch4_sync_payload), sst.imm_ids());
2435
2436        match sync_rx4.await {
2437            Ok(Ok(data)) => {
2438                assert_eq!(3, data.uploaded_ssts.len());
2439                assert_eq!(
2440                    &get_payload_imm_ids(&epoch4_sync_payload),
2441                    data.uploaded_ssts[0].imm_ids()
2442                );
2443                assert_eq!(
2444                    &get_payload_imm_ids(&epoch3_spill_payload2),
2445                    data.uploaded_ssts[1].imm_ids()
2446                );
2447                assert_eq!(
2448                    &get_payload_imm_ids(&epoch3_spill_payload1),
2449                    data.uploaded_ssts[2].imm_ids(),
2450                )
2451            }
2452            _ => {
2453                unreachable!("should be sync finish");
2454            }
2455        }
2456        assert_eq!(epoch4, uploader.test_max_synced_epoch());
2457
2458        // current uploader state:
2459        // unsealed: empty
2460        // sealed: empty
2461        // syncing: empty
2462        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2463        //         epoch2: sst([imm2])
2464        //         epoch4: sst([imm4, imm3_3]), sst([imm3_2]), sst([imm3_1])
2465    }
2466
2467    #[tokio::test]
2468    async fn test_uploader_frequently_flush() {
2469        let config = StorageOpts {
2470            shared_buffer_capacity_mb: 10,
2471            shared_buffer_flush_ratio: 0.12,
2472            // This test will fail when we set it to 0
2473            shared_buffer_min_batch_flush_size_mb: 1,
2474            ..Default::default()
2475        };
2476        let (buffer_tracker, mut uploader, _new_task_notifier) =
2477            prepare_uploader_order_test(&config, true);
2478
2479        let epoch1 = INITIAL_EPOCH.next_epoch();
2480        let epoch2 = epoch1.next_epoch();
2481        uploader.start_epochs_for_test([epoch1, epoch2]);
2482        let instance_id1 = 1;
2483        let instance_id2 = 2;
2484        let flush_threshold = buffer_tracker.flush_threshold();
2485        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2486        let memory_limiter = memory_limiter.as_ref();
2487
2488        uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2489        uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2490
2491        // imm2 contains data in newer epoch, but added first
2492        let mut total_memory = 0;
2493        while total_memory < flush_threshold {
2494            let (imm, tracker) = gen_imm_with_limiter(epoch2, memory_limiter).await;
2495            total_memory += imm.size();
2496            if total_memory > flush_threshold {
2497                break;
2498            }
2499            uploader.add_imm(instance_id2, (imm, tracker));
2500        }
2501        let (imm, tracker) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2502        uploader.add_imm(instance_id1, (imm, tracker));
2503        assert!(uploader.may_flush_for_test());
2504
2505        for _ in 0..10 {
2506            let (imm, tracker) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2507            uploader.add_imm(instance_id1, (imm, tracker));
2508            assert!(!uploader.may_flush_for_test());
2509        }
2510    }
2511}