risingwave_storage/hummock/event_handler/uploader/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod spiller;
16mod task_manager;
17pub(crate) mod test_utils;
18
19use std::cmp::Ordering;
20use std::collections::btree_map::Entry;
21use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
22use std::fmt::{Debug, Display, Formatter};
23use std::future::{Future, poll_fn};
24use std::mem::{replace, swap, take};
25use std::sync::Arc;
26use std::task::{Context, Poll, ready};
27
28use futures::FutureExt;
29use itertools::Itertools;
30use more_asserts::assert_gt;
31use prometheus::{HistogramTimer, IntGauge};
32use risingwave_common::bitmap::BitmapBuilder;
33use risingwave_common::catalog::TableId;
34use risingwave_common::metrics::UintGauge;
35use risingwave_common::must_match;
36use risingwave_hummock_sdk::table_watermark::{
37    TableWatermarks, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
38};
39use risingwave_hummock_sdk::{HummockEpoch, HummockRawObjectId, LocalSstableInfo};
40use task_manager::{TaskManager, UploadingTaskStatus};
41use thiserror_ext::AsReport;
42use tokio::sync::oneshot;
43use tokio::task::JoinHandle;
44use tracing::{debug, error, info, warn};
45
46use crate::hummock::event_handler::LocalInstanceId;
47use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, send_sync_result};
48use crate::hummock::event_handler::uploader::spiller::Spiller;
49use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm;
50use crate::hummock::local_version::pinned_version::PinnedVersion;
51use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchId;
52use crate::hummock::store::version::StagingSstableInfo;
53use crate::hummock::{HummockError, HummockResult, ImmutableMemtable};
54use crate::mem_table::ImmId;
55use crate::monitor::HummockStateStoreMetrics;
56use crate::store::SealCurrentEpochOptions;
57
58/// Take epoch data inclusively before `epoch` out from `data`
59fn take_before_epoch<T>(
60    data: &mut BTreeMap<HummockEpoch, T>,
61    epoch: HummockEpoch,
62) -> BTreeMap<HummockEpoch, T> {
63    let mut before_epoch_data = data.split_off(&(epoch + 1));
64    swap(&mut before_epoch_data, data);
65    before_epoch_data
66}
67
// Input of an upload task keyed by local instance id. Imms are wrapped in
// `UploaderImm` so their total size is reflected in the uploader metrics.
type UploadTaskInput = HashMap<LocalInstanceId, Vec<UploaderImm>>;
/// The plain imm payload handed to a spawned upload task, keyed by local instance id.
pub type UploadTaskPayload = HashMap<LocalInstanceId, Vec<ImmutableMemtable>>;

/// Output of a finished upload task.
#[derive(Debug)]
pub struct UploadTaskOutput {
    // ssts produced from new-value data of the payload
    pub new_value_ssts: Vec<LocalSstableInfo>,
    // ssts produced from old-value data, if any
    pub old_value_ssts: Vec<LocalSstableInfo>,
    // optional timer observing how long the finished output waits to be polled
    pub wait_poll_timer: Option<HistogramTimer>,
}
/// Factory closure used to spawn an upload task for a payload, returning the
/// join handle of the spawned task.
pub type SpawnUploadTask = Arc<
    dyn Fn(UploadTaskPayload, UploadTaskInfo) -> JoinHandle<HummockResult<UploadTaskOutput>>
        + Send
        + Sync
        + 'static,
>;
83
/// Metadata of an upload task, kept alongside the task so it can be re-spawned
/// on failure and reported in logs.
#[derive(Clone)]
pub struct UploadTaskInfo {
    // sum of the sizes of all imms in the payload
    pub task_size: usize,
    // distinct epochs covered by the payload, newer epochs first
    pub epochs: Vec<HummockEpoch>,
    // imm ids of the payload, keyed by local instance id
    pub imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
}
90
91impl Display for UploadTaskInfo {
92    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
93        f.debug_struct("UploadTaskInfo")
94            .field("task_size", &self.task_size)
95            .field("epochs", &self.epochs)
96            .field("len(imm_ids)", &self.imm_ids.len())
97            .finish()
98    }
99}
100
101impl Debug for UploadTaskInfo {
102    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
103        f.debug_struct("UploadTaskInfo")
104            .field("task_size", &self.task_size)
105            .field("epochs", &self.epochs)
106            .field("imm_ids", &self.imm_ids)
107            .finish()
108    }
109}
110
mod uploader_imm {
    use std::fmt::Formatter;
    use std::ops::Deref;

    use risingwave_common::metrics::UintGauge;

    use crate::hummock::event_handler::uploader::UploaderContext;
    use crate::mem_table::ImmutableMemtable;

    /// An `ImmutableMemtable` owned by the uploader. Its size is added to the
    /// `uploader_imm_size` gauge on construction and subtracted on drop, so the
    /// gauge tracks the total bytes of imms currently held by the uploader.
    pub(super) struct UploaderImm {
        inner: ImmutableMemtable,
        // gauge shared with the uploader stats; see struct-level doc
        size_guard: UintGauge,
    }

    impl UploaderImm {
        /// Wrap `imm`, accounting its size in the uploader's imm-size gauge.
        pub(super) fn new(imm: ImmutableMemtable, context: &UploaderContext) -> Self {
            let size = imm.size();
            let size_guard = context.stats.uploader_imm_size.clone();
            size_guard.add(size as _);
            Self {
                inner: imm,
                size_guard,
            }
        }

        // Test-only constructor with a throwaway gauge, so no `UploaderContext`
        // is needed.
        #[cfg(test)]
        pub(super) fn for_test(imm: ImmutableMemtable) -> Self {
            Self {
                inner: imm,
                size_guard: UintGauge::new("test", "test").unwrap(),
            }
        }
    }

    impl std::fmt::Debug for UploaderImm {
        // Delegate to the inner imm's Debug output.
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            self.inner.fmt(f)
        }
    }

    impl Deref for UploaderImm {
        type Target = ImmutableMemtable;

        fn deref(&self) -> &Self::Target {
            &self.inner
        }
    }

    impl Drop for UploaderImm {
        // Release the imm's contribution to the size gauge.
        fn drop(&mut self) {
            self.size_guard.sub(self.inner.size() as _);
        }
    }
}
165
/// Identifier of an uploading task, used as a key for tracking spilled data
/// and task status.
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct UploadingTaskId(usize);
168
/// A wrapper for an uploading task that compacts and uploads the imm payload. Task context are
/// stored so that when the task fails, it can be re-tried.
struct UploadingTask {
    task_id: UploadingTaskId,
    // newer data at the front
    input: UploadTaskInput,
    // handle of the currently running attempt; replaced when the task is retried
    join_handle: JoinHandle<HummockResult<UploadTaskOutput>>,
    task_info: UploadTaskInfo,
    // kept so a failed task can be re-spawned with the same payload
    spawn_upload_task: SpawnUploadTask,
    // metric guards; both released in `Drop`
    task_size_guard: UintGauge,
    task_count_guard: IntGauge,
}
181
impl Drop for UploadingTask {
    // Undo the metric registration done in `UploadingTask::new`: subtract the
    // task size from the global upload task size gauge and decrement the
    // uploading task count.
    fn drop(&mut self) {
        self.task_size_guard.sub(self.task_info.task_size as u64);
        self.task_count_guard.dec();
    }
}
188
189impl Debug for UploadingTask {
190    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
191        f.debug_struct("UploadingTask")
192            .field("input", &self.input)
193            .field("task_info", &self.task_info)
194            .finish()
195    }
196}
197
198fn get_payload_imm_ids(
199    payload: &UploadTaskPayload,
200) -> HashMap<LocalInstanceId, Vec<SharedBufferBatchId>> {
201    payload
202        .iter()
203        .map(|(instance_id, imms)| {
204            (
205                *instance_id,
206                imms.iter().map(|imm| imm.batch_id()).collect_vec(),
207            )
208        })
209        .collect()
210}
211
impl UploadingTask {
    // INFO logs will be enabled for task with size exceeding 50MB.
    const LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE: usize = 50 * (1 << 20);

    /// Clone the wrapped imms out of the metric-tracked `UploadTaskInput` so the
    /// payload can be handed to a (re-)spawned upload task.
    fn input_to_payload(input: &UploadTaskInput) -> UploadTaskPayload {
        input
            .iter()
            .map(|(instance_id, imms)| {
                (
                    *instance_id,
                    imms.iter().map(|imm| (**imm).clone()).collect(),
                )
            })
            .collect()
    }

    /// Spawn an upload task over `input` via `context.spawn_upload_task`.
    ///
    /// Builds the `UploadTaskInfo` (total size, distinct epochs newest-first,
    /// per-instance imm ids) and registers the task size in the buffer tracker
    /// and the task count in the uploader metrics; both are released in `Drop`.
    fn new(task_id: UploadingTaskId, input: UploadTaskInput, context: &UploaderContext) -> Self {
        assert!(!input.is_empty());
        // distinct epochs covered by the input imms, ascending for now
        let mut epochs = input
            .iter()
            .flat_map(|(_, imms)| imms.iter().flat_map(|imm| imm.epochs().iter().cloned()))
            .sorted()
            .dedup()
            .collect_vec();

        // reverse to make newer epochs comes first
        epochs.reverse();
        let payload = Self::input_to_payload(&input);
        let imm_ids = get_payload_imm_ids(&payload);
        let task_size = input
            .values()
            .map(|imms| imms.iter().map(|imm| imm.size()).sum::<usize>())
            .sum();
        let task_info = UploadTaskInfo {
            task_size,
            epochs,
            imm_ids,
        };
        context
            .buffer_tracker
            .global_upload_task_size()
            .add(task_size as u64);
        // large tasks are logged at INFO so they are visible in production logs
        if task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
            info!("start upload task: {:?}", task_info);
        } else {
            debug!("start upload task: {:?}", task_info);
        }
        let join_handle = (context.spawn_upload_task)(payload, task_info.clone());
        context.stats.uploader_uploading_task_count.inc();
        Self {
            task_id,
            input,
            join_handle,
            task_info,
            spawn_upload_task: context.spawn_upload_task.clone(),
            task_size_guard: context.buffer_tracker.global_upload_task_size().clone(),
            task_count_guard: context.stats.uploader_uploading_task_count.clone(),
        }
    }

    /// Poll the result of the uploading task.
    ///
    /// On success, wraps the task output together with the task's epochs,
    /// imm ids and size into a `StagingSstableInfo`. A join failure (e.g. the
    /// spawned task panicked) is converted into a `HummockError`.
    fn poll_result(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<HummockResult<Arc<StagingSstableInfo>>> {
        Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) {
            Ok(task_result) => task_result
                .inspect(|_| {
                    if self.task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
                        info!(task_info = ?self.task_info, "upload task finish");
                    } else {
                        debug!(task_info = ?self.task_info, "upload task finish");
                    }
                })
                .inspect_err(|e| error!(task_info = ?self.task_info, err = ?e.as_report(), "upload task failed"))
                .map(|output| {
                    Arc::new(StagingSstableInfo::new(
                        output.new_value_ssts,
                        output.old_value_ssts,
                        self.task_info.epochs.clone(),
                        self.task_info.imm_ids.clone(),
                        self.task_info.task_size,
                    ))
                }),

            Err(err) => Err(HummockError::other(format!(
                "fail to join upload join handle: {}",
                err.as_report()
            ))),
        })
    }

    /// Poll the uploading task until it succeeds. If it fails, we will retry it.
    fn poll_ok_with_retry(&mut self, cx: &mut Context<'_>) -> Poll<Arc<StagingSstableInfo>> {
        loop {
            let result = ready!(self.poll_result(cx));
            match result {
                Ok(sstables) => return Poll::Ready(sstables),
                Err(e) => {
                    error!(
                        error = %e.as_report(),
                        task_info = ?self.task_info,
                        "a flush task failed, start retry",
                    );
                    // re-spawn the task with the same payload and task info
                    self.join_handle = (self.spawn_upload_task)(
                        Self::input_to_payload(&self.input),
                        self.task_info.clone(),
                    );
                    // It is important not to return Poll::pending here immediately, because the new
                    // join_handle is not polled yet, and will not awake the current task when
                    // succeed. It will be polled in the next loop iteration.
                }
            }
        }
    }

    /// Metadata of this task (size, epochs, imm ids).
    pub fn get_task_info(&self) -> &UploadTaskInfo {
        &self.task_info
    }
}
332
impl TableUnsyncData {
    /// Merge the watermarks reported for `epoch` into the per-table watermark state.
    ///
    /// Invariants enforced by assertions:
    /// - all watermarks share a single vnode count;
    /// - `direction` and `watermark_type` match any previously recorded
    ///   watermarks of this table;
    /// - within one epoch, each vnode is covered by at most one watermark.
    fn add_table_watermarks(
        &mut self,
        epoch: HummockEpoch,
        table_watermarks: Vec<VnodeWatermark>,
        direction: WatermarkDirection,
        watermark_type: WatermarkSerdeType,
    ) {
        if table_watermarks.is_empty() {
            return;
        }
        let vnode_count = table_watermarks[0].vnode_count();
        for watermark in &table_watermarks {
            assert_eq!(vnode_count, watermark.vnode_count());
        }

        // Mark in `vnode_bitmap` every vnode covered by `vnode_watermarks`,
        // asserting that no vnode is covered twice within the same epoch.
        fn apply_new_vnodes(
            vnode_bitmap: &mut BitmapBuilder,
            vnode_watermarks: &Vec<VnodeWatermark>,
        ) {
            for vnode_watermark in vnode_watermarks {
                for vnode in vnode_watermark.vnode_bitmap().iter_ones() {
                    assert!(
                        !vnode_bitmap.is_set(vnode),
                        "vnode {} write multiple table watermarks",
                        vnode
                    );
                    vnode_bitmap.set(vnode, true);
                }
            }
        }
        match &mut self.table_watermarks {
            Some((prev_direction, prev_watermarks, prev_watermark_type)) => {
                assert_eq!(
                    *prev_direction, direction,
                    "table id {} new watermark direction not match with previous",
                    self.table_id
                );
                assert_eq!(
                    *prev_watermark_type, watermark_type,
                    "table id {} new watermark watermark_type not match with previous",
                    self.table_id
                );
                match prev_watermarks.entry(epoch) {
                    // epoch already has watermarks: extend them, re-checking
                    // the vnode-uniqueness invariant against the existing bitmap
                    Entry::Occupied(mut entry) => {
                        let (prev_watermarks, vnode_bitmap) = entry.get_mut();
                        apply_new_vnodes(vnode_bitmap, &table_watermarks);
                        prev_watermarks.extend(table_watermarks);
                    }
                    // first watermarks of this epoch
                    Entry::Vacant(entry) => {
                        let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                        apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                        entry.insert((table_watermarks, vnode_bitmap));
                    }
                }
            }
            // first watermarks of this table
            None => {
                let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                self.table_watermarks = Some((
                    direction,
                    BTreeMap::from_iter([(epoch, (table_watermarks, vnode_bitmap))]),
                    watermark_type,
                ));
            }
        }
    }
}
401
impl UploaderData {
    /// Fold the per-epoch vnode watermarks of one table into a single
    /// `TableWatermarks` and insert it into `all_table_watermarks`.
    ///
    /// All items must carry the same `direction` and `watermark_type`, and the
    /// table must not already have an entry in `all_table_watermarks`. If
    /// `watermarks` yields nothing, no entry is inserted.
    fn add_table_watermarks(
        all_table_watermarks: &mut HashMap<TableId, TableWatermarks>,
        table_id: TableId,
        direction: WatermarkDirection,
        watermarks: impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)>,
        watermark_type: WatermarkSerdeType,
    ) {
        let mut table_watermarks: Option<TableWatermarks> = None;
        for (epoch, watermarks) in watermarks {
            match &mut table_watermarks {
                // subsequent epochs are appended to the accumulated watermarks
                Some(prev_watermarks) => {
                    assert_eq!(prev_watermarks.direction, direction);
                    assert_eq!(prev_watermarks.watermark_type, watermark_type);
                    prev_watermarks.add_new_epoch_watermarks(
                        epoch,
                        Arc::from(watermarks),
                        direction,
                        watermark_type,
                    );
                }
                // first epoch seeds the accumulator
                None => {
                    table_watermarks = Some(TableWatermarks::single_epoch(
                        epoch,
                        watermarks,
                        direction,
                        watermark_type,
                    ));
                }
            }
        }
        if let Some(table_watermarks) = table_watermarks {
            assert!(
                all_table_watermarks
                    .insert(table_id, table_watermarks)
                    .is_none()
            );
        }
    }
}
442
/// Unsync imms written by one local instance within a single epoch.
struct LocalInstanceEpochData {
    epoch: HummockEpoch,
    // newer data comes first.
    imms: VecDeque<UploaderImm>,
    // set once imms of this epoch have been taken out by a spill
    has_spilled: bool,
}
449
450impl LocalInstanceEpochData {
451    fn new(epoch: HummockEpoch) -> Self {
452        Self {
453            epoch,
454            imms: VecDeque::new(),
455            has_spilled: false,
456        }
457    }
458
459    fn epoch(&self) -> HummockEpoch {
460        self.epoch
461    }
462
463    fn add_imm(&mut self, imm: UploaderImm) {
464        assert_eq!(imm.max_epoch(), imm.min_epoch());
465        assert_eq!(self.epoch, imm.min_epoch());
466        if let Some(prev_imm) = self.imms.front() {
467            assert_gt!(imm.batch_id(), prev_imm.batch_id());
468        }
469        self.imms.push_front(imm);
470    }
471
472    fn is_empty(&self) -> bool {
473        self.imms.is_empty()
474    }
475}
476
/// Unsync data of a single local instance (one writer of a table).
struct LocalInstanceUnsyncData {
    table_id: TableId,
    instance_id: LocalInstanceId,
    // None means that the current instance should have stopped advancing
    current_epoch_data: Option<LocalInstanceEpochData>,
    // newer data comes first.
    sealed_data: VecDeque<LocalInstanceEpochData>,
    // newer data comes first
    flushing_imms: VecDeque<SharedBufferBatchId>,
    // once set, no new imm may be added and no epoch may be sealed;
    // the instance is finished when its sealed data is drained
    is_destroyed: bool,
}
488
impl LocalInstanceUnsyncData {
    /// Create the unsync data of a new instance starting at `init_epoch`.
    fn new(table_id: TableId, instance_id: LocalInstanceId, init_epoch: HummockEpoch) -> Self {
        Self {
            table_id,
            instance_id,
            current_epoch_data: Some(LocalInstanceEpochData::new(init_epoch)),
            sealed_data: VecDeque::new(),
            flushing_imms: Default::default(),
            is_destroyed: false,
        }
    }

    /// Add an imm of the current epoch. Panics if the instance is destroyed,
    /// the imm belongs to another table, or the current epoch data is gone.
    fn add_imm(&mut self, imm: UploaderImm) {
        assert!(!self.is_destroyed);
        assert_eq!(self.table_id, imm.table_id);
        self.current_epoch_data
            .as_mut()
            .expect("should be Some when adding new imm")
            .add_imm(imm);
    }

    /// Seal the current epoch and start `next_epoch`, returning the epoch that
    /// was sealed. Epoch data is moved into `sealed_data` only if non-empty.
    fn local_seal_epoch(&mut self, next_epoch: HummockEpoch) -> HummockEpoch {
        assert!(!self.is_destroyed);
        let data = self
            .current_epoch_data
            .as_mut()
            .expect("should be Some when seal new epoch");
        let current_epoch = data.epoch;
        debug!(
            instance_id = self.instance_id,
            next_epoch, current_epoch, "local seal epoch"
        );
        assert_gt!(next_epoch, current_epoch);
        // swap in a fresh container for `next_epoch`
        let epoch_data = replace(data, LocalInstanceEpochData::new(next_epoch));
        if !epoch_data.is_empty() {
            self.sealed_data.push_front(epoch_data);
        }
        current_epoch
    }

    // imm_ids from old to new, which means in ascending order
    fn ack_flushed(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        // flushed imms must be acked in exactly the order they were flushed
        for imm_id in imm_ids {
            assert_eq!(self.flushing_imms.pop_back().expect("should exist"), imm_id);
        }
    }

    /// Take out the imms of exactly `epoch` for spilling, recording them as
    /// flushing. Returns an empty vec when there is nothing to spill at `epoch`.
    ///
    /// Panics (unreachable/assert) if an older epoch still holds unspilled
    /// data, since spilling must proceed from old to new epochs.
    fn spill(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        let imms = if let Some(oldest_sealed_epoch) = self.sealed_data.back() {
            match oldest_sealed_epoch.epoch.cmp(&epoch) {
                Ordering::Less => {
                    unreachable!(
                        "should not spill at this epoch because there \
                    is unspilled data in previous epoch: prev epoch {}, spill epoch {}",
                        oldest_sealed_epoch.epoch, epoch
                    );
                }
                Ordering::Equal => {
                    let epoch_data = self.sealed_data.pop_back().unwrap();
                    assert_eq!(epoch, epoch_data.epoch);
                    epoch_data.imms
                }
                // nothing sealed at or before `epoch`
                Ordering::Greater => VecDeque::new(),
            }
        } else {
            // no sealed data: the spill may target the current epoch
            let Some(current_epoch_data) = &mut self.current_epoch_data else {
                return Vec::new();
            };
            match current_epoch_data.epoch.cmp(&epoch) {
                Ordering::Less => {
                    assert!(
                        current_epoch_data.imms.is_empty(),
                        "should not spill at this epoch because there \
                    is unspilled data in current epoch epoch {}, spill epoch {}",
                        current_epoch_data.epoch,
                        epoch
                    );
                    VecDeque::new()
                }
                Ordering::Equal => {
                    if !current_epoch_data.imms.is_empty() {
                        // mark so later syncs know data of this epoch was spilled
                        current_epoch_data.has_spilled = true;
                        take(&mut current_epoch_data.imms)
                    } else {
                        VecDeque::new()
                    }
                }
                Ordering::Greater => VecDeque::new(),
            }
        };
        // record from old to new (imms are stored newest-first, hence `rev`)
        self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
        imms.into_iter().collect()
    }

    /// Record imm ids (given from old to new) as flushing, keeping
    /// `flushing_imms` ordered with the newest id at the front.
    fn add_flushing_imm(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            if let Some(prev_imm_id) = self.flushing_imms.front() {
                assert_gt!(imm_id, *prev_imm_id);
            }
            self.flushing_imms.push_front(imm_id);
        }
    }

    // start syncing the imm inclusively before the `epoch`
    // returning data with newer data coming first
    fn sync(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        // firstly added from old to new
        let mut ret = Vec::new();
        while let Some(epoch_data) = self.sealed_data.back()
            && epoch_data.epoch() <= epoch
        {
            let imms = self.sealed_data.pop_back().expect("checked exist").imms;
            self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
            ret.extend(imms.into_iter().rev());
        }
        // reverse so that newer data comes first
        ret.reverse();
        if let Some(latest_epoch_data) = &self.current_epoch_data {
            if latest_epoch_data.epoch <= epoch {
                // Syncing past the current epoch means the instance should
                // already have been archived: panic in debug builds, warn and
                // stop the instance in release builds.
                assert!(self.sealed_data.is_empty());
                assert!(latest_epoch_data.is_empty());
                assert!(!latest_epoch_data.has_spilled);
                if cfg!(debug_assertions) {
                    panic!(
                        "sync epoch exceeds latest epoch, and the current instance should have been archived"
                    );
                }
                warn!(
                    instance_id = self.instance_id,
                    table_id = self.table_id.table_id,
                    "sync epoch exceeds latest epoch, and the current instance should have be archived"
                );
                self.current_epoch_data = None;
            }
        }
        ret
    }

    /// Assert that no unspilled data remains at or before `epoch`.
    fn assert_after_epoch(&self, epoch: HummockEpoch) {
        if let Some(oldest_sealed_data) = self.sealed_data.back() {
            assert!(!oldest_sealed_data.imms.is_empty());
            assert_gt!(oldest_sealed_data.epoch, epoch);
        } else if let Some(current_data) = &self.current_epoch_data {
            if current_data.epoch <= epoch {
                assert!(current_data.imms.is_empty() && !current_data.has_spilled);
            }
        }
    }

    /// The instance can be dropped once destroyed and all sealed data is drained.
    fn is_finished(&self) -> bool {
        self.is_destroyed && self.sealed_data.is_empty()
    }
}
642
/// Unsync data of one table: all its local instances plus per-table
/// watermarks, spill tasks and epoch bookkeeping.
struct TableUnsyncData {
    table_id: TableId,
    instance_data: HashMap<LocalInstanceId, LocalInstanceUnsyncData>,
    // watermark direction, per-epoch watermarks together with a bitmap of the
    // vnodes they cover, and the watermark serde type
    #[expect(clippy::type_complexity)]
    table_watermarks: Option<(
        WatermarkDirection,
        BTreeMap<HummockEpoch, (Vec<VnodeWatermark>, BitmapBuilder)>,
        WatermarkSerdeType,
    )>,
    // ids of spill tasks grouped by epoch
    spill_tasks: BTreeMap<HummockEpoch, VecDeque<UploadingTaskId>>,
    // epochs started but not yet passed to `sync`; the map is used as an
    // ordered set (values are `()`)
    unsync_epochs: BTreeMap<HummockEpoch, ()>,
    // Initialized to be `None`. Transform to `Some(_)` when called
    // `local_seal_epoch` with a non-existing epoch, to mark that
    // the fragment of the table has stopped.
    stopped_next_epoch: Option<HummockEpoch>,
    // newer epoch at the front
    syncing_epochs: VecDeque<HummockEpoch>,
    max_synced_epoch: Option<HummockEpoch>,
}
662
663impl TableUnsyncData {
664    fn new(table_id: TableId, committed_epoch: Option<HummockEpoch>) -> Self {
665        Self {
666            table_id,
667            instance_data: Default::default(),
668            table_watermarks: None,
669            spill_tasks: Default::default(),
670            unsync_epochs: Default::default(),
671            stopped_next_epoch: None,
672            syncing_epochs: Default::default(),
673            max_synced_epoch: committed_epoch,
674        }
675    }
676
677    fn new_epoch(&mut self, epoch: HummockEpoch) {
678        debug!(table_id = ?self.table_id, epoch, "table new epoch");
679        if let Some(latest_epoch) = self.max_epoch() {
680            assert_gt!(epoch, latest_epoch);
681        }
682        self.unsync_epochs.insert(epoch, ());
683    }
684
685    #[expect(clippy::type_complexity)]
686    fn sync(
687        &mut self,
688        epoch: HummockEpoch,
689    ) -> (
690        impl Iterator<Item = (LocalInstanceId, Vec<UploaderImm>)> + '_,
691        Option<(
692            WatermarkDirection,
693            impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)> + use<>,
694            WatermarkSerdeType,
695        )>,
696        impl Iterator<Item = UploadingTaskId> + use<>,
697        BTreeMap<HummockEpoch, ()>,
698    ) {
699        if let Some(prev_epoch) = self.max_sync_epoch() {
700            assert_gt!(epoch, prev_epoch)
701        }
702        let epochs = take_before_epoch(&mut self.unsync_epochs, epoch);
703        assert_eq!(
704            *epochs.last_key_value().expect("non-empty").0,
705            epoch,
706            "{epochs:?} {epoch} {:?}",
707            self.table_id
708        );
709        self.syncing_epochs.push_front(epoch);
710        (
711            self.instance_data
712                .iter_mut()
713                .map(move |(instance_id, data)| (*instance_id, data.sync(epoch))),
714            self.table_watermarks
715                .as_mut()
716                .map(|(direction, watermarks, watermark_type)| {
717                    let watermarks = take_before_epoch(watermarks, epoch)
718                        .into_iter()
719                        .map(|(epoch, (watermarks, _))| (epoch, watermarks));
720                    (*direction, watermarks, *watermark_type)
721                }),
722            take_before_epoch(&mut self.spill_tasks, epoch)
723                .into_values()
724                .flat_map(|tasks| tasks.into_iter()),
725            epochs,
726        )
727    }
728
729    fn ack_synced(&mut self, sync_epoch: HummockEpoch) {
730        let min_sync_epoch = self.syncing_epochs.pop_back().expect("should exist");
731        assert_eq!(sync_epoch, min_sync_epoch);
732        self.max_synced_epoch = Some(sync_epoch);
733    }
734
735    fn ack_committed(&mut self, committed_epoch: HummockEpoch) {
736        let synced_epoch_advanced = {
737            if let Some(max_synced_epoch) = self.max_synced_epoch
738                && max_synced_epoch >= committed_epoch
739            {
740                false
741            } else {
742                true
743            }
744        };
745        if synced_epoch_advanced {
746            self.max_synced_epoch = Some(committed_epoch);
747            if let Some(min_syncing_epoch) = self.syncing_epochs.back() {
748                assert_gt!(*min_syncing_epoch, committed_epoch);
749            }
750            self.assert_after_epoch(committed_epoch);
751        }
752    }
753
754    fn assert_after_epoch(&self, epoch: HummockEpoch) {
755        self.instance_data
756            .values()
757            .for_each(|instance_data| instance_data.assert_after_epoch(epoch));
758        if let Some((_, watermarks, _)) = &self.table_watermarks
759            && let Some((oldest_epoch, _)) = watermarks.first_key_value()
760        {
761            assert_gt!(*oldest_epoch, epoch);
762        }
763    }
764
765    fn max_sync_epoch(&self) -> Option<HummockEpoch> {
766        self.syncing_epochs
767            .front()
768            .cloned()
769            .or(self.max_synced_epoch)
770    }
771
772    fn max_epoch(&self) -> Option<HummockEpoch> {
773        self.unsync_epochs
774            .last_key_value()
775            .map(|(epoch, _)| *epoch)
776            .or_else(|| self.max_sync_epoch())
777    }
778
779    fn is_empty(&self) -> bool {
780        self.instance_data.is_empty()
781            && self.syncing_epochs.is_empty()
782            && self.unsync_epochs.is_empty()
783    }
784}
785
/// Identifier of an unsync epoch shared by a group of tables: the epoch paired
/// with the smallest table id of the group (see `get_unsync_epoch_id`).
#[derive(Eq, Hash, PartialEq, Copy, Clone)]
struct UnsyncEpochId(HummockEpoch, TableId);
788
789impl UnsyncEpochId {
790    fn epoch(&self) -> HummockEpoch {
791        self.0
792    }
793}
794
795fn get_unsync_epoch_id(epoch: HummockEpoch, table_ids: &HashSet<TableId>) -> Option<UnsyncEpochId> {
796    table_ids
797        .iter()
798        .min()
799        .map(|table_id| UnsyncEpochId(epoch, *table_id))
800}
801
#[derive(Default)]
/// Unsync data, can be either imm or spilled sst, and some aggregated epoch information.
///
/// `instance_data` holds the imm of each individual local instance, and data are first added here.
/// The aggregated epoch information (table watermarks, etc.) and the spilled sst will be added to `epoch_data`.
struct UnsyncData {
    table_data: HashMap<TableId, TableUnsyncData>,
    // An index as a mapping from instance id to its table id
    instance_table_id: HashMap<LocalInstanceId, TableId>,
    // tables participating in each unsync epoch id
    unsync_epochs: HashMap<UnsyncEpochId, HashSet<TableId>>,
    // spilled sst of each spill task, together with the tables it covers
    spilled_data: HashMap<UploadingTaskId, (Arc<StagingSstableInfo>, HashSet<TableId>)>,
}
814
815impl UnsyncData {
    /// Register a new local instance of `table_id` starting at `init_epoch`.
    ///
    /// Panics if the table is unknown, the instance id is already registered,
    /// or `init_epoch` has not been started for the table.
    fn init_instance(
        &mut self,
        table_id: TableId,
        instance_id: LocalInstanceId,
        init_epoch: HummockEpoch,
    ) {
        debug!(
            table_id = table_id.table_id,
            instance_id, init_epoch, "init epoch"
        );
        let table_data = self
            .table_data
            .get_mut(&table_id)
            .unwrap_or_else(|| panic!("should exist. {table_id:?}"));
        assert!(
            table_data
                .instance_data
                .insert(
                    instance_id,
                    LocalInstanceUnsyncData::new(table_id, instance_id, init_epoch)
                )
                .is_none()
        );
        // also register the instance -> table index
        assert!(
            self.instance_table_id
                .insert(instance_id, table_id)
                .is_none()
        );
        assert!(table_data.unsync_epochs.contains_key(&init_epoch));
    }
846
847    fn instance_data(
848        &mut self,
849        instance_id: LocalInstanceId,
850    ) -> Option<&mut LocalInstanceUnsyncData> {
851        self.instance_table_id
852            .get_mut(&instance_id)
853            .cloned()
854            .map(move |table_id| {
855                self.table_data
856                    .get_mut(&table_id)
857                    .expect("should exist")
858                    .instance_data
859                    .get_mut(&instance_id)
860                    .expect("should exist")
861            })
862    }
863
864    fn add_imm(&mut self, instance_id: LocalInstanceId, imm: UploaderImm) {
865        self.instance_data(instance_id)
866            .expect("should exist")
867            .add_imm(imm);
868    }
869
870    fn local_seal_epoch(
871        &mut self,
872        instance_id: LocalInstanceId,
873        next_epoch: HummockEpoch,
874        opts: SealCurrentEpochOptions,
875    ) {
876        let table_id = self.instance_table_id[&instance_id];
877        let table_data = self.table_data.get_mut(&table_id).expect("should exist");
878        let instance_data = table_data
879            .instance_data
880            .get_mut(&instance_id)
881            .expect("should exist");
882        let epoch = instance_data.local_seal_epoch(next_epoch);
883        // When drop/cancel a streaming job, for the barrier to stop actor, the
884        // local instance will call `local_seal_epoch`, but the `next_epoch` won't be
885        // called `start_epoch` because we have stopped writing on it.
886        if !table_data.unsync_epochs.contains_key(&next_epoch) {
887            if let Some(stopped_next_epoch) = table_data.stopped_next_epoch {
888                if stopped_next_epoch != next_epoch {
889                    let table_id = table_data.table_id.table_id;
890                    let unsync_epochs = table_data.unsync_epochs.keys().collect_vec();
891                    if cfg!(debug_assertions) {
892                        panic!(
893                            "table_id {} stop epoch {} different to prev stop epoch {}. unsync epochs: {:?}, syncing epochs {:?}, max_synced_epoch {:?}",
894                            table_id,
895                            next_epoch,
896                            stopped_next_epoch,
897                            unsync_epochs,
898                            table_data.syncing_epochs,
899                            table_data.max_synced_epoch
900                        );
901                    } else {
902                        warn!(
903                            table_id,
904                            stopped_next_epoch,
905                            next_epoch,
906                            ?unsync_epochs,
907                            syncing_epochs = ?table_data.syncing_epochs,
908                            max_synced_epoch = ?table_data.max_synced_epoch,
909                            "different stop epoch"
910                        );
911                    }
912                }
913            } else {
914                if let Some(max_epoch) = table_data.max_epoch() {
915                    assert_gt!(next_epoch, max_epoch);
916                }
917                debug!(?table_id, epoch, next_epoch, "table data has stopped");
918                table_data.stopped_next_epoch = Some(next_epoch);
919            }
920        }
921        if let Some((direction, table_watermarks, watermark_type)) = opts.table_watermarks {
922            table_data.add_table_watermarks(epoch, table_watermarks, direction, watermark_type);
923        }
924    }
925
926    fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
927        if let Some(table_id) = self.instance_table_id.get(&instance_id) {
928            debug!(instance_id, "destroy instance");
929            let table_data = self.table_data.get_mut(table_id).expect("should exist");
930            let instance_data = table_data
931                .instance_data
932                .get_mut(&instance_id)
933                .expect("should exist");
934            assert!(
935                !instance_data.is_destroyed,
936                "cannot destroy an instance for twice"
937            );
938            instance_data.is_destroyed = true;
939        }
940    }
941
942    fn clear_tables(&mut self, table_ids: &HashSet<TableId>, task_manager: &mut TaskManager) {
943        for table_id in table_ids {
944            if let Some(table_unsync_data) = self.table_data.remove(table_id) {
945                for task_id in table_unsync_data.spill_tasks.into_values().flatten() {
946                    if let Some(task_status) = task_manager.abort_task(task_id) {
947                        must_match!(task_status, UploadingTaskStatus::Spilling(spill_table_ids) => {
948                            assert!(spill_table_ids.is_subset(table_ids));
949                        });
950                    }
951                    if let Some((_, spill_table_ids)) = self.spilled_data.remove(&task_id) {
952                        assert!(spill_table_ids.is_subset(table_ids));
953                    }
954                }
955                assert!(
956                    table_unsync_data
957                        .instance_data
958                        .values()
959                        .all(|instance| instance.is_destroyed),
960                    "should be clear when dropping the read version instance"
961                );
962                for instance_id in table_unsync_data.instance_data.keys() {
963                    assert_eq!(
964                        *table_id,
965                        self.instance_table_id
966                            .remove(instance_id)
967                            .expect("should exist")
968                    );
969                }
970            }
971        }
972        debug_assert!(
973            self.spilled_data
974                .values()
975                .all(|(_, spill_table_ids)| spill_table_ids.is_disjoint(table_ids))
976        );
977        self.unsync_epochs.retain(|_, unsync_epoch_table_ids| {
978            if !unsync_epoch_table_ids.is_disjoint(table_ids) {
979                assert!(unsync_epoch_table_ids.is_subset(table_ids));
980                false
981            } else {
982                true
983            }
984        });
985        assert!(
986            self.instance_table_id
987                .values()
988                .all(|table_id| !table_ids.contains(table_id))
989        );
990    }
991}
992
impl UploaderData {
    /// Start syncing the given `(epoch, table_ids)` pairs.
    ///
    /// For each pair, this removes the unsync epochs up to and including `epoch`,
    /// flushes the tables' remaining unflushed imms, collects the table watermarks
    /// and the spilled/in-flight upload tasks that belong to the sync, and finally
    /// registers a `SyncingData` under a freshly allocated `SyncId`. The result is
    /// delivered later through `sync_result_sender`.
    ///
    /// Panics when a table id appears in more than one pair, or when the internal
    /// epoch bookkeeping is inconsistent across the involved tables.
    fn sync(
        &mut self,
        context: &UploaderContext,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) {
        let mut all_table_watermarks = HashMap::new();
        let mut uploading_tasks = HashSet::new();
        let mut spilled_tasks = BTreeSet::new();
        let mut all_table_ids = HashSet::new();

        let mut flush_payload = HashMap::new();

        for (epoch, table_ids) in &sync_table_epochs {
            let epoch = *epoch;
            for table_id in table_ids {
                assert!(
                    all_table_ids.insert(*table_id),
                    "duplicate sync table epoch: {:?} {:?}",
                    all_table_ids,
                    sync_table_epochs
                );
            }
            if let Some(UnsyncEpochId(_, min_table_id)) = get_unsync_epoch_id(epoch, table_ids) {
                let min_table_id_data = self
                    .unsync_data
                    .table_data
                    .get_mut(&min_table_id)
                    .expect("should exist");
                // Work on a clone here: the epochs are actually removed from each
                // table inside `table_data.sync(epoch)` below, which is asserted
                // to return exactly the same epoch set.
                let epochs = take_before_epoch(&mut min_table_id_data.unsync_epochs.clone(), epoch);
                for epoch in epochs.keys() {
                    assert_eq!(
                        &self
                            .unsync_data
                            .unsync_epochs
                            .remove(&UnsyncEpochId(*epoch, min_table_id))
                            .expect("should exist"),
                        table_ids
                    );
                }
                for table_id in table_ids {
                    let table_data = self
                        .unsync_data
                        .table_data
                        .get_mut(table_id)
                        .expect("should exist");
                    let (unflushed_payload, table_watermarks, task_ids, table_unsync_epochs) =
                        table_data.sync(epoch);
                    assert_eq!(table_unsync_epochs, epochs);
                    for (instance_id, payload) in unflushed_payload {
                        if !payload.is_empty() {
                            flush_payload.insert(instance_id, payload);
                        }
                    }
                    table_data.instance_data.retain(|instance_id, data| {
                        // remove the finished instances
                        if data.is_finished() {
                            assert_eq!(
                                self.unsync_data.instance_table_id.remove(instance_id),
                                Some(*table_id)
                            );
                            false
                        } else {
                            true
                        }
                    });
                    if let Some((direction, watermarks, watermark_type)) = table_watermarks {
                        Self::add_table_watermarks(
                            &mut all_table_watermarks,
                            *table_id,
                            direction,
                            watermarks,
                            watermark_type,
                        );
                    }
                    // A task that has already produced a spilled sst goes to
                    // `spilled_tasks`; otherwise it is still uploading.
                    for task_id in task_ids {
                        if self.unsync_data.spilled_data.contains_key(&task_id) {
                            spilled_tasks.insert(task_id);
                        } else {
                            uploading_tasks.insert(task_id);
                        }
                    }
                }
            }
        }

        let sync_id = {
            let sync_id = self.next_sync_id;
            self.next_sync_id += 1;
            SyncId(sync_id)
        };

        if let Some(extra_flush_task_id) = self.task_manager.sync(
            context,
            sync_id,
            flush_payload,
            uploading_tasks.iter().cloned(),
            &all_table_ids,
        ) {
            uploading_tasks.insert(extra_flush_task_id);
        }

        // iter from large task_id to small one so that newer data at the front
        let uploaded = spilled_tasks
            .iter()
            .rev()
            .map(|task_id| {
                let (sst, spill_table_ids) = self
                    .unsync_data
                    .spilled_data
                    .remove(task_id)
                    .expect("should exist");
                assert!(
                    spill_table_ids.is_subset(&all_table_ids),
                    "spilled tabled ids {:?} not a subset of sync table id {:?}",
                    spill_table_ids,
                    all_table_ids
                );
                sst
            })
            .collect();

        self.syncing_data.insert(
            sync_id,
            SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: uploading_tasks,
                uploaded,
                table_watermarks: all_table_watermarks,
                sync_result_sender,
            },
        );

        self.check_upload_task_consistency();
    }
}
1130
1131impl UnsyncData {
1132    fn ack_flushed(&mut self, sstable_info: &StagingSstableInfo) {
1133        for (instance_id, imm_ids) in sstable_info.imm_ids() {
1134            if let Some(instance_data) = self.instance_data(*instance_id) {
1135                // take `rev` to let old imm id goes first
1136                instance_data.ack_flushed(imm_ids.iter().rev().cloned());
1137            }
1138        }
1139    }
1140}
1141
/// Bookkeeping of an in-progress sync request.
struct SyncingData {
    /// The `(epoch, table_ids)` pairs that this sync covers.
    sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    /// Upload tasks that must complete before the sync can finish.
    remaining_uploading_tasks: HashSet<UploadingTaskId>,
    // newer data at the front
    uploaded: VecDeque<Arc<StagingSstableInfo>>,
    /// Aggregated table watermarks collected when the sync started.
    table_watermarks: HashMap<TableId, TableWatermarks>,
    /// Channel to deliver the final sync result to the caller.
    sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
}
1150
/// Output of a completed sync: the uploaded ssts (newer at the front) and the
/// aggregated table watermarks.
#[derive(Debug)]
pub struct SyncedData {
    pub uploaded_ssts: VecDeque<Arc<StagingSstableInfo>>,
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
}
1156
/// Shared environment of the uploader: the pinned hummock version, the upload
/// task spawner, the flush buffer tracker and metrics.
struct UploaderContext {
    pinned_version: PinnedVersion,
    /// When called, it will spawn a task to flush the imm into sst and return the join handle.
    spawn_upload_task: SpawnUploadTask,
    buffer_tracker: BufferTracker,

    stats: Arc<HummockStateStoreMetrics>,
}
1165
1166impl UploaderContext {
1167    fn new(
1168        pinned_version: PinnedVersion,
1169        spawn_upload_task: SpawnUploadTask,
1170        buffer_tracker: BufferTracker,
1171        stats: Arc<HummockStateStoreMetrics>,
1172    ) -> Self {
1173        UploaderContext {
1174            pinned_version,
1175            spawn_upload_task,
1176            buffer_tracker,
1177            stats,
1178        }
1179    }
1180}
1181
/// Monotonically increasing id assigned to each sync request
/// (allocated from `UploaderData::next_sync_id`).
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct SyncId(usize);
1184
#[derive(Default)]
struct UploaderData {
    /// Data that has not started syncing yet.
    unsync_data: UnsyncData,

    /// In-progress syncs, ordered by ascending `SyncId`.
    syncing_data: BTreeMap<SyncId, SyncingData>,

    /// Manager of the spawned upload tasks.
    task_manager: TaskManager,
    /// Id to assign to the next sync request.
    next_sync_id: usize,
}
1194
impl UploaderData {
    /// Abort all uploading tasks and fail every pending sync request with an
    /// error produced by `err`. Consumes the uploader data.
    fn abort(self, err: impl Fn() -> HummockError) {
        self.task_manager.abort_all_tasks();
        for syncing_data in self.syncing_data.into_values() {
            send_sync_result(syncing_data.sync_result_sender, Err(err()));
        }
    }

    /// Clear all uploader state of `table_ids`: the unsync data, and any syncing
    /// data whose table set intersects `table_ids` (in which case it must be a
    /// subset), aborting the associated uploading tasks. No-op for an empty set.
    fn clear_tables(&mut self, table_ids: HashSet<TableId>) {
        if table_ids.is_empty() {
            return;
        }
        self.unsync_data
            .clear_tables(&table_ids, &mut self.task_manager);
        self.syncing_data.retain(|sync_id, syncing_data| {
            if syncing_data
                .sync_table_epochs
                .iter()
                .any(|(_, sync_table_ids)| !sync_table_ids.is_disjoint(&table_ids))
            {
                // A sync overlapping the cleared tables must be entirely
                // contained in them.
                assert!(
                    syncing_data
                        .sync_table_epochs
                        .iter()
                        .all(|(_, sync_table_ids)| sync_table_ids.is_subset(&table_ids))
                );
                for task_id in &syncing_data.remaining_uploading_tasks {
                    match self
                        .task_manager
                        .abort_task(*task_id)
                        .expect("should exist")
                    {
                        UploadingTaskStatus::Spilling(spill_table_ids) => {
                            assert!(spill_table_ids.is_subset(&table_ids));
                        }
                        UploadingTaskStatus::Sync(task_sync_id) => {
                            assert_eq!(sync_id, &task_sync_id);
                        }
                    }
                }
                false
            } else {
                true
            }
        });

        self.check_upload_task_consistency();
    }

    /// The minimum object id referenced by any spilled sst or any sst of an
    /// in-progress sync, i.e. among uploaded-but-uncommitted data, if any.
    fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        self.unsync_data
            .spilled_data
            .values()
            .map(|(s, _)| s)
            .chain(self.syncing_data.values().flat_map(|s| s.uploaded.iter()))
            .filter_map(|s| {
                s.sstable_infos()
                    .iter()
                    .chain(s.old_value_sstable_infos())
                    .map(|s| s.sst_info.object_id)
                    .min()
            })
            .min()
            .map(|object_id| object_id.as_raw())
    }
}
1261
/// Information about the sync failure that moved the uploader into
/// [`UploaderState::Err`].
struct ErrState {
    /// The `(epoch, table_ids)` pairs of the sync that failed.
    failed_sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    /// Human-readable description of the failure.
    reason: String,
}
1266
/// State of the uploader: either working normally, or stopped after a sync
/// failure (reset back to `Working` only by a full `clear`).
enum UploaderState {
    Working(UploaderData),
    Err(ErrState),
}
1271
/// An uploader for hummock data.
///
/// Data have 3 sequential stages: unsync (inside each local instance, data can be unsealed, sealed), syncing, synced.
///
/// The 3 stages are divided by 2 marginal epochs: `max_syncing_epoch`,
/// `max_synced_epoch` in each `TableUnSyncData`. Epochs satisfy the following inequality.
///
/// (epochs of `synced_data`) <= `max_synced_epoch` < (epochs of `syncing_data`) <=
/// `max_syncing_epoch` < (epochs of `unsync_data`)
///
/// Data are mostly stored in `VecDeque`, and the order stored in the `VecDeque` indicates the data
/// order. Data at the front represents ***newer*** data.
pub struct HummockUploader {
    /// Current state: working, or stopped after a sync failure.
    state: UploaderState,

    /// Environment shared by all upload tasks.
    context: UploaderContext,
}
1289
impl HummockUploader {
    /// Create an uploader in the initial `Working` state with empty data.
    pub(super) fn new(
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        pinned_version: PinnedVersion,
        spawn_upload_task: SpawnUploadTask,
        buffer_tracker: BufferTracker,
    ) -> Self {
        Self {
            state: UploaderState::Working(UploaderData::default()),
            context: UploaderContext::new(
                pinned_version,
                spawn_upload_task,
                buffer_tracker,
                state_store_metrics,
            ),
        }
    }

    /// The buffer tracker shared with the event handler.
    pub(super) fn buffer_tracker(&self) -> &BufferTracker {
        &self.context.buffer_tracker
    }

    /// The currently pinned hummock version.
    pub(super) fn hummock_version(&self) -> &PinnedVersion {
        &self.context.pinned_version
    }

    /// Add an imm to the given instance. No-op when the uploader is in the
    /// error state.
    pub(super) fn add_imm(&mut self, instance_id: LocalInstanceId, imm: ImmutableMemtable) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        let imm = UploaderImm::new(imm, &self.context);
        data.unsync_data.add_imm(instance_id, imm);
    }

    /// Register a new local instance starting at `init_epoch`. No-op when the
    /// uploader is in the error state.
    pub(super) fn init_instance(
        &mut self,
        instance_id: LocalInstanceId,
        table_id: TableId,
        init_epoch: HummockEpoch,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data
            .init_instance(table_id, instance_id, init_epoch);
    }

    /// Seal the current epoch of the instance and advance to `next_epoch`.
    /// No-op when the uploader is in the error state.
    pub(super) fn local_seal_epoch(
        &mut self,
        instance_id: LocalInstanceId,
        next_epoch: HummockEpoch,
        opts: SealCurrentEpochOptions,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data
            .local_seal_epoch(instance_id, next_epoch, opts);
    }

    /// Start `epoch` on all of `table_ids`, creating the per-table unsync data
    /// on first use and recording the epoch's table set. No-op when the
    /// uploader is in the error state.
    pub(super) fn start_epoch(&mut self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        debug!(epoch, ?table_ids, "start epoch");
        for table_id in &table_ids {
            let table_data = data
                .unsync_data
                .table_data
                .entry(*table_id)
                .or_insert_with(|| {
                    TableUnsyncData::new(
                        *table_id,
                        self.context.pinned_version.table_committed_epoch(*table_id),
                    )
                });
            table_data.new_epoch(epoch);
        }
        if let Some(unsync_epoch_id) = get_unsync_epoch_id(epoch, &table_ids) {
            assert!(
                data.unsync_data
                    .unsync_epochs
                    .insert(unsync_epoch_id, table_ids)
                    .is_none()
            );
        }
    }

    /// Start syncing the given `(epoch, table_ids)` pairs and deliver the result
    /// through `sync_result_sender`. In the error state, the sender is failed
    /// immediately with the recorded failure reason.
    pub(super) fn start_sync_epoch(
        &mut self,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) {
        let data = match &mut self.state {
            UploaderState::Working(data) => data,
            UploaderState::Err(ErrState {
                failed_sync_table_epochs,
                reason,
            }) => {
                let result = Err(HummockError::other(format!(
                    "previous sync epoch {:?} failed due to [{}]",
                    failed_sync_table_epochs, reason
                )));
                send_sync_result(sync_result_sender, result);
                return;
            }
        };
        debug!(?sync_table_epochs, "start sync epoch");

        data.sync(&self.context, sync_result_sender, sync_table_epochs);

        // The sync may already be complete (no remaining tasks); notify eagerly.
        data.may_notify_sync_task(&self.context);

        self.context
            .stats
            .uploader_syncing_epoch_count
            .set(data.syncing_data.len() as _);
    }

    /// Replace the pinned version and acknowledge the new committed epochs on
    /// the tables that are still tracked.
    pub(crate) fn update_pinned_version(&mut self, pinned_version: PinnedVersion) {
        if let UploaderState::Working(data) = &mut self.state {
            // TODO: may only `ack_committed` on table whose `committed_epoch` is changed.
            for (table_id, info) in pinned_version.state_table_info.info() {
                if let Some(table_data) = data.unsync_data.table_data.get_mut(table_id) {
                    table_data.ack_committed(info.committed_epoch);
                }
            }
        }
        self.context.pinned_version = pinned_version;
    }

    /// Spill unsync imms (older epochs first) while the buffer tracker demands
    /// more flush. Returns whether any spill task was started. `false` when in
    /// the error state or when no flush is needed.
    pub(crate) fn may_flush(&mut self) -> bool {
        let UploaderState::Working(data) = &mut self.state else {
            return false;
        };
        if self.context.buffer_tracker.need_flush() {
            let mut spiller = Spiller::new(&mut data.unsync_data);
            let mut curr_batch_flush_size = 0;
            // iterate from older epoch to newer epoch
            while self
                .context
                .buffer_tracker
                .need_more_flush(curr_batch_flush_size)
                && let Some((epoch, payload, spilled_table_ids)) = spiller.next_spilled_payload()
            {
                assert!(!payload.is_empty());
                {
                    let (task_id, task_size, spilled_table_ids) =
                        data.task_manager
                            .spill(&self.context, spilled_table_ids, payload);
                    for table_id in spilled_table_ids {
                        spiller
                            .unsync_data()
                            .table_data
                            .get_mut(table_id)
                            .expect("should exist")
                            .spill_tasks
                            .entry(epoch)
                            .or_default()
                            .push_front(task_id);
                    }
                    curr_batch_flush_size += task_size;
                }
            }
            data.check_upload_task_consistency();
            curr_batch_flush_size > 0
        } else {
            false
        }
    }

    /// Clear the state of the given tables, or — when `table_ids` is `None` —
    /// reset the whole uploader: abort everything and return to an empty
    /// `Working` state (this also recovers from the error state).
    pub(crate) fn clear(&mut self, table_ids: Option<HashSet<TableId>>) {
        if let Some(table_ids) = table_ids {
            if let UploaderState::Working(data) = &mut self.state {
                data.clear_tables(table_ids);
            }
        } else {
            if let UploaderState::Working(data) = replace(
                &mut self.state,
                UploaderState::Working(UploaderData::default()),
            ) {
                data.abort(|| HummockError::other("uploader is reset"));
            }

            self.context.stats.uploader_syncing_epoch_count.set(0);
        }
    }

    /// Mark a local instance as destroyed. No-op when the uploader is in the
    /// error state.
    pub(crate) fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data.may_destroy_instance(instance_id);
    }

    /// The minimum uncommitted object id among uploaded data, if any.
    /// `None` in the error state.
    pub(crate) fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        if let UploaderState::Working(ref u) = self.state {
            u.min_uncommitted_object_id()
        } else {
            None
        }
    }
}
1493
impl UploaderData {
    /// Pop syncs whose uploading tasks have all completed, in ascending
    /// `SyncId` order, acknowledge the synced epochs on the involved tables
    /// (dropping tables that become empty), and send the `SyncedData` to each
    /// result sender. Stops at the first sync that still has remaining tasks,
    /// preserving completion order.
    fn may_notify_sync_task(&mut self, context: &UploaderContext) {
        while let Some((_, syncing_data)) = self.syncing_data.first_key_value()
            && syncing_data.remaining_uploading_tasks.is_empty()
        {
            let (_, syncing_data) = self.syncing_data.pop_first().expect("non-empty");
            let SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: _,
                uploaded,
                table_watermarks,
                sync_result_sender,
            } = syncing_data;
            context
                .stats
                .uploader_syncing_epoch_count
                .set(self.syncing_data.len() as _);

            for (sync_epoch, table_ids) in sync_table_epochs {
                for table_id in table_ids {
                    if let Some(table_data) = self.unsync_data.table_data.get_mut(&table_id) {
                        table_data.ack_synced(sync_epoch);
                        if table_data.is_empty() {
                            self.unsync_data.table_data.remove(&table_id);
                        }
                    }
                }
            }

            send_sync_result(
                sync_result_sender,
                Ok(SyncedData {
                    uploaded_ssts: uploaded,
                    table_watermarks,
                }),
            )
        }
    }

    /// Debug-build-only invariant check: the spill/sync task bookkeeping derived
    /// from `unsync_data` and `syncing_data` must agree exactly with the task
    /// state held by the `task_manager`.
    fn check_upload_task_consistency(&self) {
        #[cfg(debug_assertions)]
        {
            // Spill tasks and their table ids as recorded by the per-table data.
            let mut spill_task_table_id_from_data: HashMap<_, HashSet<_>> = HashMap::new();
            for table_data in self.unsync_data.table_data.values() {
                for task_id in table_data
                    .spill_tasks
                    .iter()
                    .flat_map(|(_, tasks)| tasks.iter())
                {
                    assert!(
                        spill_task_table_id_from_data
                            .entry(*task_id)
                            .or_default()
                            .insert(table_data.table_id)
                    );
                }
            }
            // Remaining uploading tasks per sync, from the syncing data side.
            let syncing_task_id_from_data: HashMap<_, HashSet<_>> = self
                .syncing_data
                .iter()
                .filter_map(|(sync_id, data)| {
                    if data.remaining_uploading_tasks.is_empty() {
                        None
                    } else {
                        Some((*sync_id, data.remaining_uploading_tasks.clone()))
                    }
                })
                .collect();

            // The same two views, rebuilt from spilled data and the task manager.
            let mut spill_task_table_id_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
            for (task_id, (_, table_ids)) in &self.unsync_data.spilled_data {
                spill_task_table_id_from_manager.insert(*task_id, table_ids.clone());
            }
            let mut syncing_task_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
            for (task_id, status) in self.task_manager.tasks() {
                match status {
                    UploadingTaskStatus::Spilling(table_ids) => {
                        assert!(
                            spill_task_table_id_from_manager
                                .insert(task_id, table_ids.clone())
                                .is_none()
                        );
                    }
                    UploadingTaskStatus::Sync(sync_id) => {
                        assert!(
                            syncing_task_from_manager
                                .entry(*sync_id)
                                .or_default()
                                .insert(task_id)
                        );
                    }
                }
            }
            assert_eq!(
                spill_task_table_id_from_data,
                spill_task_table_id_from_manager
            );
            assert_eq!(syncing_task_id_from_data, syncing_task_from_manager);
        }
    }
}
1595
impl HummockUploader {
    /// Return a future resolving to the next successfully uploaded sst.
    ///
    /// On success of a spilling task, the sst is recorded into `spilled_data`;
    /// on success of a syncing task, it is attached to its `SyncingData` and
    /// any fully-completed syncs are notified. On a sync failure, the uploader
    /// transitions to `UploaderState::Err`, fails the pending syncs, and the
    /// future stays pending. The future is also pending while already in the
    /// error state.
    pub(super) fn next_uploaded_sst(
        &mut self,
    ) -> impl Future<Output = Arc<StagingSstableInfo>> + '_ {
        poll_fn(|cx| {
            let UploaderState::Working(data) = &mut self.state else {
                return Poll::Pending;
            };

            if let Some((task_id, status, result)) = ready!(data.task_manager.poll_task_result(cx))
            {
                match result {
                    Ok(sst) => {
                        data.unsync_data.ack_flushed(&sst);
                        match status {
                            UploadingTaskStatus::Sync(sync_id) => {
                                let syncing_data =
                                    data.syncing_data.get_mut(&sync_id).expect("should exist");
                                syncing_data.uploaded.push_front(sst.clone());
                                assert!(syncing_data.remaining_uploading_tasks.remove(&task_id));
                                data.may_notify_sync_task(&self.context);
                            }
                            UploadingTaskStatus::Spilling(table_ids) => {
                                data.unsync_data
                                    .spilled_data
                                    .insert(task_id, (sst.clone(), table_ids));
                            }
                        }
                        data.check_upload_task_consistency();
                        Poll::Ready(sst)
                    }
                    Err((sync_id, e)) => {
                        let syncing_data =
                            data.syncing_data.remove(&sync_id).expect("should exist");
                        let failed_epochs = syncing_data.sync_table_epochs.clone();
                        // Swap the state to `Err`, taking ownership of the old
                        // working data so the remaining work can be aborted.
                        let data = must_match!(replace(
                            &mut self.state,
                            UploaderState::Err(ErrState {
                                failed_sync_table_epochs: syncing_data.sync_table_epochs,
                                reason: e.as_report().to_string(),
                            }),
                        ), UploaderState::Working(data) => data);

                        let _ = syncing_data
                            .sync_result_sender
                            .send(Err(HummockError::other(format!(
                                "failed to sync: {:?}",
                                e.as_report()
                            ))));

                        data.abort(|| {
                            HummockError::other(format!(
                                "previous epoch {:?} failed to sync",
                                failed_epochs
                            ))
                        });
                        Poll::Pending
                    }
                }
            } else {
                Poll::Pending
            }
        })
    }
}
1661
1662#[cfg(test)]
1663pub(crate) mod tests {
1664    use std::collections::{HashMap, HashSet};
1665    use std::future::{Future, poll_fn};
1666    use std::ops::Deref;
1667    use std::pin::pin;
1668    use std::sync::Arc;
1669    use std::sync::atomic::AtomicUsize;
1670    use std::sync::atomic::Ordering::SeqCst;
1671    use std::task::Poll;
1672
1673    use futures::FutureExt;
1674    use risingwave_common::catalog::TableId;
1675    use risingwave_common::util::epoch::EpochExt;
1676    use risingwave_hummock_sdk::HummockEpoch;
1677    use tokio::sync::oneshot;
1678
1679    use super::test_utils::*;
1680    use crate::hummock::event_handler::TEST_LOCAL_INSTANCE_ID;
1681    use crate::hummock::event_handler::uploader::{
1682        HummockUploader, SyncedData, UploadingTask, get_payload_imm_ids,
1683    };
1684    use crate::hummock::{HummockError, HummockResult};
1685    use crate::opts::StorageOpts;
1686
1687    impl HummockUploader {
1688        pub(super) fn start_single_epoch_sync(
1689            &mut self,
1690            epoch: HummockEpoch,
1691            sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
1692            table_ids: HashSet<TableId>,
1693        ) {
1694            self.start_sync_epoch(sync_result_sender, vec![(epoch, table_ids)]);
1695        }
1696    }
1697
1698    #[tokio::test]
1699    pub async fn test_uploading_task_future() {
1700        let uploader_context = test_uploader_context(dummy_success_upload_future);
1701
1702        let imm = gen_imm(INITIAL_EPOCH).await;
1703        let imm_size = imm.size();
1704        let imm_ids = get_imm_ids(vec![&imm]);
1705        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
1706        assert_eq!(imm_size, task.task_info.task_size);
1707        assert_eq!(imm_ids, task.task_info.imm_ids);
1708        assert_eq!(vec![INITIAL_EPOCH], task.task_info.epochs);
1709        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
1710        assert_eq!(
1711            output.sstable_infos(),
1712            &dummy_success_upload_output().new_value_ssts
1713        );
1714        assert_eq!(imm_size, output.imm_size());
1715        assert_eq!(&imm_ids, output.imm_ids());
1716        assert_eq!(&vec![INITIAL_EPOCH], output.epochs());
1717
1718        let uploader_context = test_uploader_context(dummy_fail_upload_future);
1719        let imm = gen_imm(INITIAL_EPOCH).await;
1720        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
1721        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
1722    }
1723
1724    #[tokio::test]
1725    pub async fn test_uploading_task_poll_result() {
1726        let uploader_context = test_uploader_context(dummy_success_upload_future);
1727        let mut task =
1728            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
1729        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
1730        assert_eq!(
1731            output.sstable_infos(),
1732            &dummy_success_upload_output().new_value_ssts
1733        );
1734
1735        let uploader_context = test_uploader_context(dummy_fail_upload_future);
1736        let mut task =
1737            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
1738        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
1739    }
1740
1741    #[tokio::test]
1742    async fn test_uploading_task_poll_ok_with_retry() {
1743        let run_count = Arc::new(AtomicUsize::new(0));
1744        let fail_num = 10;
1745        let run_count_clone = run_count.clone();
1746        let uploader_context = test_uploader_context(move |payload, info| {
1747            let run_count = run_count.clone();
1748            async move {
1749                // fail in the first `fail_num` run, and success at the end
1750                let ret = if run_count.load(SeqCst) < fail_num {
1751                    Err(HummockError::other("fail"))
1752                } else {
1753                    dummy_success_upload_future(payload, info).await
1754                };
1755                run_count.fetch_add(1, SeqCst);
1756                ret
1757            }
1758        });
1759        let mut task =
1760            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
1761        let output = poll_fn(|cx| task.poll_ok_with_retry(cx)).await;
1762        assert_eq!(fail_num + 1, run_count_clone.load(SeqCst));
1763        assert_eq!(
1764            output.sstable_infos(),
1765            &dummy_success_upload_output().new_value_ssts
1766        );
1767    }
1768
1769    #[tokio::test]
1770    async fn test_uploader_basic() {
1771        let mut uploader = test_uploader(dummy_success_upload_future);
1772        let epoch1 = INITIAL_EPOCH.next_epoch();
1773        uploader.start_epochs_for_test([epoch1]);
1774        let imm = gen_imm(epoch1).await;
1775        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
1776        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
1777        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
1778
1779        let (sync_tx, sync_rx) = oneshot::channel();
1780        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
1781        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
1782        assert_eq!(1, uploader.data().syncing_data.len());
1783        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
1784        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
1785        assert!(syncing_data.uploaded.is_empty());
1786        assert!(!syncing_data.remaining_uploading_tasks.is_empty());
1787
1788        let staging_sst = uploader.next_uploaded_sst().await;
1789        assert_eq!(&vec![epoch1], staging_sst.epochs());
1790        assert_eq!(
1791            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
1792            staging_sst.imm_ids()
1793        );
1794        assert_eq!(
1795            &dummy_success_upload_output().new_value_ssts,
1796            staging_sst.sstable_infos()
1797        );
1798
1799        match sync_rx.await {
1800            Ok(Ok(data)) => {
1801                let SyncedData {
1802                    uploaded_ssts,
1803                    table_watermarks,
1804                } = data;
1805                assert_eq!(1, uploaded_ssts.len());
1806                let staging_sst = &uploaded_ssts[0];
1807                assert_eq!(&vec![epoch1], staging_sst.epochs());
1808                assert_eq!(
1809                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
1810                    staging_sst.imm_ids()
1811                );
1812                assert_eq!(
1813                    &dummy_success_upload_output().new_value_ssts,
1814                    staging_sst.sstable_infos()
1815                );
1816                assert!(table_watermarks.is_empty());
1817            }
1818            _ => unreachable!(),
1819        };
1820        assert_eq!(epoch1, uploader.test_max_synced_epoch());
1821
1822        let new_pinned_version = uploader
1823            .context
1824            .pinned_version
1825            .new_pin_version(test_hummock_version(epoch1))
1826            .unwrap();
1827        uploader.update_pinned_version(new_pinned_version);
1828        assert_eq!(
1829            epoch1,
1830            uploader
1831                .context
1832                .pinned_version
1833                .table_committed_epoch(TEST_TABLE_ID)
1834                .unwrap()
1835        );
1836    }
1837
1838    #[tokio::test]
1839    async fn test_uploader_destroy_instance_before_sync() {
1840        let mut uploader = test_uploader(dummy_success_upload_future);
1841        let epoch1 = INITIAL_EPOCH.next_epoch();
1842        uploader.start_epochs_for_test([epoch1]);
1843        let imm = gen_imm(epoch1).await;
1844        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
1845        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
1846        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
1847        uploader.may_destroy_instance(TEST_LOCAL_INSTANCE_ID);
1848
1849        let (sync_tx, sync_rx) = oneshot::channel();
1850        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
1851        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
1852        assert_eq!(1, uploader.data().syncing_data.len());
1853        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
1854        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
1855        assert!(syncing_data.uploaded.is_empty());
1856        assert!(!syncing_data.remaining_uploading_tasks.is_empty());
1857
1858        let staging_sst = uploader.next_uploaded_sst().await;
1859        assert_eq!(&vec![epoch1], staging_sst.epochs());
1860        assert_eq!(
1861            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
1862            staging_sst.imm_ids()
1863        );
1864        assert_eq!(
1865            &dummy_success_upload_output().new_value_ssts,
1866            staging_sst.sstable_infos()
1867        );
1868
1869        match sync_rx.await {
1870            Ok(Ok(data)) => {
1871                let SyncedData {
1872                    uploaded_ssts,
1873                    table_watermarks,
1874                } = data;
1875                assert_eq!(1, uploaded_ssts.len());
1876                let staging_sst = &uploaded_ssts[0];
1877                assert_eq!(&vec![epoch1], staging_sst.epochs());
1878                assert_eq!(
1879                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
1880                    staging_sst.imm_ids()
1881                );
1882                assert_eq!(
1883                    &dummy_success_upload_output().new_value_ssts,
1884                    staging_sst.sstable_infos()
1885                );
1886                assert!(table_watermarks.is_empty());
1887            }
1888            _ => unreachable!(),
1889        };
1890        assert!(
1891            !uploader
1892                .data()
1893                .unsync_data
1894                .table_data
1895                .contains_key(&TEST_TABLE_ID)
1896        );
1897    }
1898
1899    #[tokio::test]
1900    async fn test_empty_uploader_sync() {
1901        let mut uploader = test_uploader(dummy_success_upload_future);
1902        let epoch1 = INITIAL_EPOCH.next_epoch();
1903
1904        let (sync_tx, sync_rx) = oneshot::channel();
1905        uploader.start_epochs_for_test([epoch1]);
1906        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
1907        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
1908        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
1909        assert_eq!(epoch1, uploader.test_max_syncing_epoch());
1910
1911        assert_uploader_pending(&mut uploader).await;
1912
1913        match sync_rx.await {
1914            Ok(Ok(data)) => {
1915                assert!(data.uploaded_ssts.is_empty());
1916            }
1917            _ => unreachable!(),
1918        };
1919        assert_eq!(epoch1, uploader.test_max_synced_epoch());
1920        let new_pinned_version = uploader
1921            .context
1922            .pinned_version
1923            .new_pin_version(test_hummock_version(epoch1))
1924            .unwrap();
1925        uploader.update_pinned_version(new_pinned_version);
1926        assert!(uploader.data().syncing_data.is_empty());
1927        assert_eq!(
1928            epoch1,
1929            uploader
1930                .context
1931                .pinned_version
1932                .table_committed_epoch(TEST_TABLE_ID)
1933                .unwrap()
1934        );
1935    }
1936
1937    #[tokio::test]
1938    async fn test_uploader_empty_epoch() {
1939        let mut uploader = test_uploader(dummy_success_upload_future);
1940        let epoch1 = INITIAL_EPOCH.next_epoch();
1941        let epoch2 = epoch1.next_epoch();
1942        uploader.start_epochs_for_test([epoch1, epoch2]);
1943        let imm = gen_imm(epoch2).await;
1944        // epoch1 is empty while epoch2 is not. Going to seal empty epoch1.
1945        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
1946        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
1947        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm);
1948
1949        let (sync_tx, sync_rx) = oneshot::channel();
1950        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
1951        assert_eq!(epoch1, uploader.test_max_syncing_epoch());
1952
1953        assert_uploader_pending(&mut uploader).await;
1954
1955        match sync_rx.await {
1956            Ok(Ok(data)) => {
1957                assert!(data.uploaded_ssts.is_empty());
1958            }
1959            _ => unreachable!(),
1960        };
1961        assert_eq!(epoch1, uploader.test_max_synced_epoch());
1962        let new_pinned_version = uploader
1963            .context
1964            .pinned_version
1965            .new_pin_version(test_hummock_version(epoch1))
1966            .unwrap();
1967        uploader.update_pinned_version(new_pinned_version);
1968        assert!(uploader.data().syncing_data.is_empty());
1969        assert_eq!(
1970            epoch1,
1971            uploader
1972                .context
1973                .pinned_version
1974                .table_committed_epoch(TEST_TABLE_ID)
1975                .unwrap()
1976        );
1977    }
1978
1979    #[tokio::test]
1980    async fn test_uploader_poll_empty() {
1981        let mut uploader = test_uploader(dummy_success_upload_future);
1982        let fut = uploader.next_uploaded_sst();
1983        let mut fut = pin!(fut);
1984        assert!(poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx).is_pending())).await);
1985    }
1986
1987    #[tokio::test]
1988    async fn test_uploader_empty_advance_mce() {
1989        let mut uploader = test_uploader(dummy_success_upload_future);
1990        let initial_pinned_version = uploader.context.pinned_version.clone();
1991        let epoch1 = INITIAL_EPOCH.next_epoch();
1992        let epoch2 = epoch1.next_epoch();
1993        let epoch3 = epoch2.next_epoch();
1994        let epoch4 = epoch3.next_epoch();
1995        let epoch5 = epoch4.next_epoch();
1996        let epoch6 = epoch5.next_epoch();
1997        let version1 = initial_pinned_version
1998            .new_pin_version(test_hummock_version(epoch1))
1999            .unwrap();
2000        let version2 = initial_pinned_version
2001            .new_pin_version(test_hummock_version(epoch2))
2002            .unwrap();
2003        let version3 = initial_pinned_version
2004            .new_pin_version(test_hummock_version(epoch3))
2005            .unwrap();
2006        let version4 = initial_pinned_version
2007            .new_pin_version(test_hummock_version(epoch4))
2008            .unwrap();
2009        let version5 = initial_pinned_version
2010            .new_pin_version(test_hummock_version(epoch5))
2011            .unwrap();
2012
2013        uploader.start_epochs_for_test([epoch6]);
2014        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch6);
2015
2016        uploader.update_pinned_version(version1);
2017        assert_eq!(epoch1, uploader.test_max_synced_epoch());
2018        assert_eq!(epoch1, uploader.test_max_syncing_epoch());
2019
2020        let imm = gen_imm(epoch6).await;
2021        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
2022        uploader.update_pinned_version(version2);
2023        assert_eq!(epoch2, uploader.test_max_synced_epoch());
2024        assert_eq!(epoch2, uploader.test_max_syncing_epoch());
2025
2026        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch6);
2027        uploader.update_pinned_version(version3);
2028        assert_eq!(epoch3, uploader.test_max_synced_epoch());
2029        assert_eq!(epoch3, uploader.test_max_syncing_epoch());
2030
2031        let (sync_tx, sync_rx) = oneshot::channel();
2032        uploader.start_single_epoch_sync(epoch6, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
2033        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
2034        uploader.update_pinned_version(version4);
2035        assert_eq!(epoch4, uploader.test_max_synced_epoch());
2036        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
2037
2038        let sst = uploader.next_uploaded_sst().await;
2039        assert_eq!(&get_imm_ids([&imm]), sst.imm_ids());
2040
2041        match sync_rx.await {
2042            Ok(Ok(data)) => {
2043                assert!(data.table_watermarks.is_empty());
2044                assert_eq!(1, data.uploaded_ssts.len());
2045                assert_eq!(&get_imm_ids([&imm]), data.uploaded_ssts[0].imm_ids());
2046            }
2047            _ => unreachable!(),
2048        }
2049
2050        uploader.update_pinned_version(version5);
2051        assert_eq!(epoch6, uploader.test_max_synced_epoch());
2052        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
2053    }
2054
    /// Verifies ordering guarantees of the uploader: spill/sync tasks of older
    /// epochs are reported by `next_uploaded_sst` before newer ones even when
    /// newer tasks finish first, and each sync result lists its ssts from the
    /// most recently uploaded to the earliest spilled.
    ///
    /// `new_task_notifier` gives, per expected payload, a future that resolves
    /// when the upload task starts plus a oneshot used to let the task finish,
    /// so the test controls task completion order precisely.
    #[tokio::test]
    async fn test_uploader_finish_in_order() {
        let config = StorageOpts {
            shared_buffer_capacity_mb: 1024 * 1024,
            shared_buffer_flush_ratio: 0.0,
            ..Default::default()
        };
        let (buffer_tracker, mut uploader, new_task_notifier) =
            prepare_uploader_order_test(&config, false);

        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        let epoch3 = epoch2.next_epoch();
        let epoch4 = epoch3.next_epoch();
        uploader.start_epochs_for_test([epoch1, epoch2, epoch3, epoch4]);
        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
        let memory_limiter = Some(memory_limiter.deref());

        let instance_id1 = 1;
        let instance_id2 = 2;

        uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
        uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);

        // imm2 contains data in newer epoch, but added first
        let imm2 = gen_imm_with_limiter(epoch2, memory_limiter).await;
        uploader.add_imm(instance_id2, imm2.clone());
        let imm1_1 = gen_imm_with_limiter(epoch1, memory_limiter).await;
        uploader.add_imm(instance_id1, imm1_1.clone());
        let imm1_2 = gen_imm_with_limiter(epoch1, memory_limiter).await;
        uploader.add_imm(instance_id1, imm1_2.clone());

        // imm1 will be spilled first
        let epoch1_spill_payload12 =
            HashMap::from_iter([(instance_id1, vec![imm1_2.clone(), imm1_1.clone()])]);
        let epoch2_spill_payload = HashMap::from_iter([(instance_id2, vec![imm2.clone()])]);
        let (await_start1, finish_tx1) =
            new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload12));
        let (await_start2, finish_tx2) =
            new_task_notifier(get_payload_imm_ids(&epoch2_spill_payload));
        uploader.may_flush();
        await_start1.await;
        await_start2.await;

        assert_uploader_pending(&mut uploader).await;

        // Finishing the newer epoch2 task first must NOT surface it: the older
        // epoch1 task still blocks the output order.
        finish_tx2.send(()).unwrap();
        assert_uploader_pending(&mut uploader).await;

        finish_tx1.send(()).unwrap();
        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload12), sst.imm_ids());
        assert_eq!(&vec![epoch1], sst.epochs());

        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_payload_imm_ids(&epoch2_spill_payload), sst.imm_ids());
        assert_eq!(&vec![epoch2], sst.epochs());

        let imm1_3 = gen_imm_with_limiter(epoch1, memory_limiter).await;
        uploader.add_imm(instance_id1, imm1_3.clone());
        let epoch1_spill_payload3 = HashMap::from_iter([(instance_id1, vec![imm1_3.clone()])]);
        let (await_start1_3, finish_tx1_3) =
            new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload3));
        uploader.may_flush();
        await_start1_3.await;
        let imm1_4 = gen_imm_with_limiter(epoch1, memory_limiter).await;
        uploader.add_imm(instance_id1, imm1_4.clone());
        let epoch1_sync_payload = HashMap::from_iter([(instance_id1, vec![imm1_4.clone()])]);
        let (await_start1_4, finish_tx1_4) =
            new_task_notifier(get_payload_imm_ids(&epoch1_sync_payload));
        uploader.local_seal_epoch_for_test(instance_id1, epoch1);
        let (sync_tx1, mut sync_rx1) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx1, HashSet::from_iter([TEST_TABLE_ID]));
        await_start1_4.await;

        uploader.local_seal_epoch_for_test(instance_id1, epoch2);
        uploader.local_seal_epoch_for_test(instance_id2, epoch2);

        // current uploader state:
        // unsealed: empty
        // sealed: epoch2: uploaded sst([imm2])
        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])

        let imm3_1 = gen_imm_with_limiter(epoch3, memory_limiter).await;
        let epoch3_spill_payload1 = HashMap::from_iter([(instance_id1, vec![imm3_1.clone()])]);
        uploader.add_imm(instance_id1, imm3_1.clone());
        let (await_start3_1, finish_tx3_1) =
            new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload1));
        uploader.may_flush();
        await_start3_1.await;
        let imm3_2 = gen_imm_with_limiter(epoch3, memory_limiter).await;
        let epoch3_spill_payload2 = HashMap::from_iter([(instance_id2, vec![imm3_2.clone()])]);
        uploader.add_imm(instance_id2, imm3_2.clone());
        let (await_start3_2, finish_tx3_2) =
            new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload2));
        uploader.may_flush();
        await_start3_2.await;
        let imm3_3 = gen_imm_with_limiter(epoch3, memory_limiter).await;
        uploader.add_imm(instance_id1, imm3_3.clone());

        // current uploader state:
        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
        // sealed: uploaded sst([imm2])
        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])

        uploader.local_seal_epoch_for_test(instance_id1, epoch3);
        let imm4 = gen_imm_with_limiter(epoch4, memory_limiter).await;
        uploader.add_imm(instance_id1, imm4.clone());
        assert_uploader_pending(&mut uploader).await;

        // current uploader state:
        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
        //           epoch4: imm: imm4
        // sealed: uploaded sst([imm2])
        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])

        // Finishing out-of-epoch-order tasks keeps the uploader pending until
        // the earliest unfinished epoch1 task (imm1_3) completes.
        finish_tx3_1.send(()).unwrap();
        assert_uploader_pending(&mut uploader).await;
        finish_tx1_4.send(()).unwrap();
        assert_uploader_pending(&mut uploader).await;
        finish_tx1_3.send(()).unwrap();

        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload3), sst.imm_ids());

        // The epoch1 sync is not done yet: its [imm1_4] sst has not surfaced.
        assert!(poll_fn(|cx| Poll::Ready(sync_rx1.poll_unpin(cx).is_pending())).await);

        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_payload_imm_ids(&epoch1_sync_payload), sst.imm_ids());

        // Sync result lists ssts newest-first: sync payload, then the spills.
        match sync_rx1.await {
            Ok(Ok(data)) => {
                assert_eq!(3, data.uploaded_ssts.len());
                assert_eq!(
                    &get_payload_imm_ids(&epoch1_sync_payload),
                    data.uploaded_ssts[0].imm_ids()
                );
                assert_eq!(
                    &get_payload_imm_ids(&epoch1_spill_payload3),
                    data.uploaded_ssts[1].imm_ids()
                );
                assert_eq!(
                    &get_payload_imm_ids(&epoch1_spill_payload12),
                    data.uploaded_ssts[2].imm_ids()
                );
            }
            _ => {
                unreachable!()
            }
        }

        // current uploader state:
        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
        //           epoch4: imm: imm4
        // sealed: uploaded sst([imm2])
        // syncing: empty
        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])

        let (sync_tx2, sync_rx2) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch2, sync_tx2, HashSet::from_iter([TEST_TABLE_ID]));
        uploader.local_seal_epoch_for_test(instance_id2, epoch3);
        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids());

        match sync_rx2.await {
            Ok(Ok(data)) => {
                assert_eq!(data.uploaded_ssts.len(), 1);
                assert_eq!(
                    &get_payload_imm_ids(&epoch2_spill_payload),
                    data.uploaded_ssts[0].imm_ids()
                );
            }
            _ => {
                unreachable!("should be sync finish");
            }
        }
        assert_eq!(epoch2, uploader.test_max_synced_epoch());

        // current uploader state:
        // unsealed: epoch4: imm: imm4
        // sealed: imm: imm3_3, uploading: [imm3_2], uploaded: sst([imm3_1])
        // syncing: empty
        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
        //         epoch2: sst([imm2])

        uploader.local_seal_epoch_for_test(instance_id1, epoch4);
        uploader.local_seal_epoch_for_test(instance_id2, epoch4);
        let epoch4_sync_payload = HashMap::from_iter([(instance_id1, vec![imm4, imm3_3])]);
        let (await_start4_with_3_3, finish_tx4_with_3_3) =
            new_task_notifier(get_payload_imm_ids(&epoch4_sync_payload));
        let (sync_tx4, mut sync_rx4) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch4, sync_tx4, HashSet::from_iter([TEST_TABLE_ID]));
        await_start4_with_3_3.await;

        // current uploader state:
        // unsealed: empty
        // sealed: empty
        // syncing: epoch4: uploading: [imm4, imm3_3], [imm3_2], uploaded: sst([imm3_1])
        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
        //         epoch2: sst([imm2])

        assert_uploader_pending(&mut uploader).await;

        finish_tx3_2.send(()).unwrap();
        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload2), sst.imm_ids());

        finish_tx4_with_3_3.send(()).unwrap();
        assert!(poll_fn(|cx| Poll::Ready(sync_rx4.poll_unpin(cx).is_pending())).await);

        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_payload_imm_ids(&epoch4_sync_payload), sst.imm_ids());

        match sync_rx4.await {
            Ok(Ok(data)) => {
                assert_eq!(3, data.uploaded_ssts.len());
                assert_eq!(
                    &get_payload_imm_ids(&epoch4_sync_payload),
                    data.uploaded_ssts[0].imm_ids()
                );
                assert_eq!(
                    &get_payload_imm_ids(&epoch3_spill_payload2),
                    data.uploaded_ssts[1].imm_ids()
                );
                assert_eq!(
                    &get_payload_imm_ids(&epoch3_spill_payload1),
                    data.uploaded_ssts[2].imm_ids(),
                )
            }
            _ => {
                unreachable!("should be sync finish");
            }
        }
        assert_eq!(epoch4, uploader.test_max_synced_epoch());

        // current uploader state:
        // unsealed: empty
        // sealed: empty
        // syncing: empty
        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
        //         epoch2: sst([imm2])
        //         epoch4: sst([imm4, imm3_3]), sst([imm3_2]), sst([imm3_1])
    }
2298
2299    #[tokio::test]
2300    async fn test_uploader_frequently_flush() {
2301        let config = StorageOpts {
2302            shared_buffer_capacity_mb: 10,
2303            shared_buffer_flush_ratio: 0.8,
2304            // This test will fail when we set it to 0
2305            shared_buffer_min_batch_flush_size_mb: 1,
2306            ..Default::default()
2307        };
2308        let (buffer_tracker, mut uploader, _new_task_notifier) =
2309            prepare_uploader_order_test(&config, true);
2310
2311        let epoch1 = INITIAL_EPOCH.next_epoch();
2312        let epoch2 = epoch1.next_epoch();
2313        uploader.start_epochs_for_test([epoch1, epoch2]);
2314        let instance_id1 = 1;
2315        let instance_id2 = 2;
2316        let flush_threshold = buffer_tracker.flush_threshold();
2317        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2318
2319        uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2320        uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2321
2322        // imm2 contains data in newer epoch, but added first
2323        let mut total_memory = 0;
2324        while total_memory < flush_threshold {
2325            let imm = gen_imm_with_limiter(epoch2, Some(memory_limiter.as_ref())).await;
2326            total_memory += imm.size();
2327            if total_memory > flush_threshold {
2328                break;
2329            }
2330            uploader.add_imm(instance_id2, imm);
2331        }
2332        let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await;
2333        uploader.add_imm(instance_id1, imm);
2334        assert!(uploader.may_flush());
2335
2336        for _ in 0..10 {
2337            let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await;
2338            uploader.add_imm(instance_id1, imm);
2339            assert!(!uploader.may_flush());
2340        }
2341    }
2342}