risingwave_storage/hummock/event_handler/uploader/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod spiller;
16mod task_manager;
17pub(crate) mod test_utils;
18
19use std::cmp::Ordering;
20use std::collections::btree_map::Entry;
21use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque, hash_map};
22use std::fmt::{Debug, Display, Formatter};
23use std::future::{Future, poll_fn};
24use std::mem::{replace, swap, take};
25use std::sync::Arc;
26use std::task::{Context, Poll, ready};
27
28use futures::FutureExt;
29use itertools::Itertools;
30use more_asserts::assert_gt;
31use prometheus::{Histogram, HistogramTimer, IntGauge};
32use risingwave_common::bitmap::BitmapBuilder;
33use risingwave_common::catalog::TableId;
34use risingwave_common::metrics::UintGauge;
35use risingwave_common::must_match;
36use risingwave_hummock_sdk::table_watermark::{
37    TableWatermarks, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
38};
39use risingwave_hummock_sdk::vector_index::VectorIndexAdd;
40use risingwave_hummock_sdk::{HummockEpoch, HummockRawObjectId, LocalSstableInfo};
41use task_manager::{TaskManager, UploadingTaskStatus};
42use thiserror_ext::AsReport;
43use tokio::sync::oneshot;
44use tokio::task::JoinHandle;
45use tracing::{debug, error, info, warn};
46
47use crate::hummock::event_handler::LocalInstanceId;
48use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, send_sync_result};
49use crate::hummock::event_handler::uploader::spiller::Spiller;
50use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm;
51use crate::hummock::local_version::pinned_version::PinnedVersion;
52use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchId;
53use crate::hummock::store::version::StagingSstableInfo;
54use crate::hummock::utils::MemoryTracker;
55use crate::hummock::{HummockError, HummockResult, ImmutableMemtable};
56use crate::mem_table::ImmId;
57use crate::monitor::HummockStateStoreMetrics;
58use crate::store::SealCurrentEpochOptions;
59
60/// Take epoch data inclusively before `epoch` out from `data`
61fn take_before_epoch<T>(
62    data: &mut BTreeMap<HummockEpoch, T>,
63    epoch: HummockEpoch,
64) -> BTreeMap<HummockEpoch, T> {
65    let mut before_epoch_data = data.split_off(&(epoch + 1));
66    swap(&mut before_epoch_data, data);
67    before_epoch_data
68}
69
/// Imms of one upload task grouped by local instance; within each `Vec`, newer data comes
/// first.
type UploadTaskInput = HashMap<LocalInstanceId, Vec<UploaderImm>>;
/// The same grouping as `UploadTaskInput` with the `UploaderImm` wrappers stripped; this is
/// what gets handed to `SpawnUploadTask`.
pub type UploadTaskPayload = HashMap<LocalInstanceId, Vec<ImmutableMemtable>>;
72
/// Result of one successful upload-task attempt.
#[derive(Debug)]
pub struct UploadTaskOutput {
    /// New-value SSTs produced by the task.
    pub new_value_ssts: Vec<LocalSstableInfo>,
    /// Old-value SSTs produced by the task; kept separate from the new-value SSTs when the
    /// output is turned into a `StagingSstableInfo`.
    pub old_value_ssts: Vec<LocalSstableInfo>,
    /// Optional histogram timer. It is dropped (and, per prometheus `HistogramTimer`
    /// semantics, observed) when the uploader converts this output into a
    /// `StagingSstableInfo`.
    pub wait_poll_timer: Option<HistogramTimer>,
}
/// Callback that starts one attempt of an upload task for the given payload and returns the
/// handle of the spawned task.
///
/// `UploadingTask` calls it once on creation and again on every retry, hence `Fn` (callable
/// repeatedly) behind a shareable `Arc`.
pub type SpawnUploadTask = Arc<
    dyn Fn(UploadTaskPayload, UploadTaskInfo) -> JoinHandle<HummockResult<UploadTaskOutput>>
        + Send
        + Sync
        + 'static,
>;
85
/// Metadata of an upload task, passed to `SpawnUploadTask` together with the payload.
#[derive(Clone)]
pub struct UploadTaskInfo {
    /// Sum of `size()` over all imms in the task.
    pub task_size: usize,
    /// Distinct epochs covered by the task, newest first.
    pub epochs: Vec<HummockEpoch>,
    /// Tables that have imms in the task.
    pub table_ids: HashSet<TableId>,
    /// Batch ids of the task's imms, grouped by local instance.
    pub imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
}
93
94impl Display for UploadTaskInfo {
95    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
96        f.debug_struct("UploadTaskInfo")
97            .field("task_size", &self.task_size)
98            .field("epochs", &self.epochs)
99            .field("len(imm_ids)", &self.imm_ids.len())
100            .finish()
101    }
102}
103
104impl Debug for UploadTaskInfo {
105    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct("UploadTaskInfo")
107            .field("task_size", &self.task_size)
108            .field("epochs", &self.epochs)
109            .field("imm_ids", &self.imm_ids)
110            .finish()
111    }
112}
113
114mod uploader_imm {
115    use std::fmt::Formatter;
116    use std::ops::Deref;
117
118    use risingwave_common::metrics::{LabelGuardedIntGauge, UintGauge};
119
120    #[cfg(test)]
121    use crate::hummock::MemoryLimiter;
122    use crate::hummock::event_handler::uploader::UploaderContext;
123    use crate::hummock::utils::MemoryTracker;
124    use crate::mem_table::ImmutableMemtable;
125
126    pub(super) struct UploaderImm {
127        inner: ImmutableMemtable,
128        size_guard: UintGauge,
129        per_table_size_guard: LabelGuardedIntGauge,
130        per_table_count_guard: LabelGuardedIntGauge,
131        _tracker: MemoryTracker,
132    }
133
134    impl UploaderImm {
135        pub(super) fn new(
136            imm: ImmutableMemtable,
137            context: &UploaderContext,
138            tracker: MemoryTracker,
139        ) -> Self {
140            let size = imm.size();
141            let table_id_str = imm.table_id().to_string();
142            let size_guard = context.stats.uploader_imm_size.clone();
143            let per_table_size_guard = context
144                .stats
145                .uploader_per_table_imm_size
146                .with_guarded_label_values(&[&table_id_str]);
147            let per_table_count_guard = context
148                .stats
149                .uploader_per_table_imm_count
150                .with_guarded_label_values(&[&table_id_str]);
151            size_guard.add(size as _);
152            per_table_size_guard.add(size as _);
153            per_table_count_guard.add(1 as _);
154            Self {
155                inner: imm,
156                size_guard,
157                per_table_size_guard,
158                per_table_count_guard,
159                _tracker: tracker,
160            }
161        }
162
163        #[cfg(test)]
164        pub(super) fn for_test(imm: ImmutableMemtable) -> Self {
165            Self {
166                inner: imm,
167                size_guard: UintGauge::new("test", "test").unwrap(),
168                per_table_size_guard: LabelGuardedIntGauge::test_int_gauge::<1>(),
169                per_table_count_guard: LabelGuardedIntGauge::test_int_gauge::<1>(),
170                _tracker: MemoryLimiter::unlimit().try_require_memory(1).unwrap(),
171            }
172        }
173    }
174
175    impl std::fmt::Debug for UploaderImm {
176        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
177            self.inner.fmt(f)
178        }
179    }
180
181    impl Deref for UploaderImm {
182        type Target = ImmutableMemtable;
183
184        fn deref(&self) -> &Self::Target {
185            &self.inner
186        }
187    }
188
189    impl Drop for UploaderImm {
190        fn drop(&mut self) {
191            self.size_guard.sub(self.inner.size() as _);
192            self.per_table_size_guard.sub(self.inner.size() as _);
193            self.per_table_count_guard.dec();
194        }
195    }
196}
197
/// Identifier of an `UploadingTask`.
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct UploadingTaskId(usize);
200
/// A wrapper for an uploading task that compacts and uploads the imm payload. The task context
/// is stored so that the task can be re-tried when it fails.
///
/// While alive, the task is accounted in the global upload-task-size gauge and the
/// uploading-task-count gauge; its `Drop` impl subtracts it again.
struct UploadingTask {
    task_id: UploadingTaskId,
    // newer data at the front
    input: UploadTaskInput,
    // Handle of the currently running attempt; replaced when the task is retried.
    join_handle: JoinHandle<HummockResult<UploadTaskOutput>>,
    task_info: UploadTaskInfo,
    // Kept so that a failed attempt can be respawned.
    spawn_upload_task: SpawnUploadTask,
    task_size_guard: UintGauge,
    task_count_guard: IntGauge,
}
213
214impl Drop for UploadingTask {
215    fn drop(&mut self) {
216        self.task_size_guard.sub(self.task_info.task_size as u64);
217        self.task_count_guard.dec();
218    }
219}
220
221impl Debug for UploadingTask {
222    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
223        f.debug_struct("UploadingTask")
224            .field("input", &self.input)
225            .field("task_info", &self.task_info)
226            .finish()
227    }
228}
229
230fn get_payload_imm_ids(
231    payload: &UploadTaskPayload,
232) -> HashMap<LocalInstanceId, Vec<SharedBufferBatchId>> {
233    payload
234        .iter()
235        .map(|(instance_id, imms)| {
236            (
237                *instance_id,
238                imms.iter().map(|imm| imm.batch_id()).collect_vec(),
239            )
240        })
241        .collect()
242}
243
244impl UploadingTask {
245    // INFO logs will be enabled for task with size exceeding 50MB.
246    const LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE: usize = 50 * (1 << 20);
247
248    fn input_to_payload(input: &UploadTaskInput) -> UploadTaskPayload {
249        input
250            .iter()
251            .map(|(instance_id, imms)| {
252                (
253                    *instance_id,
254                    imms.iter().map(|imm| (**imm).clone()).collect(),
255                )
256            })
257            .collect()
258    }
259
260    fn new(task_id: UploadingTaskId, input: UploadTaskInput, context: &UploaderContext) -> Self {
261        assert!(!input.is_empty());
262        let mut epochs = input
263            .iter()
264            .flat_map(|(_, imms)| imms.iter().map(|imm| imm.epoch()))
265            .sorted()
266            .dedup()
267            .collect_vec();
268
269        // reverse to make newer epochs comes first
270        epochs.reverse();
271        let payload = Self::input_to_payload(&input);
272        let imm_ids = get_payload_imm_ids(&payload);
273        let task_size = input
274            .values()
275            .map(|imms| imms.iter().map(|imm| imm.size()).sum::<usize>())
276            .sum();
277        let task_info = UploadTaskInfo {
278            task_size,
279            epochs,
280            table_ids: payload
281                .values()
282                .flat_map(|imms| imms.iter().map(|imm| imm.table_id()))
283                .collect(),
284            imm_ids,
285        };
286        context
287            .buffer_tracker
288            .global_upload_task_size()
289            .add(task_size as u64);
290        if task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
291            info!("start upload task: {:?}", task_info);
292        } else {
293            debug!("start upload task: {:?}", task_info);
294        }
295        let join_handle = (context.spawn_upload_task)(payload, task_info.clone());
296        context.stats.uploader_uploading_task_count.inc();
297        Self {
298            task_id,
299            input,
300            join_handle,
301            task_info,
302            spawn_upload_task: context.spawn_upload_task.clone(),
303            task_size_guard: context.buffer_tracker.global_upload_task_size().clone(),
304            task_count_guard: context.stats.uploader_uploading_task_count.clone(),
305        }
306    }
307
308    /// Poll the result of the uploading task
309    fn poll_result(
310        &mut self,
311        cx: &mut Context<'_>,
312    ) -> Poll<HummockResult<Arc<StagingSstableInfo>>> {
313        Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) {
314            Ok(task_result) => task_result
315                .inspect(|_| {
316                    if self.task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
317                        info!(task_info = ?self.task_info, "upload task finish");
318                    } else {
319                        debug!(task_info = ?self.task_info, "upload task finish");
320                    }
321                })
322                .inspect_err(|e| error!(task_info = ?self.task_info, err = ?e.as_report(), "upload task failed"))
323                .map(|output| {
324                    Arc::new(StagingSstableInfo::new(
325                        output.new_value_ssts,
326                        output.old_value_ssts,
327                        self.task_info.epochs.clone(),
328                        self.task_info.imm_ids.clone(),
329                        self.task_info.task_size,
330                    ))
331                }),
332
333            Err(err) => Err(HummockError::other(format!(
334                "fail to join upload join handle: {}",
335                err.as_report()
336            ))),
337        })
338    }
339
340    /// Poll the uploading task until it succeeds. If it fails, we will retry it.
341    fn poll_ok_with_retry(&mut self, cx: &mut Context<'_>) -> Poll<Arc<StagingSstableInfo>> {
342        loop {
343            let result = ready!(self.poll_result(cx));
344            match result {
345                Ok(sstables) => return Poll::Ready(sstables),
346                Err(e) => {
347                    error!(
348                        error = %e.as_report(),
349                        task_info = ?self.task_info,
350                        "a flush task failed, start retry",
351                    );
352                    self.join_handle = (self.spawn_upload_task)(
353                        Self::input_to_payload(&self.input),
354                        self.task_info.clone(),
355                    );
356                    // It is important not to return Poll::pending here immediately, because the new
357                    // join_handle is not polled yet, and will not awake the current task when
358                    // succeed. It will be polled in the next loop iteration.
359                }
360            }
361        }
362    }
363
364    pub fn get_task_info(&self) -> &UploadTaskInfo {
365        &self.task_info
366    }
367}
368
369impl TableUnsyncData {
370    fn add_table_watermarks(
371        &mut self,
372        epoch: HummockEpoch,
373        table_watermarks: Vec<VnodeWatermark>,
374        direction: WatermarkDirection,
375        watermark_type: WatermarkSerdeType,
376    ) {
377        if table_watermarks.is_empty() {
378            return;
379        }
380        let vnode_count = table_watermarks[0].vnode_count();
381        for watermark in &table_watermarks {
382            assert_eq!(vnode_count, watermark.vnode_count());
383        }
384
385        fn apply_new_vnodes(
386            vnode_bitmap: &mut BitmapBuilder,
387            vnode_watermarks: &Vec<VnodeWatermark>,
388        ) {
389            for vnode_watermark in vnode_watermarks {
390                for vnode in vnode_watermark.vnode_bitmap().iter_ones() {
391                    assert!(
392                        !vnode_bitmap.is_set(vnode),
393                        "vnode {} write multiple table watermarks",
394                        vnode
395                    );
396                    vnode_bitmap.set(vnode, true);
397                }
398            }
399        }
400        match &mut self.table_watermarks {
401            Some((prev_direction, prev_watermarks, prev_watermark_type)) => {
402                assert_eq!(
403                    *prev_direction, direction,
404                    "table id {} new watermark direction not match with previous",
405                    self.table_id
406                );
407                assert_eq!(
408                    *prev_watermark_type, watermark_type,
409                    "table id {} new watermark watermark_type not match with previous",
410                    self.table_id
411                );
412                match prev_watermarks.entry(epoch) {
413                    Entry::Occupied(mut entry) => {
414                        let (prev_watermarks, vnode_bitmap) = entry.get_mut();
415                        apply_new_vnodes(vnode_bitmap, &table_watermarks);
416                        prev_watermarks.extend(table_watermarks);
417                    }
418                    Entry::Vacant(entry) => {
419                        let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
420                        apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
421                        entry.insert((table_watermarks, vnode_bitmap));
422                    }
423                }
424            }
425            None => {
426                let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
427                apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
428                self.table_watermarks = Some((
429                    direction,
430                    BTreeMap::from_iter([(epoch, (table_watermarks, vnode_bitmap))]),
431                    watermark_type,
432                ));
433            }
434        }
435    }
436}
437
438impl UploaderData {
439    fn add_table_watermarks(
440        all_table_watermarks: &mut HashMap<TableId, TableWatermarks>,
441        table_id: TableId,
442        direction: WatermarkDirection,
443        watermarks: impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)>,
444        watermark_type: WatermarkSerdeType,
445    ) {
446        let mut table_watermarks: Option<TableWatermarks> = None;
447        for (epoch, watermarks) in watermarks {
448            match &mut table_watermarks {
449                Some(prev_watermarks) => {
450                    assert_eq!(prev_watermarks.direction, direction);
451                    assert_eq!(prev_watermarks.watermark_type, watermark_type);
452                    prev_watermarks.add_new_epoch_watermarks(
453                        epoch,
454                        Arc::from(watermarks),
455                        direction,
456                        watermark_type,
457                    );
458                }
459                None => {
460                    table_watermarks = Some(TableWatermarks::single_epoch(
461                        epoch,
462                        watermarks,
463                        direction,
464                        watermark_type,
465                    ));
466                }
467            }
468        }
469        if let Some(table_watermarks) = table_watermarks {
470            assert!(
471                all_table_watermarks
472                    .insert(table_id, table_watermarks)
473                    .is_none()
474            );
475        }
476    }
477}
478
/// Imms written by one local instance in a single epoch.
struct LocalInstanceEpochData {
    epoch: HummockEpoch,
    // newer data comes first.
    imms: VecDeque<UploaderImm>,
    // Set once some imms of this epoch have been taken out by a spill.
    has_spilled: bool,
}
485
486impl LocalInstanceEpochData {
487    fn new(epoch: HummockEpoch) -> Self {
488        Self {
489            epoch,
490            imms: VecDeque::new(),
491            has_spilled: false,
492        }
493    }
494
495    fn epoch(&self) -> HummockEpoch {
496        self.epoch
497    }
498
499    fn add_imm(&mut self, imm: UploaderImm) {
500        assert_eq!(self.epoch, imm.epoch());
501        if let Some(prev_imm) = self.imms.front() {
502            assert_gt!(imm.batch_id(), prev_imm.batch_id());
503        }
504        self.imms.push_front(imm);
505    }
506
507    fn is_empty(&self) -> bool {
508        self.imms.is_empty()
509    }
510}
511
/// Unsynced state of one local state-store instance of a table.
struct LocalInstanceUnsyncData {
    table_id: TableId,
    instance_id: LocalInstanceId,
    // The epoch currently receiving imms.
    // None means that the current instance should have stopped advancing
    current_epoch_data: Option<LocalInstanceEpochData>,
    // Sealed, non-empty epochs whose imms have not been taken by spill or sync yet.
    // newer data comes first.
    sealed_data: VecDeque<LocalInstanceEpochData>,
    // Batch ids of imms handed out by spill/sync whose flush has not been acked yet.
    // newer data comes first
    flushing_imms: VecDeque<SharedBufferBatchId>,
    is_destroyed: bool,
    // Set by `local_seal_epoch(.., true)`; afterwards adding imms or sealing again panics.
    is_stopped: bool,
}
524
525impl LocalInstanceUnsyncData {
526    fn new(table_id: TableId, instance_id: LocalInstanceId, init_epoch: HummockEpoch) -> Self {
527        Self {
528            table_id,
529            instance_id,
530            current_epoch_data: Some(LocalInstanceEpochData::new(init_epoch)),
531            sealed_data: VecDeque::new(),
532            flushing_imms: Default::default(),
533            is_destroyed: false,
534            is_stopped: false,
535        }
536    }
537
538    fn add_imm(&mut self, imm: UploaderImm) {
539        assert!(!self.is_destroyed);
540        assert!(!self.is_stopped);
541        assert_eq!(self.table_id, imm.table_id);
542        self.current_epoch_data
543            .as_mut()
544            .expect("should be Some when adding new imm")
545            .add_imm(imm);
546    }
547
548    fn local_seal_epoch(&mut self, next_epoch: HummockEpoch, stopped: bool) -> HummockEpoch {
549        let data = self
550            .current_epoch_data
551            .as_mut()
552            .expect("should be Some when seal new epoch");
553        let current_epoch = data.epoch;
554        debug!(
555            instance_id = self.instance_id,
556            next_epoch, current_epoch, stopped, "local seal epoch"
557        );
558        assert_gt!(next_epoch, current_epoch);
559        assert!(!self.is_destroyed);
560        assert!(!self.is_stopped);
561        self.is_stopped = stopped;
562        let epoch_data = replace(data, LocalInstanceEpochData::new(next_epoch));
563        if !epoch_data.is_empty() {
564            self.sealed_data.push_front(epoch_data);
565        }
566        current_epoch
567    }
568
569    // imm_ids from old to new, which means in ascending order
570    fn ack_flushed(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
571        for imm_id in imm_ids {
572            assert_eq!(self.flushing_imms.pop_back().expect("should exist"), imm_id);
573        }
574    }
575
576    fn spill(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
577        let imms = if let Some(oldest_sealed_epoch) = self.sealed_data.back() {
578            match oldest_sealed_epoch.epoch.cmp(&epoch) {
579                Ordering::Less => {
580                    unreachable!(
581                        "should not spill at this epoch because there \
582                    is unspilled data in previous epoch: prev epoch {}, spill epoch {}",
583                        oldest_sealed_epoch.epoch, epoch
584                    );
585                }
586                Ordering::Equal => {
587                    let epoch_data = self.sealed_data.pop_back().unwrap();
588                    assert_eq!(epoch, epoch_data.epoch);
589                    epoch_data.imms
590                }
591                Ordering::Greater => VecDeque::new(),
592            }
593        } else {
594            let Some(current_epoch_data) = &mut self.current_epoch_data else {
595                return Vec::new();
596            };
597            match current_epoch_data.epoch.cmp(&epoch) {
598                Ordering::Less => {
599                    assert!(
600                        current_epoch_data.imms.is_empty(),
601                        "should not spill at this epoch because there \
602                    is unspilled data in current epoch epoch {}, spill epoch {}",
603                        current_epoch_data.epoch,
604                        epoch
605                    );
606                    VecDeque::new()
607                }
608                Ordering::Equal => {
609                    if !current_epoch_data.imms.is_empty() {
610                        current_epoch_data.has_spilled = true;
611                        take(&mut current_epoch_data.imms)
612                    } else {
613                        VecDeque::new()
614                    }
615                }
616                Ordering::Greater => VecDeque::new(),
617            }
618        };
619        self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
620        imms.into_iter().collect()
621    }
622
623    fn add_flushing_imm(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
624        for imm_id in imm_ids {
625            if let Some(prev_imm_id) = self.flushing_imms.front() {
626                assert_gt!(imm_id, *prev_imm_id);
627            }
628            self.flushing_imms.push_front(imm_id);
629        }
630    }
631
632    // start syncing the imm inclusively before the `epoch`
633    // returning data with newer data coming first
634    fn sync(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
635        // firstly added from old to new
636        let mut ret = Vec::new();
637        while let Some(epoch_data) = self.sealed_data.back()
638            && epoch_data.epoch() <= epoch
639        {
640            let imms = self.sealed_data.pop_back().expect("checked exist").imms;
641            self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
642            ret.extend(imms.into_iter().rev());
643        }
644        // reverse so that newer data comes first
645        ret.reverse();
646        if let Some(latest_epoch_data) = &self.current_epoch_data
647            && latest_epoch_data.epoch <= epoch
648        {
649            assert!(self.sealed_data.is_empty());
650            assert!(latest_epoch_data.is_empty());
651            assert!(!latest_epoch_data.has_spilled);
652            if cfg!(debug_assertions) {
653                panic!(
654                    "sync epoch exceeds latest epoch, and the current instance should have been archived, table_id = {}, latest_epoch_data = {}, epoch = {}",
655                    self.table_id,
656                    latest_epoch_data.epoch(),
657                    epoch
658                );
659            }
660            warn!(
661                instance_id = self.instance_id,
662                table_id = %self.table_id,
663                "sync epoch exceeds latest epoch, and the current instance should have be archived"
664            );
665            self.current_epoch_data = None;
666        }
667        ret
668    }
669
670    fn assert_after_epoch(&self, epoch: HummockEpoch) {
671        if let Some(oldest_sealed_data) = self.sealed_data.back() {
672            assert!(!oldest_sealed_data.imms.is_empty());
673            assert_gt!(oldest_sealed_data.epoch, epoch);
674        } else if let Some(current_data) = &self.current_epoch_data
675            && current_data.epoch <= epoch
676        {
677            assert!(current_data.imms.is_empty() && !current_data.has_spilled);
678        }
679    }
680
681    fn is_finished(&self) -> bool {
682        self.is_destroyed && self.sealed_data.is_empty()
683    }
684}
685
/// Unsynced state of one table: its local instances, table watermarks, spill tasks and the
/// epochs that are not synced yet.
struct TableUnsyncData {
    table_id: TableId,
    instance_data: HashMap<LocalInstanceId, LocalInstanceUnsyncData>,
    // Watermark direction, per-epoch watermarks (each with a bitmap of the vnodes that already
    // have a watermark in that epoch) and watermark serde type. Set on the first
    // `add_table_watermarks` call.
    #[expect(clippy::type_complexity)]
    table_watermarks: Option<(
        WatermarkDirection,
        BTreeMap<HummockEpoch, (Vec<VnodeWatermark>, BitmapBuilder)>,
        WatermarkSerdeType,
    )>,
    // Spill task ids keyed by epoch; taken out together with their epochs on `sync`.
    spill_tasks: BTreeMap<HummockEpoch, VecDeque<UploadingTaskId>>,
    // Epochs not yet syncing, used as an ordered set.
    unsync_epochs: BTreeMap<HummockEpoch, ()>,
    // Epochs being synced but not yet acked.
    // newer epoch at the front
    syncing_epochs: VecDeque<HummockEpoch>,
    // Latest epoch known to be synced or committed.
    max_synced_epoch: Option<HummockEpoch>,
}
701
702impl TableUnsyncData {
703    fn new(table_id: TableId, committed_epoch: Option<HummockEpoch>) -> Self {
704        Self {
705            table_id,
706            instance_data: Default::default(),
707            table_watermarks: None,
708            spill_tasks: Default::default(),
709            unsync_epochs: Default::default(),
710            syncing_epochs: Default::default(),
711            max_synced_epoch: committed_epoch,
712        }
713    }
714
715    fn new_epoch(&mut self, epoch: HummockEpoch) {
716        debug!(table_id = ?self.table_id, epoch, "table new epoch");
717        if let Some(latest_epoch) = self.max_epoch() {
718            assert_gt!(epoch, latest_epoch);
719        }
720        self.unsync_epochs.insert(epoch, ());
721    }
722
723    #[expect(clippy::type_complexity)]
724    fn sync(
725        &mut self,
726        epoch: HummockEpoch,
727    ) -> (
728        impl Iterator<Item = (LocalInstanceId, Vec<UploaderImm>)> + '_,
729        Option<(
730            WatermarkDirection,
731            impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)> + use<>,
732            WatermarkSerdeType,
733        )>,
734        impl Iterator<Item = UploadingTaskId> + use<>,
735        BTreeMap<HummockEpoch, ()>,
736    ) {
737        if let Some(prev_epoch) = self.max_sync_epoch() {
738            assert_gt!(epoch, prev_epoch)
739        }
740        let epochs = take_before_epoch(&mut self.unsync_epochs, epoch);
741        assert_eq!(
742            *epochs.last_key_value().expect("non-empty").0,
743            epoch,
744            "{epochs:?} {epoch} {:?}",
745            self.table_id
746        );
747        self.syncing_epochs.push_front(epoch);
748        (
749            self.instance_data
750                .iter_mut()
751                .map(move |(instance_id, data)| (*instance_id, data.sync(epoch))),
752            self.table_watermarks
753                .as_mut()
754                .map(|(direction, watermarks, watermark_type)| {
755                    let watermarks = take_before_epoch(watermarks, epoch)
756                        .into_iter()
757                        .map(|(epoch, (watermarks, _))| (epoch, watermarks));
758                    (*direction, watermarks, *watermark_type)
759                }),
760            take_before_epoch(&mut self.spill_tasks, epoch)
761                .into_values()
762                .flat_map(|tasks| tasks.into_iter()),
763            epochs,
764        )
765    }
766
767    fn ack_synced(&mut self, sync_epoch: HummockEpoch) {
768        let min_sync_epoch = self.syncing_epochs.pop_back().expect("should exist");
769        assert_eq!(sync_epoch, min_sync_epoch);
770        self.max_synced_epoch = Some(sync_epoch);
771    }
772
773    fn ack_committed(&mut self, committed_epoch: HummockEpoch) {
774        let synced_epoch_advanced = {
775            if let Some(max_synced_epoch) = self.max_synced_epoch
776                && max_synced_epoch >= committed_epoch
777            {
778                false
779            } else {
780                true
781            }
782        };
783        if synced_epoch_advanced {
784            self.max_synced_epoch = Some(committed_epoch);
785            if let Some(min_syncing_epoch) = self.syncing_epochs.back() {
786                assert_gt!(*min_syncing_epoch, committed_epoch);
787            }
788            self.assert_after_epoch(committed_epoch);
789        }
790    }
791
792    fn assert_after_epoch(&self, epoch: HummockEpoch) {
793        self.instance_data
794            .values()
795            .for_each(|instance_data| instance_data.assert_after_epoch(epoch));
796        if let Some((_, watermarks, _)) = &self.table_watermarks
797            && let Some((oldest_epoch, _)) = watermarks.first_key_value()
798        {
799            assert_gt!(*oldest_epoch, epoch);
800        }
801    }
802
803    fn max_sync_epoch(&self) -> Option<HummockEpoch> {
804        self.syncing_epochs
805            .front()
806            .cloned()
807            .or(self.max_synced_epoch)
808    }
809
810    fn max_epoch(&self) -> Option<HummockEpoch> {
811        self.unsync_epochs
812            .last_key_value()
813            .map(|(epoch, _)| *epoch)
814            .or_else(|| self.max_sync_epoch())
815    }
816
817    fn is_empty(&self) -> bool {
818        self.instance_data.is_empty()
819            && self.syncing_epochs.is_empty()
820            && self.unsync_epochs.is_empty()
821    }
822}
823
/// Key of `UnsyncData::unsync_epochs`: an epoch plus a representative table id
/// (`get_unsync_epoch_id` picks the smallest table id of the set).
#[derive(Eq, Hash, PartialEq, Copy, Clone)]
struct UnsyncEpochId(HummockEpoch, TableId);
826
827impl UnsyncEpochId {
828    fn epoch(&self) -> HummockEpoch {
829        self.0
830    }
831}
832
833fn get_unsync_epoch_id(epoch: HummockEpoch, table_ids: &HashSet<TableId>) -> Option<UnsyncEpochId> {
834    table_ids
835        .iter()
836        .min()
837        .map(|table_id| UnsyncEpochId(epoch, *table_id))
838}
839
840#[derive(Default)]
841/// Unsync data, can be either imm or spilled sst, and some aggregated epoch information.
842///
843/// `instance_data` holds the imm of each individual local instance, and data are first added here.
844/// The aggregated epoch information (table watermarks, etc.) and the spilled sst will be added to `epoch_data`.
845struct UnsyncData {
846    table_data: HashMap<TableId, TableUnsyncData>,
847    // An index as a mapping from instance id to its table id
848    instance_table_id: HashMap<LocalInstanceId, TableId>,
849    unsync_epochs: HashMap<UnsyncEpochId, HashSet<TableId>>,
850    spilled_data: HashMap<UploadingTaskId, (Arc<StagingSstableInfo>, HashSet<TableId>)>,
851}
852
853impl UnsyncData {
854    fn init_instance(
855        &mut self,
856        table_id: TableId,
857        instance_id: LocalInstanceId,
858        init_epoch: HummockEpoch,
859    ) {
860        debug!(
861            table_id = %table_id,
862            instance_id, init_epoch, "init epoch"
863        );
864        let table_data = self
865            .table_data
866            .get_mut(&table_id)
867            .unwrap_or_else(|| panic!("should exist. {table_id:?}"));
868        assert!(
869            table_data
870                .instance_data
871                .insert(
872                    instance_id,
873                    LocalInstanceUnsyncData::new(table_id, instance_id, init_epoch)
874                )
875                .is_none()
876        );
877        assert!(
878            self.instance_table_id
879                .insert(instance_id, table_id)
880                .is_none()
881        );
882        assert!(table_data.unsync_epochs.contains_key(&init_epoch));
883    }
884
885    fn instance_data(
886        &mut self,
887        instance_id: LocalInstanceId,
888    ) -> Option<&mut LocalInstanceUnsyncData> {
889        self.instance_table_id
890            .get_mut(&instance_id)
891            .cloned()
892            .map(move |table_id| {
893                self.table_data
894                    .get_mut(&table_id)
895                    .expect("should exist")
896                    .instance_data
897                    .get_mut(&instance_id)
898                    .expect("should exist")
899            })
900    }
901
902    fn add_imm(&mut self, instance_id: LocalInstanceId, imm: UploaderImm) {
903        self.instance_data(instance_id)
904            .expect("should exist")
905            .add_imm(imm);
906    }
907
908    fn local_seal_epoch(
909        &mut self,
910        instance_id: LocalInstanceId,
911        next_epoch: HummockEpoch,
912        opts: SealCurrentEpochOptions,
913    ) {
914        let table_id = self.instance_table_id[&instance_id];
915        let table_data = self.table_data.get_mut(&table_id).expect("should exist");
916        let instance_data = table_data
917            .instance_data
918            .get_mut(&instance_id)
919            .expect("should exist");
920        // When drop/cancel a streaming job, for the barrier to stop actor, the
921        // local instance will call `local_seal_epoch`, but the `next_epoch` won't be
922        // called `start_epoch` because we have stopped writing on it.
923        let stopped = !table_data.unsync_epochs.contains_key(&next_epoch);
924        let epoch = instance_data.local_seal_epoch(next_epoch, stopped);
925        if let Some((direction, table_watermarks, watermark_type)) = opts.table_watermarks {
926            table_data.add_table_watermarks(epoch, table_watermarks, direction, watermark_type);
927        }
928    }
929
930    fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
931        if let Some(table_id) = self.instance_table_id.get(&instance_id) {
932            debug!(instance_id, "destroy instance");
933            let table_data = self.table_data.get_mut(table_id).expect("should exist");
934            let instance_data = table_data
935                .instance_data
936                .get_mut(&instance_id)
937                .expect("should exist");
938            assert!(
939                !instance_data.is_destroyed,
940                "cannot destroy an instance for twice"
941            );
942            instance_data.is_destroyed = true;
943        }
944    }
945
946    fn clear_tables(&mut self, table_ids: &HashSet<TableId>, task_manager: &mut TaskManager) {
947        for table_id in table_ids {
948            if let Some(table_unsync_data) = self.table_data.remove(table_id) {
949                for task_id in table_unsync_data.spill_tasks.into_values().flatten() {
950                    if let Some(task_status) = task_manager.abort_task(task_id) {
951                        must_match!(task_status, UploadingTaskStatus::Spilling(spill_table_ids) => {
952                            assert!(spill_table_ids.is_subset(table_ids));
953                        });
954                    }
955                    if let Some((_, spill_table_ids)) = self.spilled_data.remove(&task_id) {
956                        assert!(spill_table_ids.is_subset(table_ids));
957                    }
958                }
959                assert!(
960                    table_unsync_data
961                        .instance_data
962                        .values()
963                        .all(|instance| instance.is_destroyed),
964                    "should be clear when dropping the read version instance"
965                );
966                for instance_id in table_unsync_data.instance_data.keys() {
967                    assert_eq!(
968                        *table_id,
969                        self.instance_table_id
970                            .remove(instance_id)
971                            .expect("should exist")
972                    );
973                }
974            }
975        }
976        debug_assert!(
977            self.spilled_data
978                .values()
979                .all(|(_, spill_table_ids)| spill_table_ids.is_disjoint(table_ids))
980        );
981        self.unsync_epochs.retain(|_, unsync_epoch_table_ids| {
982            if !unsync_epoch_table_ids.is_disjoint(table_ids) {
983                assert!(unsync_epoch_table_ids.is_subset(table_ids));
984                false
985            } else {
986                true
987            }
988        });
989        assert!(
990            self.instance_table_id
991                .values()
992                .all(|table_id| !table_ids.contains(table_id))
993        );
994    }
995}
996
997impl UploaderData {
998    fn sync(
999        &mut self,
1000        context: &UploaderContext,
1001        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
1002        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
1003    ) {
1004        let mut all_table_watermarks = HashMap::new();
1005        let mut uploading_tasks = HashSet::new();
1006        let mut spilled_tasks = BTreeSet::new();
1007        let mut all_table_ids = HashSet::new();
1008        let mut vector_index_adds = HashMap::new();
1009
1010        let mut flush_payload = HashMap::new();
1011
1012        for (epoch, table_ids) in &sync_table_epochs {
1013            let epoch = *epoch;
1014            for table_id in table_ids {
1015                assert!(
1016                    all_table_ids.insert(*table_id),
1017                    "duplicate sync table epoch: {:?} {:?}",
1018                    all_table_ids,
1019                    sync_table_epochs
1020                );
1021            }
1022            if let Some(UnsyncEpochId(_, min_table_id)) = get_unsync_epoch_id(epoch, table_ids) {
1023                let min_table_id_data = self
1024                    .unsync_data
1025                    .table_data
1026                    .get_mut(&min_table_id)
1027                    .expect("should exist");
1028                let epochs = take_before_epoch(&mut min_table_id_data.unsync_epochs.clone(), epoch);
1029                for epoch in epochs.keys() {
1030                    assert_eq!(
1031                        &self
1032                            .unsync_data
1033                            .unsync_epochs
1034                            .remove(&UnsyncEpochId(*epoch, min_table_id))
1035                            .expect("should exist"),
1036                        table_ids
1037                    );
1038                }
1039                for table_id in table_ids {
1040                    let table_data = self
1041                        .unsync_data
1042                        .table_data
1043                        .get_mut(table_id)
1044                        .expect("should exist");
1045                    let (unflushed_payload, table_watermarks, task_ids, table_unsync_epochs) =
1046                        table_data.sync(epoch);
1047                    assert_eq!(table_unsync_epochs, epochs, "invalid table_id {table_id}");
1048                    for (instance_id, payload) in unflushed_payload {
1049                        if !payload.is_empty() {
1050                            flush_payload.insert(instance_id, payload);
1051                        }
1052                    }
1053                    table_data.instance_data.retain(|instance_id, data| {
1054                        // remove the finished instances
1055                        if data.is_finished() {
1056                            assert_eq!(
1057                                self.unsync_data.instance_table_id.remove(instance_id),
1058                                Some(*table_id)
1059                            );
1060                            false
1061                        } else {
1062                            true
1063                        }
1064                    });
1065                    if let Some((direction, watermarks, watermark_type)) = table_watermarks {
1066                        Self::add_table_watermarks(
1067                            &mut all_table_watermarks,
1068                            *table_id,
1069                            direction,
1070                            watermarks,
1071                            watermark_type,
1072                        );
1073                    }
1074                    for task_id in task_ids {
1075                        if self.unsync_data.spilled_data.contains_key(&task_id) {
1076                            spilled_tasks.insert(task_id);
1077                        } else {
1078                            uploading_tasks.insert(task_id);
1079                        }
1080                    }
1081
1082                    if let hash_map::Entry::Occupied(mut entry) =
1083                        self.unsync_vector_index_data.entry(*table_id)
1084                    {
1085                        let data = entry.get_mut();
1086                        let adds = take_before_epoch(&mut data.sealed_epoch_data, epoch)
1087                            .into_values()
1088                            .flatten()
1089                            .collect_vec();
1090                        if data.is_dropped && data.sealed_epoch_data.is_empty() {
1091                            entry.remove();
1092                        }
1093                        if !adds.is_empty() {
1094                            vector_index_adds
1095                                .try_insert(*table_id, adds)
1096                                .expect("non-duplicate");
1097                        }
1098                    }
1099                }
1100            }
1101        }
1102
1103        let sync_id = {
1104            let sync_id = self.next_sync_id;
1105            self.next_sync_id += 1;
1106            SyncId(sync_id)
1107        };
1108
1109        if let Some(extra_flush_task_id) = self.task_manager.sync(
1110            context,
1111            sync_id,
1112            flush_payload,
1113            uploading_tasks.iter().cloned(),
1114            &all_table_ids,
1115        ) {
1116            uploading_tasks.insert(extra_flush_task_id);
1117        }
1118
1119        // iter from large task_id to small one so that newer data at the front
1120        let uploaded = spilled_tasks
1121            .iter()
1122            .rev()
1123            .map(|task_id| {
1124                let (sst, spill_table_ids) = self
1125                    .unsync_data
1126                    .spilled_data
1127                    .remove(task_id)
1128                    .expect("should exist");
1129                assert!(
1130                    spill_table_ids.is_subset(&all_table_ids),
1131                    "spilled tabled ids {:?} not a subset of sync table id {:?}",
1132                    spill_table_ids,
1133                    all_table_ids
1134                );
1135                sst
1136            })
1137            .collect();
1138
1139        self.syncing_data.insert(
1140            sync_id,
1141            SyncingData {
1142                sync_table_epochs,
1143                remaining_uploading_tasks: uploading_tasks,
1144                uploaded,
1145                table_watermarks: all_table_watermarks,
1146                vector_index_adds,
1147                sync_result_sender,
1148            },
1149        );
1150
1151        self.check_upload_task_consistency();
1152    }
1153}
1154
1155impl UnsyncData {
1156    fn ack_flushed(&mut self, sstable_info: &StagingSstableInfo) {
1157        for (instance_id, imm_ids) in sstable_info.imm_ids() {
1158            if let Some(instance_data) = self.instance_data(*instance_id) {
1159                // take `rev` to let old imm id goes first
1160                instance_data.ack_flushed(imm_ids.iter().rev().cloned());
1161            }
1162        }
1163    }
1164}
1165
1166struct SyncingData {
1167    sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
1168    remaining_uploading_tasks: HashSet<UploadingTaskId>,
1169    // newer data at the front
1170    uploaded: VecDeque<Arc<StagingSstableInfo>>,
1171    table_watermarks: HashMap<TableId, TableWatermarks>,
1172    vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
1173    sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
1174}
1175
1176#[derive(Debug)]
1177pub struct SyncedData {
1178    pub uploaded_ssts: VecDeque<Arc<StagingSstableInfo>>,
1179    pub table_watermarks: HashMap<TableId, TableWatermarks>,
1180    pub vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
1181}
1182
1183struct UploaderContext {
1184    pinned_version: PinnedVersion,
1185    /// When called, it will spawn a task to flush the imm into sst and return the join handle.
1186    spawn_upload_task: SpawnUploadTask,
1187    buffer_tracker: BufferTracker,
1188
1189    stats: Arc<HummockStateStoreMetrics>,
1190}
1191
1192impl UploaderContext {
1193    fn new(
1194        pinned_version: PinnedVersion,
1195        spawn_upload_task: SpawnUploadTask,
1196        buffer_tracker: BufferTracker,
1197        stats: Arc<HummockStateStoreMetrics>,
1198    ) -> Self {
1199        UploaderContext {
1200            pinned_version,
1201            spawn_upload_task,
1202            buffer_tracker,
1203            stats,
1204        }
1205    }
1206}
1207
/// Id of a sync. Ids are allocated in increasing order, so ordering by `SyncId` is the
/// order in which syncs were started.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct SyncId(usize);
1210
1211struct UnsyncVectorIndexData {
1212    sealed_epoch_data: BTreeMap<HummockEpoch, Option<VectorIndexAdd>>,
1213    curr_epoch: u64,
1214    is_dropped: bool,
1215}
1216
1217#[derive(Default)]
1218struct UploaderData {
1219    unsync_data: UnsyncData,
1220    unsync_vector_index_data: HashMap<TableId, UnsyncVectorIndexData>,
1221
1222    syncing_data: BTreeMap<SyncId, SyncingData>,
1223
1224    task_manager: TaskManager,
1225    next_sync_id: usize,
1226}
1227
1228impl UploaderData {
1229    fn abort(self, err: impl Fn() -> HummockError) {
1230        self.task_manager.abort_all_tasks();
1231        for syncing_data in self.syncing_data.into_values() {
1232            send_sync_result(syncing_data.sync_result_sender, Err(err()));
1233        }
1234    }
1235
1236    fn clear_tables(&mut self, table_ids: HashSet<TableId>) {
1237        if table_ids.is_empty() {
1238            return;
1239        }
1240        self.unsync_data
1241            .clear_tables(&table_ids, &mut self.task_manager);
1242        self.syncing_data.retain(|sync_id, syncing_data| {
1243            if syncing_data
1244                .sync_table_epochs
1245                .iter()
1246                .any(|(_, sync_table_ids)| !sync_table_ids.is_disjoint(&table_ids))
1247            {
1248                assert!(
1249                    syncing_data
1250                        .sync_table_epochs
1251                        .iter()
1252                        .all(|(_, sync_table_ids)| sync_table_ids.is_subset(&table_ids))
1253                );
1254                for task_id in &syncing_data.remaining_uploading_tasks {
1255                    match self
1256                        .task_manager
1257                        .abort_task(*task_id)
1258                        .expect("should exist")
1259                    {
1260                        UploadingTaskStatus::Spilling(spill_table_ids) => {
1261                            assert!(spill_table_ids.is_subset(&table_ids));
1262                        }
1263                        UploadingTaskStatus::Sync(task_sync_id) => {
1264                            assert_eq!(sync_id, &task_sync_id);
1265                        }
1266                    }
1267                }
1268                false
1269            } else {
1270                true
1271            }
1272        });
1273
1274        self.check_upload_task_consistency();
1275    }
1276
1277    fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
1278        self.unsync_data
1279            .spilled_data
1280            .values()
1281            .map(|(s, _)| s)
1282            .chain(self.syncing_data.values().flat_map(|s| s.uploaded.iter()))
1283            .filter_map(|s| {
1284                s.sstable_infos()
1285                    .iter()
1286                    .chain(s.old_value_sstable_infos())
1287                    .map(|s| s.sst_info.object_id)
1288                    .min()
1289            })
1290            .min()
1291            .map(|object_id| object_id.as_raw())
1292    }
1293}
1294
1295struct ErrState {
1296    failed_sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
1297    reason: String,
1298}
1299
1300enum UploaderState {
1301    Working(UploaderData),
1302    Err(ErrState),
1303}
1304
1305/// An uploader for hummock data.
1306///
1307/// Data have 3 sequential stages: unsync (inside each local instance, data can be unsealed, sealed), syncing, synced.
1308///
1309/// The 3 stages are divided by 2 marginal epochs: `max_syncing_epoch`,
1310/// `max_synced_epoch` in each `TableUnSyncData`. Epochs satisfy the following inequality.
1311///
1312/// (epochs of `synced_data`) <= `max_synced_epoch` < (epochs of `syncing_data`) <=
1313/// `max_syncing_epoch` < (epochs of `unsync_data`)
1314///
1315/// Data are mostly stored in `VecDeque`, and the order stored in the `VecDeque` indicates the data
1316/// order. Data at the front represents ***newer*** data.
1317pub struct HummockUploader {
1318    state: UploaderState,
1319
1320    context: UploaderContext,
1321}
1322
1323impl HummockUploader {
1324    pub(super) fn new(
1325        state_store_metrics: Arc<HummockStateStoreMetrics>,
1326        pinned_version: PinnedVersion,
1327        spawn_upload_task: SpawnUploadTask,
1328        buffer_tracker: BufferTracker,
1329    ) -> Self {
1330        Self {
1331            state: UploaderState::Working(UploaderData::default()),
1332            context: UploaderContext::new(
1333                pinned_version,
1334                spawn_upload_task,
1335                buffer_tracker,
1336                state_store_metrics,
1337            ),
1338        }
1339    }
1340
1341    pub(super) fn buffer_tracker(&self) -> &BufferTracker {
1342        &self.context.buffer_tracker
1343    }
1344
1345    pub(super) fn hummock_version(&self) -> &PinnedVersion {
1346        &self.context.pinned_version
1347    }
1348
1349    pub(super) fn add_imms(
1350        &mut self,
1351        instance_id: LocalInstanceId,
1352        imms: Vec<(ImmutableMemtable, MemoryTracker)>,
1353    ) {
1354        let UploaderState::Working(data) = &mut self.state else {
1355            return;
1356        };
1357        for (imm, tracker) in imms {
1358            let imm = UploaderImm::new(imm, &self.context, tracker);
1359            data.unsync_data.add_imm(instance_id, imm);
1360        }
1361    }
1362
1363    pub(super) fn init_instance(
1364        &mut self,
1365        instance_id: LocalInstanceId,
1366        table_id: TableId,
1367        init_epoch: HummockEpoch,
1368    ) {
1369        let UploaderState::Working(data) = &mut self.state else {
1370            return;
1371        };
1372        data.unsync_data
1373            .init_instance(table_id, instance_id, init_epoch);
1374    }
1375
1376    pub(super) fn local_seal_epoch(
1377        &mut self,
1378        instance_id: LocalInstanceId,
1379        next_epoch: HummockEpoch,
1380        opts: SealCurrentEpochOptions,
1381    ) {
1382        let UploaderState::Working(data) = &mut self.state else {
1383            return;
1384        };
1385        data.unsync_data
1386            .local_seal_epoch(instance_id, next_epoch, opts);
1387    }
1388
1389    pub(super) fn register_vector_writer(&mut self, table_id: TableId, init_epoch: HummockEpoch) {
1390        let UploaderState::Working(data) = &mut self.state else {
1391            return;
1392        };
1393        assert!(
1394            data.unsync_vector_index_data
1395                .try_insert(
1396                    table_id,
1397                    UnsyncVectorIndexData {
1398                        sealed_epoch_data: Default::default(),
1399                        curr_epoch: init_epoch,
1400                        is_dropped: false,
1401                    }
1402                )
1403                .is_ok(),
1404            "duplicate vector writer on {}",
1405            table_id
1406        )
1407    }
1408
1409    pub(super) fn vector_writer_seal_epoch(
1410        &mut self,
1411        table_id: TableId,
1412        next_epoch: HummockEpoch,
1413        add: Option<VectorIndexAdd>,
1414    ) {
1415        let UploaderState::Working(data) = &mut self.state else {
1416            return;
1417        };
1418        let data = data
1419            .unsync_vector_index_data
1420            .get_mut(&table_id)
1421            .expect("should exist");
1422        assert!(!data.is_dropped);
1423        assert!(
1424            data.curr_epoch < next_epoch,
1425            "next epoch {} should be greater than current epoch {}",
1426            next_epoch,
1427            data.curr_epoch
1428        );
1429        data.sealed_epoch_data
1430            .try_insert(data.curr_epoch, add)
1431            .expect("non-duplicate");
1432        data.curr_epoch = next_epoch;
1433    }
1434
1435    pub(super) fn drop_vector_writer(&mut self, table_id: TableId) {
1436        let UploaderState::Working(data) = &mut self.state else {
1437            return;
1438        };
1439        let hash_map::Entry::Occupied(mut entry) = data.unsync_vector_index_data.entry(table_id)
1440        else {
1441            panic!("vector writer {} should exist", table_id);
1442        };
1443        let data = entry.get_mut();
1444        data.is_dropped = true;
1445        if data.sealed_epoch_data.is_empty() {
1446            entry.remove();
1447        }
1448    }
1449
1450    pub(super) fn start_epoch(&mut self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
1451        let UploaderState::Working(data) = &mut self.state else {
1452            return;
1453        };
1454        debug!(epoch, ?table_ids, "start epoch");
1455        for table_id in &table_ids {
1456            let table_data = data
1457                .unsync_data
1458                .table_data
1459                .entry(*table_id)
1460                .or_insert_with(|| {
1461                    TableUnsyncData::new(
1462                        *table_id,
1463                        self.context.pinned_version.table_committed_epoch(*table_id),
1464                    )
1465                });
1466            table_data.new_epoch(epoch);
1467        }
1468        if let Some(unsync_epoch_id) = get_unsync_epoch_id(epoch, &table_ids) {
1469            assert!(
1470                data.unsync_data
1471                    .unsync_epochs
1472                    .insert(unsync_epoch_id, table_ids)
1473                    .is_none()
1474            );
1475        }
1476    }
1477
1478    pub(super) fn start_sync_epoch(
1479        &mut self,
1480        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
1481        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
1482    ) {
1483        let data = match &mut self.state {
1484            UploaderState::Working(data) => data,
1485            UploaderState::Err(ErrState {
1486                failed_sync_table_epochs,
1487                reason,
1488            }) => {
1489                let result = Err(HummockError::other(format!(
1490                    "previous sync epoch {:?} failed due to [{}]",
1491                    failed_sync_table_epochs, reason
1492                )));
1493                send_sync_result(sync_result_sender, result);
1494                return;
1495            }
1496        };
1497        debug!(?sync_table_epochs, "start sync epoch");
1498
1499        data.sync(&self.context, sync_result_sender, sync_table_epochs);
1500
1501        data.may_notify_sync_task(&self.context);
1502
1503        self.context
1504            .stats
1505            .uploader_syncing_epoch_count
1506            .set(data.syncing_data.len() as _);
1507    }
1508
1509    pub(crate) fn update_pinned_version(&mut self, pinned_version: PinnedVersion) {
1510        if let UploaderState::Working(data) = &mut self.state {
1511            // TODO: may only `ack_committed` on table whose `committed_epoch` is changed.
1512            for (table_id, info) in pinned_version.state_table_info.info() {
1513                if let Some(table_data) = data.unsync_data.table_data.get_mut(table_id) {
1514                    table_data.ack_committed(info.committed_epoch);
1515                }
1516            }
1517        }
1518        self.context.pinned_version = pinned_version;
1519    }
1520
1521    pub(crate) fn may_flush(&mut self, spiller_latency: &Histogram) -> bool {
1522        let UploaderState::Working(data) = &mut self.state else {
1523            return false;
1524        };
1525        if self.context.buffer_tracker.need_flush() {
1526            let timer = spiller_latency.start_timer();
1527            let mut spiller = Spiller::new(&mut data.unsync_data);
1528            let mut curr_batch_flush_size = 0;
1529            // iterate from older epoch to newer epoch
1530            while self
1531                .context
1532                .buffer_tracker
1533                .need_more_flush(curr_batch_flush_size)
1534                && let Some((epoch, payload, spilled_table_ids)) = spiller.next_spilled_payload()
1535            {
1536                assert!(!payload.is_empty());
1537                {
1538                    let (task_id, task_size, spilled_table_ids) =
1539                        data.task_manager
1540                            .spill(&self.context, spilled_table_ids, payload);
1541                    for table_id in spilled_table_ids {
1542                        spiller
1543                            .unsync_data()
1544                            .table_data
1545                            .get_mut(table_id)
1546                            .expect("should exist")
1547                            .spill_tasks
1548                            .entry(epoch)
1549                            .or_default()
1550                            .push_front(task_id);
1551                    }
1552                    curr_batch_flush_size += task_size;
1553                }
1554            }
1555            timer.observe_duration();
1556            data.check_upload_task_consistency();
1557            curr_batch_flush_size > 0
1558        } else {
1559            false
1560        }
1561    }
1562
1563    pub(crate) fn clear(&mut self, table_ids: Option<HashSet<TableId>>) {
1564        if let Some(table_ids) = table_ids {
1565            if let UploaderState::Working(data) = &mut self.state {
1566                data.clear_tables(table_ids);
1567            }
1568        } else {
1569            if let UploaderState::Working(data) = replace(
1570                &mut self.state,
1571                UploaderState::Working(UploaderData::default()),
1572            ) {
1573                data.abort(|| HummockError::other("uploader is reset"));
1574            }
1575
1576            self.context.stats.uploader_syncing_epoch_count.set(0);
1577        }
1578    }
1579
1580    pub(crate) fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
1581        let UploaderState::Working(data) = &mut self.state else {
1582            return;
1583        };
1584        data.unsync_data.may_destroy_instance(instance_id);
1585    }
1586
1587    pub(crate) fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
1588        if let UploaderState::Working(ref u) = self.state {
1589            u.min_uncommitted_object_id()
1590        } else {
1591            None
1592        }
1593    }
1594}
1595
1596impl UploaderData {
1597    fn may_notify_sync_task(&mut self, context: &UploaderContext) {
1598        while let Some((_, syncing_data)) = self.syncing_data.first_key_value()
1599            && syncing_data.remaining_uploading_tasks.is_empty()
1600        {
1601            let (_, syncing_data) = self.syncing_data.pop_first().expect("non-empty");
1602            let SyncingData {
1603                sync_table_epochs,
1604                remaining_uploading_tasks: _,
1605                uploaded,
1606                table_watermarks,
1607                vector_index_adds,
1608                sync_result_sender,
1609            } = syncing_data;
1610            context
1611                .stats
1612                .uploader_syncing_epoch_count
1613                .set(self.syncing_data.len() as _);
1614
1615            for (sync_epoch, table_ids) in sync_table_epochs {
1616                for table_id in table_ids {
1617                    if let Some(table_data) = self.unsync_data.table_data.get_mut(&table_id) {
1618                        table_data.ack_synced(sync_epoch);
1619                        if table_data.is_empty() {
1620                            self.unsync_data.table_data.remove(&table_id);
1621                        }
1622                    }
1623                }
1624            }
1625
1626            send_sync_result(
1627                sync_result_sender,
1628                Ok(SyncedData {
1629                    uploaded_ssts: uploaded,
1630                    table_watermarks,
1631                    vector_index_adds,
1632                }),
1633            )
1634        }
1635    }
1636
1637    fn check_upload_task_consistency(&self) {
1638        #[cfg(debug_assertions)]
1639        {
1640            let mut spill_task_table_id_from_data: HashMap<_, HashSet<_>> = HashMap::new();
1641            for table_data in self.unsync_data.table_data.values() {
1642                for task_id in table_data
1643                    .spill_tasks
1644                    .iter()
1645                    .flat_map(|(_, tasks)| tasks.iter())
1646                {
1647                    assert!(
1648                        spill_task_table_id_from_data
1649                            .entry(*task_id)
1650                            .or_default()
1651                            .insert(table_data.table_id)
1652                    );
1653                }
1654            }
1655            let syncing_task_id_from_data: HashMap<_, HashSet<_>> = self
1656                .syncing_data
1657                .iter()
1658                .filter_map(|(sync_id, data)| {
1659                    if data.remaining_uploading_tasks.is_empty() {
1660                        None
1661                    } else {
1662                        Some((*sync_id, data.remaining_uploading_tasks.clone()))
1663                    }
1664                })
1665                .collect();
1666
1667            let mut spill_task_table_id_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
1668            for (task_id, (_, table_ids)) in &self.unsync_data.spilled_data {
1669                spill_task_table_id_from_manager.insert(*task_id, table_ids.clone());
1670            }
1671            let mut syncing_task_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
1672            for (task_id, status) in self.task_manager.tasks() {
1673                match status {
1674                    UploadingTaskStatus::Spilling(table_ids) => {
1675                        assert!(
1676                            spill_task_table_id_from_manager
1677                                .insert(task_id, table_ids.clone())
1678                                .is_none()
1679                        );
1680                    }
1681                    UploadingTaskStatus::Sync(sync_id) => {
1682                        assert!(
1683                            syncing_task_from_manager
1684                                .entry(*sync_id)
1685                                .or_default()
1686                                .insert(task_id)
1687                        );
1688                    }
1689                }
1690            }
1691            assert_eq!(
1692                spill_task_table_id_from_data,
1693                spill_task_table_id_from_manager
1694            );
1695            assert_eq!(syncing_task_id_from_data, syncing_task_from_manager);
1696        }
1697    }
1698}
1699
1700impl HummockUploader {
1701    pub(super) fn next_uploaded_ssts(
1702        &mut self,
1703    ) -> impl Future<Output = Vec<Arc<StagingSstableInfo>>> + '_ {
1704        poll_fn(|cx| {
1705            let UploaderState::Working(data) = &mut self.state else {
1706                return Poll::Pending;
1707            };
1708
1709            const MAX_BATCH_SIZE: usize = 16;
1710
1711            let mut ssts = None;
1712            while let Poll::Ready(Some((task_id, status, result))) =
1713                data.task_manager.poll_task_result(cx)
1714            {
1715                match result {
1716                    Ok(sst) => {
1717                        data.unsync_data.ack_flushed(&sst);
1718                        match status {
1719                            UploadingTaskStatus::Sync(sync_id) => {
1720                                let syncing_data =
1721                                    data.syncing_data.get_mut(&sync_id).expect("should exist");
1722                                syncing_data.uploaded.push_front(sst.clone());
1723                                assert!(syncing_data.remaining_uploading_tasks.remove(&task_id));
1724                                data.may_notify_sync_task(&self.context);
1725                            }
1726                            UploadingTaskStatus::Spilling(table_ids) => {
1727                                data.unsync_data
1728                                    .spilled_data
1729                                    .insert(task_id, (sst.clone(), table_ids));
1730                            }
1731                        }
1732                        data.check_upload_task_consistency();
1733                        let ssts = ssts.get_or_insert_with(|| Vec::with_capacity(MAX_BATCH_SIZE));
1734                        ssts.push(sst);
1735                        if ssts.len() >= MAX_BATCH_SIZE {
1736                            break;
1737                        }
1738                    }
1739                    Err((sync_id, e)) => {
1740                        let syncing_data =
1741                            data.syncing_data.remove(&sync_id).expect("should exist");
1742                        let failed_epochs = syncing_data.sync_table_epochs.clone();
1743                        let data = must_match!(replace(
1744                            &mut self.state,
1745                            UploaderState::Err(ErrState {
1746                                failed_sync_table_epochs: syncing_data.sync_table_epochs,
1747                                reason: e.as_report().to_string(),
1748                            }),
1749                        ), UploaderState::Working(data) => data);
1750
1751                        let _ = syncing_data
1752                            .sync_result_sender
1753                            .send(Err(HummockError::other(format!(
1754                                "failed to sync: {:?}",
1755                                e.as_report()
1756                            ))));
1757
1758                        data.abort(|| {
1759                            HummockError::other(format!(
1760                                "previous epoch {:?} failed to sync",
1761                                failed_epochs
1762                            ))
1763                        });
1764                        return Poll::Pending;
1765                    }
1766                }
1767            }
1768            if let Some(ssts) = ssts {
1769                Poll::Ready(ssts)
1770            } else {
1771                Poll::Pending
1772            }
1773        })
1774    }
1775}
1776
1777#[cfg(test)]
1778pub(crate) mod tests {
1779    use std::collections::{HashMap, HashSet};
1780    use std::future::{Future, poll_fn};
1781    use std::pin::pin;
1782    use std::sync::Arc;
1783    use std::sync::atomic::AtomicUsize;
1784    use std::sync::atomic::Ordering::SeqCst;
1785    use std::task::Poll;
1786
1787    use futures::FutureExt;
1788    use risingwave_common::catalog::TableId;
1789    use risingwave_common::metrics::LabelGuardedHistogram;
1790    use risingwave_common::util::epoch::EpochExt;
1791    use risingwave_hummock_sdk::HummockEpoch;
1792    use risingwave_hummock_sdk::vector_index::{
1793        FlatIndexAdd, VectorFileInfo, VectorIndexAdd, VectorStoreInfoDelta,
1794    };
1795    use tokio::sync::oneshot;
1796
1797    use super::test_utils::*;
1798    use crate::hummock::event_handler::uploader::{
1799        HummockUploader, SyncedData, UploadingTask, get_payload_imm_ids,
1800    };
1801    use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID};
1802    use crate::hummock::utils::MemoryTracker;
1803    use crate::hummock::{HummockError, HummockResult};
1804    use crate::mem_table::ImmutableMemtable;
1805    use crate::opts::StorageOpts;
1806
1807    impl HummockUploader {
1808        pub(super) fn add_imm(
1809            &mut self,
1810            instance_id: LocalInstanceId,
1811            imm: (ImmutableMemtable, MemoryTracker),
1812        ) {
1813            self.add_imms(instance_id, vec![imm]);
1814        }
1815
1816        pub(super) fn start_single_epoch_sync(
1817            &mut self,
1818            epoch: HummockEpoch,
1819            sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
1820            table_ids: HashSet<TableId>,
1821        ) {
1822            self.start_sync_epoch(sync_result_sender, vec![(epoch, table_ids)]);
1823        }
1824
1825        pub(super) fn may_flush_for_test(&mut self) -> bool {
1826            self.may_flush(&LabelGuardedHistogram::test_histogram::<1>())
1827        }
1828    }
1829
1830    #[tokio::test]
1831    pub async fn test_uploading_task_future() {
1832        let uploader_context = test_uploader_context(dummy_success_upload_future);
1833
1834        let imm = gen_imm(INITIAL_EPOCH);
1835        let imm_size = imm.size();
1836        let imm_ids = get_imm_ids(vec![&imm]);
1837        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
1838        assert_eq!(imm_size, task.task_info.task_size);
1839        assert_eq!(imm_ids, task.task_info.imm_ids);
1840        assert_eq!(vec![INITIAL_EPOCH], task.task_info.epochs);
1841        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
1842        assert_eq!(
1843            output.sstable_infos(),
1844            &dummy_success_upload_output().new_value_ssts
1845        );
1846        assert_eq!(imm_size, output.imm_size());
1847        assert_eq!(&imm_ids, output.imm_ids());
1848        assert_eq!(&vec![INITIAL_EPOCH], output.epochs());
1849
1850        let uploader_context = test_uploader_context(dummy_fail_upload_future);
1851        let imm = gen_imm(INITIAL_EPOCH);
1852        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
1853        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
1854    }
1855
1856    #[tokio::test]
1857    pub async fn test_uploading_task_poll_result() {
1858        let uploader_context = test_uploader_context(dummy_success_upload_future);
1859        let mut task = UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH)], &uploader_context);
1860        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
1861        assert_eq!(
1862            output.sstable_infos(),
1863            &dummy_success_upload_output().new_value_ssts
1864        );
1865
1866        let uploader_context = test_uploader_context(dummy_fail_upload_future);
1867        let mut task = UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH)], &uploader_context);
1868        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
1869    }
1870
1871    #[tokio::test]
1872    async fn test_uploading_task_poll_ok_with_retry() {
1873        let run_count = Arc::new(AtomicUsize::new(0));
1874        let fail_num = 10;
1875        let run_count_clone = run_count.clone();
1876        let uploader_context = test_uploader_context(move |payload, info| {
1877            let run_count = run_count.clone();
1878            async move {
1879                // fail in the first `fail_num` run, and success at the end
1880                let ret = if run_count.load(SeqCst) < fail_num {
1881                    Err(HummockError::other("fail"))
1882                } else {
1883                    dummy_success_upload_future(payload, info).await
1884                };
1885                run_count.fetch_add(1, SeqCst);
1886                ret
1887            }
1888        });
1889        let mut task = UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH)], &uploader_context);
1890        let output = poll_fn(|cx| task.poll_ok_with_retry(cx)).await;
1891        assert_eq!(fail_num + 1, run_count_clone.load(SeqCst));
1892        assert_eq!(
1893            output.sstable_infos(),
1894            &dummy_success_upload_output().new_value_ssts
1895        );
1896    }
1897
    /// End-to-end happy path for a single epoch.
    ///
    /// In `epoch1`, one imm is written for `TEST_TABLE_ID` and one flat vector-index
    /// add for a second table. Both tables are synced. The test checks the uploaded
    /// SST and the `SyncedData` contents, then commits the epoch via a pinned-version
    /// update.
    #[tokio::test]
    async fn test_uploader_basic() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let mut sst_collector = UploadedSstCollector::default();
        let epoch1 = INITIAL_EPOCH.next_epoch();
        const VECTOR_INDEX_TABLE_ID: TableId = TableId::new(234);
        uploader.start_epoch(
            epoch1,
            HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
        );
        // Regular table: one imm in epoch1, then seal epoch1 on the local instance.
        let (imm, tracker) = gen_imm_with_unlimit(epoch1);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, (imm.clone(), tracker));
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);

        // Vector-index table: register a writer at epoch1 and seal it (next epoch is
        // `epoch1.next_epoch()`) with a single flat-index add.
        uploader.register_vector_writer(VECTOR_INDEX_TABLE_ID, epoch1);
        let vector_info_file = VectorFileInfo {
            object_id: 1.into(),
            vector_count: 1,
            file_size: 0,
            start_vector_id: 0,
            meta_offset: 20,
        };
        let vector_index_add = VectorIndexAdd::Flat(FlatIndexAdd {
            vector_store_info_delta: VectorStoreInfoDelta {
                next_vector_id: 1,
                added_vector_files: vec![vector_info_file.clone()],
            },
        });
        uploader.vector_writer_seal_epoch(
            VECTOR_INDEX_TABLE_ID,
            epoch1.next_epoch(),
            Some(vector_index_add.clone()),
        );

        // Start syncing epoch1 for both tables. The imm upload is still in flight, so
        // nothing is uploaded yet and the sync has pending tasks.
        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(
            epoch1,
            sync_tx,
            HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
        );
        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
        assert_eq!(1, uploader.data().syncing_data.len());
        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
        assert!(syncing_data.uploaded.is_empty());
        assert!(!syncing_data.remaining_uploading_tasks.is_empty());

        // The uploader yields the SST built from the imm.
        let staging_sst = sst_collector.next(&mut uploader).await;
        assert_eq!(&vec![epoch1], staging_sst.epochs());
        assert_eq!(
            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
            staging_sst.imm_ids()
        );
        assert_eq!(
            &dummy_success_upload_output().new_value_ssts,
            staging_sst.sstable_infos()
        );

        // The sync result contains:
        // - the same SST,
        // - no table watermarks,
        // - exactly the one vector-index add for the second table.
        match sync_rx.await {
            Ok(Ok(data)) => {
                let SyncedData {
                    uploaded_ssts,
                    table_watermarks,
                    vector_index_adds,
                } = data;
                assert_eq!(1, uploaded_ssts.len());
                let staging_sst = &uploaded_ssts[0];
                assert_eq!(&vec![epoch1], staging_sst.epochs());
                assert_eq!(
                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
                    staging_sst.imm_ids()
                );
                assert_eq!(
                    &dummy_success_upload_output().new_value_ssts,
                    staging_sst.sstable_infos()
                );
                assert!(table_watermarks.is_empty());
                assert_eq!(vector_index_adds.len(), 1);
                let (table_id, vector_index_adds) = vector_index_adds.into_iter().next().unwrap();
                assert_eq!(table_id, VECTOR_INDEX_TABLE_ID);
                assert_eq!(vector_index_adds.len(), 1);
                let synced_vector_index_add = vector_index_adds[0].clone();
                assert_eq!(vector_index_add, synced_vector_index_add);
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());

        // Commit epoch1 by pinning a version whose committed epoch is epoch1.
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
2002
    /// The local instance is destroyed after sealing `epoch1` but before the sync starts.
    ///
    /// The sync must still upload and report the instance's imm. Once it completes,
    /// `TEST_TABLE_ID` no longer has any unsync table data.
    #[tokio::test]
    async fn test_uploader_destroy_instance_before_sync() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let mut sst_collector = UploadedSstCollector::default();
        let epoch1 = INITIAL_EPOCH.next_epoch();
        uploader.start_epochs_for_test([epoch1]);
        let (imm, tracker) = gen_imm_with_unlimit(epoch1);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, (imm.clone(), tracker));
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        // Destroy the instance while its sealed data is still unsynced.
        uploader.may_destroy_instance(TEST_LOCAL_INSTANCE_ID);

        // The sync for epoch1 still picks up the destroyed instance's imm as an
        // in-flight upload task.
        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
        assert_eq!(1, uploader.data().syncing_data.len());
        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
        assert!(syncing_data.uploaded.is_empty());
        assert!(!syncing_data.remaining_uploading_tasks.is_empty());

        let staging_sst = sst_collector.next(&mut uploader).await;
        assert_eq!(&vec![epoch1], staging_sst.epochs());
        assert_eq!(
            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
            staging_sst.imm_ids()
        );
        assert_eq!(
            &dummy_success_upload_output().new_value_ssts,
            staging_sst.sstable_infos()
        );

        match sync_rx.await {
            Ok(Ok(data)) => {
                let SyncedData {
                    uploaded_ssts,
                    table_watermarks,
                    ..
                } = data;
                assert_eq!(1, uploaded_ssts.len());
                let staging_sst = &uploaded_ssts[0];
                assert_eq!(&vec![epoch1], staging_sst.epochs());
                assert_eq!(
                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
                    staging_sst.imm_ids()
                );
                assert_eq!(
                    &dummy_success_upload_output().new_value_ssts,
                    staging_sst.sstable_infos()
                );
                assert!(table_watermarks.is_empty());
            }
            _ => unreachable!(),
        };
        // With the sync acknowledged, the table's unsync entry has been dropped.
        assert!(
            !uploader
                .data()
                .unsync_data
                .table_data
                .contains_key(&TEST_TABLE_ID)
        );
    }
2065
    /// Syncing an epoch in which nothing was written.
    ///
    /// The uploader yields no SSTs and the sync completes with an empty
    /// `uploaded_ssts`. A pinned-version update then commits the epoch and leaves no
    /// syncing data behind.
    #[tokio::test]
    async fn test_empty_uploader_sync() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_epochs_for_test([epoch1]);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        // No data was added, so no SST is ever produced.
        assert_uploader_pending(&mut uploader).await;

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.uploaded_ssts.is_empty());
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert!(uploader.data().syncing_data.is_empty());
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
2103
    /// `epoch1` is sealed empty while an imm is written in the later `epoch2`.
    ///
    /// Syncing `epoch1` must not upload the `epoch2` imm. The sync completes with no
    /// SSTs, and `epoch1` can then be committed.
    #[tokio::test]
    async fn test_uploader_empty_epoch() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        uploader.start_epochs_for_test([epoch1, epoch2]);
        let (imm, tracker) = gen_imm_with_unlimit(epoch2);
        // epoch1 is empty while epoch2 is not. Going to seal empty epoch1.
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, (imm, tracker));

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        // The epoch2 imm is not part of the epoch1 sync, so nothing is uploaded.
        assert_uploader_pending(&mut uploader).await;

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.uploaded_ssts.is_empty());
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert!(uploader.data().syncing_data.is_empty());
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
2145
2146    #[tokio::test]
2147    async fn test_uploader_poll_empty() {
2148        let mut uploader = test_uploader(dummy_success_upload_future);
2149        let fut = uploader.next_uploaded_ssts();
2150        let mut fut = pin!(fut);
2151        assert!(poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx).is_pending())).await);
2152    }
2153
    /// Pinned-version updates advance the uploader's max synced/syncing epochs even when
    /// the instance's epoch (`epoch6`) has not been synced yet.
    ///
    /// Once a sync for `epoch6` starts, the max syncing epoch stays at `epoch6` while
    /// the max synced epoch follows the pinned versions. After the sync completes, both
    /// read `epoch6`.
    #[tokio::test]
    async fn test_uploader_empty_advance_mce() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let mut sst_collector = UploadedSstCollector::default();
        let initial_pinned_version = uploader.context.pinned_version.clone();
        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        let epoch3 = epoch2.next_epoch();
        let epoch4 = epoch3.next_epoch();
        let epoch5 = epoch4.next_epoch();
        let epoch6 = epoch5.next_epoch();
        // versionN is a pinned version derived from `test_hummock_version(epochN)`.
        let version1 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        let version2 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch2))
            .unwrap();
        let version3 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch3))
            .unwrap();
        let version4 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch4))
            .unwrap();
        let version5 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch5))
            .unwrap();

        // The only local instance lives in epoch6; earlier epochs hold no local data.
        uploader.start_epochs_for_test([epoch6]);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch6);

        uploader.update_pinned_version(version1);
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        // Adding an imm to epoch6 does not hold back the pinned-version advance.
        let (imm, tracker) = gen_imm_with_unlimit(epoch6);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, (imm.clone(), tracker));
        uploader.update_pinned_version(version2);
        assert_eq!(epoch2, uploader.test_max_synced_epoch());
        assert_eq!(epoch2, uploader.test_max_syncing_epoch());

        // Neither does sealing epoch6 locally.
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch6);
        uploader.update_pinned_version(version3);
        assert_eq!(epoch3, uploader.test_max_synced_epoch());
        assert_eq!(epoch3, uploader.test_max_syncing_epoch());

        // Start syncing epoch6. Max syncing jumps to epoch6, while max synced keeps
        // following the pinned version (epoch4).
        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch6, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
        uploader.update_pinned_version(version4);
        assert_eq!(epoch4, uploader.test_max_synced_epoch());
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());

        let sst = sst_collector.next(&mut uploader).await;
        assert_eq!(&get_imm_ids([&imm]), sst.imm_ids());

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.table_watermarks.is_empty());
                assert_eq!(1, data.uploaded_ssts.len());
                assert_eq!(&get_imm_ids([&imm]), data.uploaded_ssts[0].imm_ids());
            }
            _ => unreachable!(),
        }

        // The epoch6 sync has completed, so pinning version5 (epoch5) leaves both
        // max synced and max syncing at epoch6.
        uploader.update_pinned_version(version5);
        assert_eq!(epoch6, uploader.test_max_synced_epoch());
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
    }
2222
2223    #[tokio::test]
2224    async fn test_uploader_finish_in_order() {
2225        let config = StorageOpts {
2226            shared_buffer_capacity_mb: 1024 * 1024,
2227            shared_buffer_flush_ratio: 0.0,
2228            ..Default::default()
2229        };
2230        let (buffer_tracker, mut uploader, new_task_notifier) =
2231            prepare_uploader_order_test(&config, false);
2232        let mut sst_collector = UploadedSstCollector::default();
2233
2234        let epoch1 = INITIAL_EPOCH.next_epoch();
2235        let epoch2 = epoch1.next_epoch();
2236        let epoch3 = epoch2.next_epoch();
2237        let epoch4 = epoch3.next_epoch();
2238        uploader.start_epochs_for_test([epoch1, epoch2, epoch3, epoch4]);
2239        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2240        let memory_limiter = memory_limiter.as_ref();
2241
2242        let instance_id1 = 1;
2243        let instance_id2 = 2;
2244
2245        uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2246        uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2247
2248        // imm2 contains data in newer epoch, but added first
2249        let (imm2, tracker2) = gen_imm_with_limiter(epoch2, memory_limiter).await;
2250        uploader.add_imm(instance_id2, (imm2.clone(), tracker2));
2251        let (imm1_1, tracker1_1) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2252        uploader.add_imm(instance_id1, (imm1_1.clone(), tracker1_1));
2253        let (imm1_2, tracker1_2) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2254        uploader.add_imm(instance_id1, (imm1_2.clone(), tracker1_2));
2255
2256        // imm1 will be spilled first
2257        let epoch1_spill_payload12 =
2258            HashMap::from_iter([(instance_id1, vec![imm1_2.clone(), imm1_1.clone()])]);
2259        let epoch2_spill_payload = HashMap::from_iter([(instance_id2, vec![imm2.clone()])]);
2260        let (await_start1, finish_tx1) =
2261            new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload12));
2262        let (await_start2, finish_tx2) =
2263            new_task_notifier(get_payload_imm_ids(&epoch2_spill_payload));
2264        uploader.may_flush_for_test();
2265        await_start1.await;
2266        await_start2.await;
2267
2268        assert_uploader_pending(&mut uploader).await;
2269
2270        finish_tx2.send(()).unwrap();
2271        assert_uploader_pending(&mut uploader).await;
2272
2273        finish_tx1.send(()).unwrap();
2274        let sst = sst_collector.next(&mut uploader).await;
2275        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload12), sst.imm_ids());
2276        assert_eq!(&vec![epoch1], sst.epochs());
2277
2278        let sst = sst_collector.next(&mut uploader).await;
2279        assert_eq!(&get_payload_imm_ids(&epoch2_spill_payload), sst.imm_ids());
2280        assert_eq!(&vec![epoch2], sst.epochs());
2281
2282        let (imm1_3, tracker1_3) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2283        uploader.add_imm(instance_id1, (imm1_3.clone(), tracker1_3));
2284        let epoch1_spill_payload3 = HashMap::from_iter([(instance_id1, vec![imm1_3.clone()])]);
2285        let (await_start1_3, finish_tx1_3) =
2286            new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload3));
2287        uploader.may_flush_for_test();
2288        await_start1_3.await;
2289        let (imm1_4, tracker1_4) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2290        uploader.add_imm(instance_id1, (imm1_4.clone(), tracker1_4));
2291        let epoch1_sync_payload = HashMap::from_iter([(instance_id1, vec![imm1_4.clone()])]);
2292        let (await_start1_4, finish_tx1_4) =
2293            new_task_notifier(get_payload_imm_ids(&epoch1_sync_payload));
2294        uploader.local_seal_epoch_for_test(instance_id1, epoch1);
2295        let (sync_tx1, sync_rx1) = oneshot::channel();
2296        uploader.start_single_epoch_sync(epoch1, sync_tx1, HashSet::from_iter([TEST_TABLE_ID]));
2297        await_start1_4.await;
2298
2299        uploader.local_seal_epoch_for_test(instance_id1, epoch2);
2300        uploader.local_seal_epoch_for_test(instance_id2, epoch2);
2301
2302        // current uploader state:
2303        // unsealed: empty
2304        // sealed: epoch2: uploaded sst([imm2])
2305        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])
2306
2307        let (imm3_1, tracker3_1) = gen_imm_with_limiter(epoch3, memory_limiter).await;
2308        let epoch3_spill_payload1 = HashMap::from_iter([(instance_id1, vec![imm3_1.clone()])]);
2309        uploader.add_imm(instance_id1, (imm3_1.clone(), tracker3_1));
2310        let (await_start3_1, finish_tx3_1) =
2311            new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload1));
2312        uploader.may_flush_for_test();
2313        await_start3_1.await;
2314        let (imm3_2, tracker3_2) = gen_imm_with_limiter(epoch3, memory_limiter).await;
2315        let epoch3_spill_payload2 = HashMap::from_iter([(instance_id2, vec![imm3_2.clone()])]);
2316        uploader.add_imm(instance_id2, (imm3_2.clone(), tracker3_2));
2317        let (await_start3_2, finish_tx3_2) =
2318            new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload2));
2319        uploader.may_flush_for_test();
2320        await_start3_2.await;
2321        let (imm3_3, tracker3_3) = gen_imm_with_limiter(epoch3, memory_limiter).await;
2322        uploader.add_imm(instance_id1, (imm3_3.clone(), tracker3_3));
2323
2324        // current uploader state:
2325        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
2326        // sealed: uploaded sst([imm2])
2327        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])
2328
2329        uploader.local_seal_epoch_for_test(instance_id1, epoch3);
2330        let (imm4, tracker4) = gen_imm_with_limiter(epoch4, memory_limiter).await;
2331        uploader.add_imm(instance_id1, (imm4.clone(), tracker4));
2332        assert_uploader_pending(&mut uploader).await;
2333
2334        // current uploader state:
2335        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
2336        //           epoch4: imm: imm4
2337        // sealed: uploaded sst([imm2])
2338        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])
2339
2340        finish_tx3_1.send(()).unwrap();
2341        assert_uploader_pending(&mut uploader).await;
2342        finish_tx1_4.send(()).unwrap();
2343        assert_uploader_pending(&mut uploader).await;
2344        finish_tx1_3.send(()).unwrap();
2345
2346        let sst = sst_collector.next(&mut uploader).await;
2347        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload3), sst.imm_ids());
2348
2349        let sst = sst_collector.next(&mut uploader).await;
2350        assert_eq!(&get_payload_imm_ids(&epoch1_sync_payload), sst.imm_ids());
2351
2352        match sync_rx1.await {
2353            Ok(Ok(data)) => {
2354                assert_eq!(3, data.uploaded_ssts.len());
2355                assert_eq!(
2356                    &get_payload_imm_ids(&epoch1_sync_payload),
2357                    data.uploaded_ssts[0].imm_ids()
2358                );
2359                assert_eq!(
2360                    &get_payload_imm_ids(&epoch1_spill_payload3),
2361                    data.uploaded_ssts[1].imm_ids()
2362                );
2363                assert_eq!(
2364                    &get_payload_imm_ids(&epoch1_spill_payload12),
2365                    data.uploaded_ssts[2].imm_ids()
2366                );
2367            }
2368            _ => {
2369                unreachable!()
2370            }
2371        }
2372
2373        // current uploader state:
2374        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
2375        //           epoch4: imm: imm4
2376        // sealed: uploaded sst([imm2])
2377        // syncing: empty
2378        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2379
2380        let (sync_tx2, sync_rx2) = oneshot::channel();
2381        uploader.start_single_epoch_sync(epoch2, sync_tx2, HashSet::from_iter([TEST_TABLE_ID]));
2382        uploader.local_seal_epoch_for_test(instance_id2, epoch3);
2383        let sst = sst_collector.next(&mut uploader).await;
2384        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids());
2385
2386        match sync_rx2.await {
2387            Ok(Ok(data)) => {
2388                assert_eq!(data.uploaded_ssts.len(), 1);
2389                assert_eq!(
2390                    &get_payload_imm_ids(&epoch2_spill_payload),
2391                    data.uploaded_ssts[0].imm_ids()
2392                );
2393            }
2394            _ => {
2395                unreachable!("should be sync finish");
2396            }
2397        }
2398        assert_eq!(epoch2, uploader.test_max_synced_epoch());
2399
2400        // current uploader state:
2401        // unsealed: epoch4: imm: imm4
2402        // sealed: imm: imm3_3, uploading: [imm3_2], uploaded: sst([imm3_1])
2403        // syncing: empty
2404        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2405        //         epoch2: sst([imm2])
2406
2407        uploader.local_seal_epoch_for_test(instance_id1, epoch4);
2408        uploader.local_seal_epoch_for_test(instance_id2, epoch4);
2409        let epoch4_sync_payload = HashMap::from_iter([(instance_id1, vec![imm4, imm3_3])]);
2410        let (await_start4_with_3_3, finish_tx4_with_3_3) =
2411            new_task_notifier(get_payload_imm_ids(&epoch4_sync_payload));
2412        let (sync_tx4, mut sync_rx4) = oneshot::channel();
2413        uploader.start_single_epoch_sync(epoch4, sync_tx4, HashSet::from_iter([TEST_TABLE_ID]));
2414        await_start4_with_3_3.await;
2415
2416        // current uploader state:
2417        // unsealed: empty
2418        // sealed: empty
2419        // syncing: epoch4: uploading: [imm4, imm3_3], [imm3_2], uploaded: sst([imm3_1])
2420        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2421        //         epoch2: sst([imm2])
2422
2423        assert_uploader_pending(&mut uploader).await;
2424
2425        finish_tx3_2.send(()).unwrap();
2426        let sst = sst_collector.next(&mut uploader).await;
2427        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload2), sst.imm_ids());
2428
2429        finish_tx4_with_3_3.send(()).unwrap();
2430        assert!(poll_fn(|cx| Poll::Ready(sync_rx4.poll_unpin(cx).is_pending())).await);
2431
2432        let sst = sst_collector.next(&mut uploader).await;
2433        assert_eq!(&get_payload_imm_ids(&epoch4_sync_payload), sst.imm_ids());
2434
2435        match sync_rx4.await {
2436            Ok(Ok(data)) => {
2437                assert_eq!(3, data.uploaded_ssts.len());
2438                assert_eq!(
2439                    &get_payload_imm_ids(&epoch4_sync_payload),
2440                    data.uploaded_ssts[0].imm_ids()
2441                );
2442                assert_eq!(
2443                    &get_payload_imm_ids(&epoch3_spill_payload2),
2444                    data.uploaded_ssts[1].imm_ids()
2445                );
2446                assert_eq!(
2447                    &get_payload_imm_ids(&epoch3_spill_payload1),
2448                    data.uploaded_ssts[2].imm_ids(),
2449                )
2450            }
2451            _ => {
2452                unreachable!("should be sync finish");
2453            }
2454        }
2455        assert_eq!(epoch4, uploader.test_max_synced_epoch());
2456
2457        // current uploader state:
2458        // unsealed: empty
2459        // sealed: empty
2460        // syncing: empty
2461        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2462        //         epoch2: sst([imm2])
2463        //         epoch4: sst([imm4, imm3_3]), sst([imm3_2]), sst([imm3_1])
2464    }
2465
2466    #[tokio::test]
2467    async fn test_uploader_frequently_flush() {
2468        let config = StorageOpts {
2469            shared_buffer_capacity_mb: 10,
2470            shared_buffer_flush_ratio: 0.12,
2471            // This test will fail when we set it to 0
2472            shared_buffer_min_batch_flush_size_mb: 1,
2473            ..Default::default()
2474        };
2475        let (buffer_tracker, mut uploader, _new_task_notifier) =
2476            prepare_uploader_order_test(&config, true);
2477
2478        let epoch1 = INITIAL_EPOCH.next_epoch();
2479        let epoch2 = epoch1.next_epoch();
2480        uploader.start_epochs_for_test([epoch1, epoch2]);
2481        let instance_id1 = 1;
2482        let instance_id2 = 2;
2483        let flush_threshold = buffer_tracker.flush_threshold();
2484        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2485        let memory_limiter = memory_limiter.as_ref();
2486
2487        uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2488        uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2489
2490        // imm2 contains data in newer epoch, but added first
2491        let mut total_memory = 0;
2492        while total_memory < flush_threshold {
2493            let (imm, tracker) = gen_imm_with_limiter(epoch2, memory_limiter).await;
2494            total_memory += imm.size();
2495            if total_memory > flush_threshold {
2496                break;
2497            }
2498            uploader.add_imm(instance_id2, (imm, tracker));
2499        }
2500        let (imm, tracker) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2501        uploader.add_imm(instance_id1, (imm, tracker));
2502        assert!(uploader.may_flush_for_test());
2503
2504        for _ in 0..10 {
2505            let (imm, tracker) = gen_imm_with_limiter(epoch1, memory_limiter).await;
2506            uploader.add_imm(instance_id1, (imm, tracker));
2507            assert!(!uploader.may_flush_for_test());
2508        }
2509    }
2510}