risingwave_storage/hummock/event_handler/uploader/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod spiller;
16mod task_manager;
17pub(crate) mod test_utils;
18
19use std::cmp::Ordering;
20use std::collections::btree_map::Entry;
21use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque, hash_map};
22use std::fmt::{Debug, Display, Formatter};
23use std::future::{Future, poll_fn};
24use std::mem::{replace, swap, take};
25use std::sync::Arc;
26use std::task::{Context, Poll, ready};
27
28use futures::FutureExt;
29use itertools::Itertools;
30use more_asserts::assert_gt;
31use prometheus::{HistogramTimer, IntGauge};
32use risingwave_common::bitmap::BitmapBuilder;
33use risingwave_common::catalog::TableId;
34use risingwave_common::metrics::UintGauge;
35use risingwave_common::must_match;
36use risingwave_hummock_sdk::table_watermark::{
37    TableWatermarks, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
38};
39use risingwave_hummock_sdk::vector_index::VectorIndexAdd;
40use risingwave_hummock_sdk::{HummockEpoch, HummockRawObjectId, LocalSstableInfo};
41use task_manager::{TaskManager, UploadingTaskStatus};
42use thiserror_ext::AsReport;
43use tokio::sync::oneshot;
44use tokio::task::JoinHandle;
45use tracing::{debug, error, info, warn};
46
47use crate::hummock::event_handler::LocalInstanceId;
48use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, send_sync_result};
49use crate::hummock::event_handler::uploader::spiller::Spiller;
50use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm;
51use crate::hummock::local_version::pinned_version::PinnedVersion;
52use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchId;
53use crate::hummock::store::version::StagingSstableInfo;
54use crate::hummock::{HummockError, HummockResult, ImmutableMemtable};
55use crate::mem_table::ImmId;
56use crate::monitor::HummockStateStoreMetrics;
57use crate::store::SealCurrentEpochOptions;
58
59/// Take epoch data inclusively before `epoch` out from `data`
60fn take_before_epoch<T>(
61    data: &mut BTreeMap<HummockEpoch, T>,
62    epoch: HummockEpoch,
63) -> BTreeMap<HummockEpoch, T> {
64    let mut before_epoch_data = data.split_off(&(epoch + 1));
65    swap(&mut before_epoch_data, data);
66    before_epoch_data
67}
68
/// Internal task payload: imms still wrapped in [`UploaderImm`] so their size
/// stays accounted in the uploader's imm-size gauge, keyed by local instance.
type UploadTaskInput = HashMap<LocalInstanceId, Vec<UploaderImm>>;
/// Payload passed to the user-provided spawn function: plain imms keyed by
/// local instance id.
pub type UploadTaskPayload = HashMap<LocalInstanceId, Vec<ImmutableMemtable>>;
71
/// Result produced by a finished upload task.
#[derive(Debug)]
pub struct UploadTaskOutput {
    /// SSTs built from the uploaded new-value payload.
    pub new_value_ssts: Vec<LocalSstableInfo>,
    /// Old-value SSTs produced by the task, if any; forwarded into the
    /// resulting `StagingSstableInfo` together with the new-value SSTs.
    pub old_value_ssts: Vec<LocalSstableInfo>,
    /// Timer measuring how long the finished output waits to be polled.
    pub wait_poll_timer: Option<HistogramTimer>,
}
/// Factory that spawns the actual upload work for a payload and returns the
/// handle of the tokio task yielding the upload result.
pub type SpawnUploadTask = Arc<
    dyn Fn(UploadTaskPayload, UploadTaskInfo) -> JoinHandle<HummockResult<UploadTaskOutput>>
        + Send
        + Sync
        + 'static,
>;
84
/// Metadata of one upload task; cloned into the spawned task and retained for
/// logging and retry.
#[derive(Clone)]
pub struct UploadTaskInfo {
    /// Total byte size of all imms in the payload.
    pub task_size: usize,
    /// Distinct epochs covered by the payload, newer epochs first.
    pub epochs: Vec<HummockEpoch>,
    /// Ids of the imms in the payload, keyed by local instance.
    pub imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
}
91
impl Display for UploadTaskInfo {
    // `Display` deliberately prints only the *number* of imm ids to keep log
    // lines short; use `Debug` for the full per-instance id listing.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UploadTaskInfo")
            .field("task_size", &self.task_size)
            .field("epochs", &self.epochs)
            .field("len(imm_ids)", &self.imm_ids.len())
            .finish()
    }
}
101
impl Debug for UploadTaskInfo {
    // Unlike `Display`, `Debug` includes the full imm-id mapping.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UploadTaskInfo")
            .field("task_size", &self.task_size)
            .field("epochs", &self.epochs)
            .field("imm_ids", &self.imm_ids)
            .finish()
    }
}
111
/// Private wrapper module for [`UploaderImm`]; keeping the fields private to
/// this module ensures the metric accounting below is done consistently.
mod uploader_imm {
    use std::fmt::Formatter;
    use std::ops::Deref;

    use risingwave_common::metrics::UintGauge;

    use crate::hummock::event_handler::uploader::UploaderContext;
    use crate::mem_table::ImmutableMemtable;

    /// An [`ImmutableMemtable`] tracked by the uploader: its size is added to
    /// the `uploader_imm_size` gauge on creation and subtracted on drop (RAII).
    pub(super) struct UploaderImm {
        inner: ImmutableMemtable,
        // gauge cloned from the uploader stats; used to undo the `add` in `Drop`
        size_guard: UintGauge,
    }

    impl UploaderImm {
        pub(super) fn new(imm: ImmutableMemtable, context: &UploaderContext) -> Self {
            let size = imm.size();
            let size_guard = context.stats.uploader_imm_size.clone();
            size_guard.add(size as _);
            Self {
                inner: imm,
                size_guard,
            }
        }

        #[cfg(test)]
        pub(super) fn for_test(imm: ImmutableMemtable) -> Self {
            Self {
                inner: imm,
                // stand-alone gauge: tests need no shared metrics registry
                size_guard: UintGauge::new("test", "test").unwrap(),
            }
        }
    }

    impl std::fmt::Debug for UploaderImm {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            // delegate to the wrapped imm; the gauge carries no debug info
            self.inner.fmt(f)
        }
    }

    impl Deref for UploaderImm {
        type Target = ImmutableMemtable;

        fn deref(&self) -> &Self::Target {
            &self.inner
        }
    }

    impl Drop for UploaderImm {
        fn drop(&mut self) {
            // Release the bytes accounted in `new`.
            self.size_guard.sub(self.inner.size() as _);
        }
    }
}
166
/// Identifier of a single uploading task.
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct UploadingTaskId(usize);
169
/// A wrapper for a uploading task that compacts and uploads the imm payload. Task context are
/// stored so that when the task fails, it can be re-tried.
struct UploadingTask {
    task_id: UploadingTaskId,
    // newer data at the front
    input: UploadTaskInput,
    // handle of the currently running upload attempt; replaced on retry
    join_handle: JoinHandle<HummockResult<UploadTaskOutput>>,
    task_info: UploadTaskInfo,
    // kept so the task can respawn itself when an attempt fails
    spawn_upload_task: SpawnUploadTask,
    // metric guards mirroring the additions made in `new`; released in `Drop`
    task_size_guard: UintGauge,
    task_count_guard: IntGauge,
}
182
impl Drop for UploadingTask {
    fn drop(&mut self) {
        // Undo the metric contributions made in `UploadingTask::new`.
        self.task_size_guard.sub(self.task_info.task_size as u64);
        self.task_count_guard.dec();
    }
}
189
impl Debug for UploadingTask {
    // Skips the join handle and metric guards, which carry no useful debug info.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UploadingTask")
            .field("input", &self.input)
            .field("task_info", &self.task_info)
            .finish()
    }
}
198
199fn get_payload_imm_ids(
200    payload: &UploadTaskPayload,
201) -> HashMap<LocalInstanceId, Vec<SharedBufferBatchId>> {
202    payload
203        .iter()
204        .map(|(instance_id, imms)| {
205            (
206                *instance_id,
207                imms.iter().map(|imm| imm.batch_id()).collect_vec(),
208            )
209        })
210        .collect()
211}
212
impl UploadingTask {
    // INFO logs will be enabled for task with size exceeding 50MB.
    const LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE: usize = 50 * (1 << 20);

    /// Clone the plain imms out of `input`, dropping the metric-guard wrappers.
    fn input_to_payload(input: &UploadTaskInput) -> UploadTaskPayload {
        input
            .iter()
            .map(|(instance_id, imms)| {
                (
                    *instance_id,
                    imms.iter().map(|imm| (**imm).clone()).collect(),
                )
            })
            .collect()
    }

    /// Spawn an upload task over the non-empty `input`, registering its size
    /// in the global buffer tracker and the uploading-task count metric
    /// (both released again in `Drop`).
    fn new(task_id: UploadingTaskId, input: UploadTaskInput, context: &UploaderContext) -> Self {
        assert!(!input.is_empty());
        // distinct epochs covered by the payload
        let mut epochs = input
            .iter()
            .flat_map(|(_, imms)| imms.iter().flat_map(|imm| imm.epochs().iter().cloned()))
            .sorted()
            .dedup()
            .collect_vec();

        // reverse to make newer epochs comes first
        epochs.reverse();
        let payload = Self::input_to_payload(&input);
        let imm_ids = get_payload_imm_ids(&payload);
        let task_size = input
            .values()
            .map(|imms| imms.iter().map(|imm| imm.size()).sum::<usize>())
            .sum();
        let task_info = UploadTaskInfo {
            task_size,
            epochs,
            imm_ids,
        };
        // account the pending upload bytes globally; undone in `Drop`
        context
            .buffer_tracker
            .global_upload_task_size()
            .add(task_size as u64);
        if task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
            info!("start upload task: {:?}", task_info);
        } else {
            debug!("start upload task: {:?}", task_info);
        }
        let join_handle = (context.spawn_upload_task)(payload, task_info.clone());
        context.stats.uploader_uploading_task_count.inc();
        Self {
            task_id,
            input,
            join_handle,
            task_info,
            spawn_upload_task: context.spawn_upload_task.clone(),
            task_size_guard: context.buffer_tracker.global_upload_task_size().clone(),
            task_count_guard: context.stats.uploader_uploading_task_count.clone(),
        }
    }

    /// Poll the result of the uploading task
    ///
    /// On success, the task output is wrapped into a [`StagingSstableInfo`]
    /// tagged with the task's epochs, imm ids and size. A join error (the
    /// spawned task panicked or was cancelled) is converted to `HummockError`.
    fn poll_result(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<HummockResult<Arc<StagingSstableInfo>>> {
        Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) {
            Ok(task_result) => task_result
                .inspect(|_| {
                    if self.task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
                        info!(task_info = ?self.task_info, "upload task finish");
                    } else {
                        debug!(task_info = ?self.task_info, "upload task finish");
                    }
                })
                .inspect_err(|e| error!(task_info = ?self.task_info, err = ?e.as_report(), "upload task failed"))
                .map(|output| {
                    Arc::new(StagingSstableInfo::new(
                        output.new_value_ssts,
                        output.old_value_ssts,
                        self.task_info.epochs.clone(),
                        self.task_info.imm_ids.clone(),
                        self.task_info.task_size,
                    ))
                }),

            Err(err) => Err(HummockError::other(format!(
                "fail to join upload join handle: {}",
                err.as_report()
            ))),
        })
    }

    /// Poll the uploading task until it succeeds. If it fails, we will retry it.
    fn poll_ok_with_retry(&mut self, cx: &mut Context<'_>) -> Poll<Arc<StagingSstableInfo>> {
        loop {
            let result = ready!(self.poll_result(cx));
            match result {
                Ok(sstables) => return Poll::Ready(sstables),
                Err(e) => {
                    error!(
                        error = %e.as_report(),
                        task_info = ?self.task_info,
                        "a flush task failed, start retry",
                    );
                    // respawn the upload from the retained `input`
                    self.join_handle = (self.spawn_upload_task)(
                        Self::input_to_payload(&self.input),
                        self.task_info.clone(),
                    );
                    // It is important not to return Poll::pending here immediately, because the new
                    // join_handle is not polled yet, and will not awake the current task when
                    // succeed. It will be polled in the next loop iteration.
                }
            }
        }
    }

    pub fn get_task_info(&self) -> &UploadTaskInfo {
        &self.task_info
    }
}
333
impl TableUnsyncData {
    /// Record the table watermarks reported when sealing `epoch`.
    ///
    /// Invariants asserted here: all watermarks of one table keep the same
    /// direction and serde type across epochs, and within one epoch each
    /// vnode is covered by at most one watermark.
    fn add_table_watermarks(
        &mut self,
        epoch: HummockEpoch,
        table_watermarks: Vec<VnodeWatermark>,
        direction: WatermarkDirection,
        watermark_type: WatermarkSerdeType,
    ) {
        if table_watermarks.is_empty() {
            return;
        }
        // all watermarks in one report must agree on the vnode count
        let vnode_count = table_watermarks[0].vnode_count();
        for watermark in &table_watermarks {
            assert_eq!(vnode_count, watermark.vnode_count());
        }

        // Mark the vnodes covered by `vnode_watermarks` in `vnode_bitmap`,
        // asserting that no vnode gets two watermarks within the same epoch.
        fn apply_new_vnodes(
            vnode_bitmap: &mut BitmapBuilder,
            vnode_watermarks: &Vec<VnodeWatermark>,
        ) {
            for vnode_watermark in vnode_watermarks {
                for vnode in vnode_watermark.vnode_bitmap().iter_ones() {
                    assert!(
                        !vnode_bitmap.is_set(vnode),
                        "vnode {} write multiple table watermarks",
                        vnode
                    );
                    vnode_bitmap.set(vnode, true);
                }
            }
        }
        match &mut self.table_watermarks {
            Some((prev_direction, prev_watermarks, prev_watermark_type)) => {
                assert_eq!(
                    *prev_direction, direction,
                    "table id {} new watermark direction not match with previous",
                    self.table_id
                );
                assert_eq!(
                    *prev_watermark_type, watermark_type,
                    "table id {} new watermark watermark_type not match with previous",
                    self.table_id
                );
                match prev_watermarks.entry(epoch) {
                    Entry::Occupied(mut entry) => {
                        // merge into the watermarks already recorded for this epoch
                        let (prev_watermarks, vnode_bitmap) = entry.get_mut();
                        apply_new_vnodes(vnode_bitmap, &table_watermarks);
                        prev_watermarks.extend(table_watermarks);
                    }
                    Entry::Vacant(entry) => {
                        let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                        apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                        entry.insert((table_watermarks, vnode_bitmap));
                    }
                }
            }
            None => {
                // first watermark of this table: fix direction and serde type
                let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                self.table_watermarks = Some((
                    direction,
                    BTreeMap::from_iter([(epoch, (table_watermarks, vnode_bitmap))]),
                    watermark_type,
                ));
            }
        }
    }
}
402
impl UploaderData {
    /// Fold the per-epoch watermarks of one table into a single
    /// [`TableWatermarks`] and insert it into `all_table_watermarks`.
    ///
    /// `watermarks` is expected in ascending epoch order (callers iterate a
    /// `BTreeMap`). Nothing is inserted when the iterator is empty; the table
    /// must not already have an entry in `all_table_watermarks`.
    fn add_table_watermarks(
        all_table_watermarks: &mut HashMap<TableId, TableWatermarks>,
        table_id: TableId,
        direction: WatermarkDirection,
        watermarks: impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)>,
        watermark_type: WatermarkSerdeType,
    ) {
        let mut table_watermarks: Option<TableWatermarks> = None;
        for (epoch, watermarks) in watermarks {
            match &mut table_watermarks {
                Some(prev_watermarks) => {
                    assert_eq!(prev_watermarks.direction, direction);
                    assert_eq!(prev_watermarks.watermark_type, watermark_type);
                    prev_watermarks.add_new_epoch_watermarks(
                        epoch,
                        Arc::from(watermarks),
                        direction,
                        watermark_type,
                    );
                }
                None => {
                    // first epoch seeds the accumulated watermarks
                    table_watermarks = Some(TableWatermarks::single_epoch(
                        epoch,
                        watermarks,
                        direction,
                        watermark_type,
                    ));
                }
            }
        }
        if let Some(table_watermarks) = table_watermarks {
            assert!(
                all_table_watermarks
                    .insert(table_id, table_watermarks)
                    .is_none()
            );
        }
    }
}
443
/// Imms written by one local instance within a single epoch.
struct LocalInstanceEpochData {
    epoch: HummockEpoch,
    // newer data comes first.
    imms: VecDeque<UploaderImm>,
    // whether the imms of this epoch have already been taken for spill
    has_spilled: bool,
}
450
451impl LocalInstanceEpochData {
452    fn new(epoch: HummockEpoch) -> Self {
453        Self {
454            epoch,
455            imms: VecDeque::new(),
456            has_spilled: false,
457        }
458    }
459
460    fn epoch(&self) -> HummockEpoch {
461        self.epoch
462    }
463
464    fn add_imm(&mut self, imm: UploaderImm) {
465        assert_eq!(imm.max_epoch(), imm.min_epoch());
466        assert_eq!(self.epoch, imm.min_epoch());
467        if let Some(prev_imm) = self.imms.front() {
468            assert_gt!(imm.batch_id(), prev_imm.batch_id());
469        }
470        self.imms.push_front(imm);
471    }
472
473    fn is_empty(&self) -> bool {
474        self.imms.is_empty()
475    }
476}
477
/// Unsync state of one local write instance of a table.
struct LocalInstanceUnsyncData {
    table_id: TableId,
    instance_id: LocalInstanceId,
    // None means that the current instance should have stopped advancing
    current_epoch_data: Option<LocalInstanceEpochData>,
    // newer data comes first.
    sealed_data: VecDeque<LocalInstanceEpochData>,
    // newer data comes first
    flushing_imms: VecDeque<SharedBufferBatchId>,
    // set when the instance is destroyed; sealed data may still await sync
    is_destroyed: bool,
    // set when the instance sealed its final epoch and accepts no more writes
    is_stopped: bool,
}
490
impl LocalInstanceUnsyncData {
    /// Create the unsync state of a newly registered local instance, starting
    /// at `init_epoch`.
    fn new(table_id: TableId, instance_id: LocalInstanceId, init_epoch: HummockEpoch) -> Self {
        Self {
            table_id,
            instance_id,
            current_epoch_data: Some(LocalInstanceEpochData::new(init_epoch)),
            sealed_data: VecDeque::new(),
            flushing_imms: Default::default(),
            is_destroyed: false,
            is_stopped: false,
        }
    }

    /// Add an imm written by this instance in its current epoch. The instance
    /// must be live (neither destroyed nor stopped).
    fn add_imm(&mut self, imm: UploaderImm) {
        assert!(!self.is_destroyed);
        assert!(!self.is_stopped);
        assert_eq!(self.table_id, imm.table_id);
        self.current_epoch_data
            .as_mut()
            .expect("should be Some when adding new imm")
            .add_imm(imm);
    }

    /// Seal the current epoch and switch to `next_epoch`. Non-empty sealed
    /// data is queued for a later sync or spill. Returns the epoch that was
    /// just sealed. When `stopped`, the instance accepts no further writes.
    fn local_seal_epoch(&mut self, next_epoch: HummockEpoch, stopped: bool) -> HummockEpoch {
        let data = self
            .current_epoch_data
            .as_mut()
            .expect("should be Some when seal new epoch");
        let current_epoch = data.epoch;
        debug!(
            instance_id = self.instance_id,
            next_epoch, current_epoch, stopped, "local seal epoch"
        );
        assert_gt!(next_epoch, current_epoch);
        assert!(!self.is_destroyed);
        assert!(!self.is_stopped);
        self.is_stopped = stopped;
        let epoch_data = replace(data, LocalInstanceEpochData::new(next_epoch));
        if !epoch_data.is_empty() {
            self.sealed_data.push_front(epoch_data);
        }
        current_epoch
    }

    /// Confirm that the given imms finished flushing. `imm_ids` comes from
    /// old to new (ascending batch id) and must match the oldest entries of
    /// `flushing_imms` exactly.
    fn ack_flushed(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            assert_eq!(self.flushing_imms.pop_back().expect("should exist"), imm_id);
        }
    }

    /// Take out all unspilled imms at exactly `epoch` for spilling and record
    /// them as flushing. Returns the imms with newer data first.
    ///
    /// Panics (or asserts) when unspilled data exists in an epoch older than
    /// `epoch`, since spill must proceed from oldest epoch to newest.
    fn spill(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        let imms = if let Some(oldest_sealed_epoch) = self.sealed_data.back() {
            match oldest_sealed_epoch.epoch.cmp(&epoch) {
                Ordering::Less => {
                    unreachable!(
                        "should not spill at this epoch because there \
                    is unspilled data in previous epoch: prev epoch {}, spill epoch {}",
                        oldest_sealed_epoch.epoch, epoch
                    );
                }
                Ordering::Equal => {
                    let epoch_data = self.sealed_data.pop_back().unwrap();
                    assert_eq!(epoch, epoch_data.epoch);
                    epoch_data.imms
                }
                // all sealed data is newer than the spill epoch: nothing to take
                Ordering::Greater => VecDeque::new(),
            }
        } else {
            // no sealed data: the spill may only touch the current epoch
            let Some(current_epoch_data) = &mut self.current_epoch_data else {
                return Vec::new();
            };
            match current_epoch_data.epoch.cmp(&epoch) {
                Ordering::Less => {
                    assert!(
                        current_epoch_data.imms.is_empty(),
                        "should not spill at this epoch because there \
                    is unspilled data in current epoch epoch {}, spill epoch {}",
                        current_epoch_data.epoch,
                        epoch
                    );
                    VecDeque::new()
                }
                Ordering::Equal => {
                    if !current_epoch_data.imms.is_empty() {
                        current_epoch_data.has_spilled = true;
                        take(&mut current_epoch_data.imms)
                    } else {
                        VecDeque::new()
                    }
                }
                Ordering::Greater => VecDeque::new(),
            }
        };
        // record the spilled imms as flushing, oldest first
        self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
        imms.into_iter().collect()
    }

    /// Append imm ids (given from old to new) to the flushing queue, keeping
    /// the queue strictly ordered by batch id with the newest at the front.
    fn add_flushing_imm(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            if let Some(prev_imm_id) = self.flushing_imms.front() {
                assert_gt!(imm_id, *prev_imm_id);
            }
            self.flushing_imms.push_front(imm_id);
        }
    }

    // start syncing the imm inclusively before the `epoch`
    // returning data with newer data coming first
    fn sync(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        // firstly added from old to new
        let mut ret = Vec::new();
        while let Some(epoch_data) = self.sealed_data.back()
            && epoch_data.epoch() <= epoch
        {
            let imms = self.sealed_data.pop_back().expect("checked exist").imms;
            self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
            ret.extend(imms.into_iter().rev());
        }
        // reverse so that newer data comes first
        ret.reverse();
        if let Some(latest_epoch_data) = &self.current_epoch_data
            && latest_epoch_data.epoch <= epoch
        {
            // Syncing at or beyond the current epoch should only happen when
            // the instance holds no data; panic in debug builds, tolerate
            // with a warning in release builds.
            assert!(self.sealed_data.is_empty());
            assert!(latest_epoch_data.is_empty());
            assert!(!latest_epoch_data.has_spilled);
            if cfg!(debug_assertions) {
                panic!(
                    "sync epoch exceeds latest epoch, and the current instance should have been archived, table_id = {}, latest_epoch_data = {}, epoch = {}",
                    self.table_id,
                    latest_epoch_data.epoch(),
                    epoch
                );
            }
            warn!(
                instance_id = self.instance_id,
                table_id = %self.table_id,
                "sync epoch exceeds latest epoch, and the current instance should have be archived"
            );
            self.current_epoch_data = None;
        }
        ret
    }

    /// Assert that this instance holds no data at or before `epoch`.
    fn assert_after_epoch(&self, epoch: HummockEpoch) {
        if let Some(oldest_sealed_data) = self.sealed_data.back() {
            assert!(!oldest_sealed_data.imms.is_empty());
            assert_gt!(oldest_sealed_data.epoch, epoch);
        } else if let Some(current_data) = &self.current_epoch_data
            && current_data.epoch <= epoch
        {
            assert!(current_data.imms.is_empty() && !current_data.has_spilled);
        }
    }

    /// Whether the instance is destroyed and all its sealed data has been
    /// taken, so its entry can be removed.
    fn is_finished(&self) -> bool {
        self.is_destroyed && self.sealed_data.is_empty()
    }
}
651
/// Unsync state of a single table: the per-instance imms plus table-level
/// epoch bookkeeping (watermarks, spill tasks, unsync/syncing epochs).
struct TableUnsyncData {
    table_id: TableId,
    instance_data: HashMap<LocalInstanceId, LocalInstanceUnsyncData>,
    #[expect(clippy::type_complexity)]
    // watermark direction and serde type, fixed at the first report, plus the
    // per-epoch watermarks with a bitmap of the vnodes they cover
    table_watermarks: Option<(
        WatermarkDirection,
        BTreeMap<HummockEpoch, (Vec<VnodeWatermark>, BitmapBuilder)>,
        WatermarkSerdeType,
    )>,
    // ids of the spill tasks issued at each epoch
    spill_tasks: BTreeMap<HummockEpoch, VecDeque<UploadingTaskId>>,
    // epochs started but not yet synced; used as an ordered set
    unsync_epochs: BTreeMap<HummockEpoch, ()>,
    // newer epoch at the front
    syncing_epochs: VecDeque<HummockEpoch>,
    max_synced_epoch: Option<HummockEpoch>,
}
667
668impl TableUnsyncData {
669    fn new(table_id: TableId, committed_epoch: Option<HummockEpoch>) -> Self {
670        Self {
671            table_id,
672            instance_data: Default::default(),
673            table_watermarks: None,
674            spill_tasks: Default::default(),
675            unsync_epochs: Default::default(),
676            syncing_epochs: Default::default(),
677            max_synced_epoch: committed_epoch,
678        }
679    }
680
681    fn new_epoch(&mut self, epoch: HummockEpoch) {
682        debug!(table_id = ?self.table_id, epoch, "table new epoch");
683        if let Some(latest_epoch) = self.max_epoch() {
684            assert_gt!(epoch, latest_epoch);
685        }
686        self.unsync_epochs.insert(epoch, ());
687    }
688
689    #[expect(clippy::type_complexity)]
690    fn sync(
691        &mut self,
692        epoch: HummockEpoch,
693    ) -> (
694        impl Iterator<Item = (LocalInstanceId, Vec<UploaderImm>)> + '_,
695        Option<(
696            WatermarkDirection,
697            impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)> + use<>,
698            WatermarkSerdeType,
699        )>,
700        impl Iterator<Item = UploadingTaskId> + use<>,
701        BTreeMap<HummockEpoch, ()>,
702    ) {
703        if let Some(prev_epoch) = self.max_sync_epoch() {
704            assert_gt!(epoch, prev_epoch)
705        }
706        let epochs = take_before_epoch(&mut self.unsync_epochs, epoch);
707        assert_eq!(
708            *epochs.last_key_value().expect("non-empty").0,
709            epoch,
710            "{epochs:?} {epoch} {:?}",
711            self.table_id
712        );
713        self.syncing_epochs.push_front(epoch);
714        (
715            self.instance_data
716                .iter_mut()
717                .map(move |(instance_id, data)| (*instance_id, data.sync(epoch))),
718            self.table_watermarks
719                .as_mut()
720                .map(|(direction, watermarks, watermark_type)| {
721                    let watermarks = take_before_epoch(watermarks, epoch)
722                        .into_iter()
723                        .map(|(epoch, (watermarks, _))| (epoch, watermarks));
724                    (*direction, watermarks, *watermark_type)
725                }),
726            take_before_epoch(&mut self.spill_tasks, epoch)
727                .into_values()
728                .flat_map(|tasks| tasks.into_iter()),
729            epochs,
730        )
731    }
732
733    fn ack_synced(&mut self, sync_epoch: HummockEpoch) {
734        let min_sync_epoch = self.syncing_epochs.pop_back().expect("should exist");
735        assert_eq!(sync_epoch, min_sync_epoch);
736        self.max_synced_epoch = Some(sync_epoch);
737    }
738
739    fn ack_committed(&mut self, committed_epoch: HummockEpoch) {
740        let synced_epoch_advanced = {
741            if let Some(max_synced_epoch) = self.max_synced_epoch
742                && max_synced_epoch >= committed_epoch
743            {
744                false
745            } else {
746                true
747            }
748        };
749        if synced_epoch_advanced {
750            self.max_synced_epoch = Some(committed_epoch);
751            if let Some(min_syncing_epoch) = self.syncing_epochs.back() {
752                assert_gt!(*min_syncing_epoch, committed_epoch);
753            }
754            self.assert_after_epoch(committed_epoch);
755        }
756    }
757
758    fn assert_after_epoch(&self, epoch: HummockEpoch) {
759        self.instance_data
760            .values()
761            .for_each(|instance_data| instance_data.assert_after_epoch(epoch));
762        if let Some((_, watermarks, _)) = &self.table_watermarks
763            && let Some((oldest_epoch, _)) = watermarks.first_key_value()
764        {
765            assert_gt!(*oldest_epoch, epoch);
766        }
767    }
768
769    fn max_sync_epoch(&self) -> Option<HummockEpoch> {
770        self.syncing_epochs
771            .front()
772            .cloned()
773            .or(self.max_synced_epoch)
774    }
775
776    fn max_epoch(&self) -> Option<HummockEpoch> {
777        self.unsync_epochs
778            .last_key_value()
779            .map(|(epoch, _)| *epoch)
780            .or_else(|| self.max_sync_epoch())
781    }
782
783    fn is_empty(&self) -> bool {
784        self.instance_data.is_empty()
785            && self.syncing_epochs.is_empty()
786            && self.unsync_epochs.is_empty()
787    }
788}
789
/// Identifier of one epoch's sync group: the epoch paired with the smallest
/// table id among the tables being synced (see `get_unsync_epoch_id`).
#[derive(Eq, Hash, PartialEq, Copy, Clone)]
struct UnsyncEpochId(HummockEpoch, TableId);
792
impl UnsyncEpochId {
    /// The epoch this id refers to.
    fn epoch(&self) -> HummockEpoch {
        self.0
    }
}
798
799fn get_unsync_epoch_id(epoch: HummockEpoch, table_ids: &HashSet<TableId>) -> Option<UnsyncEpochId> {
800    table_ids
801        .iter()
802        .min()
803        .map(|table_id| UnsyncEpochId(epoch, *table_id))
804}
805
#[derive(Default)]
/// Unsync data, can be either imm or spilled sst, and some aggregated epoch information.
///
/// `instance_data` holds the imm of each individual local instance, and data are first added here.
/// The aggregated epoch information (table watermarks, etc.) and the spilled sst will be added to `epoch_data`.
struct UnsyncData {
    table_data: HashMap<TableId, TableUnsyncData>,
    // An index as a mapping from instance id to its table id
    instance_table_id: HashMap<LocalInstanceId, TableId>,
    // tables participating in each unsync epoch, keyed by (epoch, min table id)
    unsync_epochs: HashMap<UnsyncEpochId, HashSet<TableId>>,
    // spilled sst of each finished spill task, with the tables it covers
    spilled_data: HashMap<UploadingTaskId, (Arc<StagingSstableInfo>, HashSet<TableId>)>,
}
818
819impl UnsyncData {
    /// Register a new local instance of `table_id` starting at `init_epoch`.
    /// The table must already exist, the instance id must be fresh, and
    /// `init_epoch` must be an epoch already started on the table.
    fn init_instance(
        &mut self,
        table_id: TableId,
        instance_id: LocalInstanceId,
        init_epoch: HummockEpoch,
    ) {
        debug!(
            table_id = %table_id,
            instance_id, init_epoch, "init epoch"
        );
        let table_data = self
            .table_data
            .get_mut(&table_id)
            .unwrap_or_else(|| panic!("should exist. {table_id:?}"));
        assert!(
            table_data
                .instance_data
                .insert(
                    instance_id,
                    LocalInstanceUnsyncData::new(table_id, instance_id, init_epoch)
                )
                .is_none()
        );
        // keep the instance -> table index in lockstep
        assert!(
            self.instance_table_id
                .insert(instance_id, table_id)
                .is_none()
        );
        assert!(table_data.unsync_epochs.contains_key(&init_epoch));
    }
850
851    fn instance_data(
852        &mut self,
853        instance_id: LocalInstanceId,
854    ) -> Option<&mut LocalInstanceUnsyncData> {
855        self.instance_table_id
856            .get_mut(&instance_id)
857            .cloned()
858            .map(move |table_id| {
859                self.table_data
860                    .get_mut(&table_id)
861                    .expect("should exist")
862                    .instance_data
863                    .get_mut(&instance_id)
864                    .expect("should exist")
865            })
866    }
867
868    fn add_imm(&mut self, instance_id: LocalInstanceId, imm: UploaderImm) {
869        self.instance_data(instance_id)
870            .expect("should exist")
871            .add_imm(imm);
872    }
873
874    fn local_seal_epoch(
875        &mut self,
876        instance_id: LocalInstanceId,
877        next_epoch: HummockEpoch,
878        opts: SealCurrentEpochOptions,
879    ) {
880        let table_id = self.instance_table_id[&instance_id];
881        let table_data = self.table_data.get_mut(&table_id).expect("should exist");
882        let instance_data = table_data
883            .instance_data
884            .get_mut(&instance_id)
885            .expect("should exist");
886        // When drop/cancel a streaming job, for the barrier to stop actor, the
887        // local instance will call `local_seal_epoch`, but the `next_epoch` won't be
888        // called `start_epoch` because we have stopped writing on it.
889        let stopped = !table_data.unsync_epochs.contains_key(&next_epoch);
890        let epoch = instance_data.local_seal_epoch(next_epoch, stopped);
891        if let Some((direction, table_watermarks, watermark_type)) = opts.table_watermarks {
892            table_data.add_table_watermarks(epoch, table_watermarks, direction, watermark_type);
893        }
894    }
895
896    fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
897        if let Some(table_id) = self.instance_table_id.get(&instance_id) {
898            debug!(instance_id, "destroy instance");
899            let table_data = self.table_data.get_mut(table_id).expect("should exist");
900            let instance_data = table_data
901                .instance_data
902                .get_mut(&instance_id)
903                .expect("should exist");
904            assert!(
905                !instance_data.is_destroyed,
906                "cannot destroy an instance for twice"
907            );
908            instance_data.is_destroyed = true;
909        }
910    }
911
912    fn clear_tables(&mut self, table_ids: &HashSet<TableId>, task_manager: &mut TaskManager) {
913        for table_id in table_ids {
914            if let Some(table_unsync_data) = self.table_data.remove(table_id) {
915                for task_id in table_unsync_data.spill_tasks.into_values().flatten() {
916                    if let Some(task_status) = task_manager.abort_task(task_id) {
917                        must_match!(task_status, UploadingTaskStatus::Spilling(spill_table_ids) => {
918                            assert!(spill_table_ids.is_subset(table_ids));
919                        });
920                    }
921                    if let Some((_, spill_table_ids)) = self.spilled_data.remove(&task_id) {
922                        assert!(spill_table_ids.is_subset(table_ids));
923                    }
924                }
925                assert!(
926                    table_unsync_data
927                        .instance_data
928                        .values()
929                        .all(|instance| instance.is_destroyed),
930                    "should be clear when dropping the read version instance"
931                );
932                for instance_id in table_unsync_data.instance_data.keys() {
933                    assert_eq!(
934                        *table_id,
935                        self.instance_table_id
936                            .remove(instance_id)
937                            .expect("should exist")
938                    );
939                }
940            }
941        }
942        debug_assert!(
943            self.spilled_data
944                .values()
945                .all(|(_, spill_table_ids)| spill_table_ids.is_disjoint(table_ids))
946        );
947        self.unsync_epochs.retain(|_, unsync_epoch_table_ids| {
948            if !unsync_epoch_table_ids.is_disjoint(table_ids) {
949                assert!(unsync_epoch_table_ids.is_subset(table_ids));
950                false
951            } else {
952                true
953            }
954        });
955        assert!(
956            self.instance_table_id
957                .values()
958                .all(|table_id| !table_ids.contains(table_id))
959        );
960    }
961}
962
impl UploaderData {
    /// Starts syncing the given `(epoch, table_ids)` pairs.
    ///
    /// For each pair, drains per-table unsync state up to `epoch` (unflushed
    /// imm payloads, table watermarks, spill task ids, sealed vector index
    /// adds), spawns a final flush task for the remaining unflushed payload
    /// via the task manager, and registers a `SyncingData` entry under a fresh
    /// `SyncId`. The result is later delivered through `sync_result_sender`
    /// once all remaining uploading tasks of the entry finish
    /// (see `may_notify_sync_task`).
    ///
    /// Panics if a table id appears in more than one pair.
    fn sync(
        &mut self,
        context: &UploaderContext,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) {
        let mut all_table_watermarks = HashMap::new();
        let mut uploading_tasks = HashSet::new();
        let mut spilled_tasks = BTreeSet::new();
        let mut all_table_ids = HashSet::new();
        let mut vector_index_adds = HashMap::new();

        let mut flush_payload = HashMap::new();

        for (epoch, table_ids) in &sync_table_epochs {
            let epoch = *epoch;
            for table_id in table_ids {
                assert!(
                    all_table_ids.insert(*table_id),
                    "duplicate sync table epoch: {:?} {:?}",
                    all_table_ids,
                    sync_table_epochs
                );
            }
            if let Some(UnsyncEpochId(_, min_table_id)) = get_unsync_epoch_id(epoch, table_ids) {
                let min_table_id_data = self
                    .unsync_data
                    .table_data
                    .get_mut(&min_table_id)
                    .expect("should exist");
                // Take from a clone so the table's own `unsync_epochs` map stays
                // untouched here: the per-table `sync(epoch)` below performs the
                // real removal, and its result is asserted equal to `epochs`.
                let epochs = take_before_epoch(&mut min_table_id_data.unsync_epochs.clone(), epoch);
                for epoch in epochs.keys() {
                    // Every drained epoch must have a matching global entry with
                    // exactly the same table set.
                    assert_eq!(
                        &self
                            .unsync_data
                            .unsync_epochs
                            .remove(&UnsyncEpochId(*epoch, min_table_id))
                            .expect("should exist"),
                        table_ids
                    );
                }
                for table_id in table_ids {
                    let table_data = self
                        .unsync_data
                        .table_data
                        .get_mut(table_id)
                        .expect("should exist");
                    let (unflushed_payload, table_watermarks, task_ids, table_unsync_epochs) =
                        table_data.sync(epoch);
                    assert_eq!(table_unsync_epochs, epochs);
                    for (instance_id, payload) in unflushed_payload {
                        if !payload.is_empty() {
                            flush_payload.insert(instance_id, payload);
                        }
                    }
                    table_data.instance_data.retain(|instance_id, data| {
                        // remove the finished instances
                        if data.is_finished() {
                            assert_eq!(
                                self.unsync_data.instance_table_id.remove(instance_id),
                                Some(*table_id)
                            );
                            false
                        } else {
                            true
                        }
                    });
                    if let Some((direction, watermarks, watermark_type)) = table_watermarks {
                        Self::add_table_watermarks(
                            &mut all_table_watermarks,
                            *table_id,
                            direction,
                            watermarks,
                            watermark_type,
                        );
                    }
                    // A task whose sst already landed in `spilled_data` is finished;
                    // others are still uploading and must be awaited by this sync.
                    for task_id in task_ids {
                        if self.unsync_data.spilled_data.contains_key(&task_id) {
                            spilled_tasks.insert(task_id);
                        } else {
                            uploading_tasks.insert(task_id);
                        }
                    }

                    if let hash_map::Entry::Occupied(mut entry) =
                        self.unsync_vector_index_data.entry(*table_id)
                    {
                        let data = entry.get_mut();
                        let adds = take_before_epoch(&mut data.sealed_epoch_data, epoch)
                            .into_values()
                            .flatten()
                            .collect_vec();
                        // Fully drained data of a dropped writer can be discarded.
                        if data.is_dropped && data.sealed_epoch_data.is_empty() {
                            entry.remove();
                        }
                        if !adds.is_empty() {
                            vector_index_adds
                                .try_insert(*table_id, adds)
                                .expect("non-duplicate");
                        }
                    }
                }
            }
        }

        // Allocate a fresh monotonically increasing sync id.
        let sync_id = {
            let sync_id = self.next_sync_id;
            self.next_sync_id += 1;
            SyncId(sync_id)
        };

        // The task manager may spawn one extra flush task for the collected
        // unflushed payload; it then also belongs to this sync.
        if let Some(extra_flush_task_id) = self.task_manager.sync(
            context,
            sync_id,
            flush_payload,
            uploading_tasks.iter().cloned(),
            &all_table_ids,
        ) {
            uploading_tasks.insert(extra_flush_task_id);
        }

        // iter from large task_id to small one so that newer data at the front
        let uploaded = spilled_tasks
            .iter()
            .rev()
            .map(|task_id| {
                let (sst, spill_table_ids) = self
                    .unsync_data
                    .spilled_data
                    .remove(task_id)
                    .expect("should exist");
                assert!(
                    spill_table_ids.is_subset(&all_table_ids),
                    "spilled tabled ids {:?} not a subset of sync table id {:?}",
                    spill_table_ids,
                    all_table_ids
                );
                sst
            })
            .collect();

        self.syncing_data.insert(
            sync_id,
            SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: uploading_tasks,
                uploaded,
                table_watermarks: all_table_watermarks,
                vector_index_adds,
                sync_result_sender,
            },
        );

        self.check_upload_task_consistency();
    }
}
1120
1121impl UnsyncData {
1122    fn ack_flushed(&mut self, sstable_info: &StagingSstableInfo) {
1123        for (instance_id, imm_ids) in sstable_info.imm_ids() {
1124            if let Some(instance_data) = self.instance_data(*instance_id) {
1125                // take `rev` to let old imm id goes first
1126                instance_data.ack_flushed(imm_ids.iter().rev().cloned());
1127            }
1128        }
1129    }
1130}
1131
/// State of one in-flight sync request, kept in `UploaderData::syncing_data`
/// until all of its uploading tasks finish.
struct SyncingData {
    // The `(epoch, table_ids)` pairs this sync was started with.
    sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    // Tasks that must finish before the sync result can be sent.
    remaining_uploading_tasks: HashSet<UploadingTaskId>,
    // newer data at the front
    uploaded: VecDeque<Arc<StagingSstableInfo>>,
    // Aggregated table watermarks collected when the sync started.
    table_watermarks: HashMap<TableId, TableWatermarks>,
    // Sealed vector index adds collected when the sync started.
    vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
    // Channel over which the final `SyncedData` (or error) is delivered.
    sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
}
1141
/// The successful outcome of a sync request, delivered through the
/// `sync_result_sender` of the corresponding `SyncingData`.
#[derive(Debug)]
pub struct SyncedData {
    /// Ssts uploaded for the sync; newer data at the front.
    pub uploaded_ssts: VecDeque<Arc<StagingSstableInfo>>,
    /// Aggregated table watermarks of the synced epochs.
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
    /// Vector index adds of the synced epochs.
    pub vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
}
1148
/// Shared dependencies of the uploader, independent of its mutable state.
struct UploaderContext {
    // Latest pinned hummock version, updated via `update_pinned_version`.
    pinned_version: PinnedVersion,
    /// When called, it will spawn a task to flush the imm into sst and return the join handle.
    spawn_upload_task: SpawnUploadTask,
    // Tracks buffered data size and decides when flushing is needed.
    buffer_tracker: BufferTracker,

    stats: Arc<HummockStateStoreMetrics>,
}
1157
1158impl UploaderContext {
1159    fn new(
1160        pinned_version: PinnedVersion,
1161        spawn_upload_task: SpawnUploadTask,
1162        buffer_tracker: BufferTracker,
1163        stats: Arc<HummockStateStoreMetrics>,
1164    ) -> Self {
1165        UploaderContext {
1166            pinned_version,
1167            spawn_upload_task,
1168            buffer_tracker,
1169            stats,
1170        }
1171    }
1172}
1173
/// Monotonically increasing identifier of a sync request, allocated from
/// `UploaderData::next_sync_id`. Ordering follows allocation order.
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct SyncId(usize);
1176
/// Unsynced state of one vector index writer.
struct UnsyncVectorIndexData {
    // Per sealed epoch, the optional vector index add recorded at seal time;
    // drained by `UploaderData::sync` up to the synced epoch.
    sealed_epoch_data: BTreeMap<HummockEpoch, Option<VectorIndexAdd>>,
    // The current (not yet sealed) epoch of the writer.
    curr_epoch: u64,
    // Set when the writer is dropped; the entry is removed once
    // `sealed_epoch_data` is fully drained.
    is_dropped: bool,
}
1182
/// Mutable state of the uploader while it is in the `Working` state.
#[derive(Default)]
struct UploaderData {
    // Unsynced imms, spilled ssts and epoch bookkeeping.
    unsync_data: UnsyncData,
    // Unsynced vector index writer state, keyed by table id.
    unsync_vector_index_data: HashMap<TableId, UnsyncVectorIndexData>,

    // In-flight sync requests, ordered by sync id (completion is notified in order).
    syncing_data: BTreeMap<SyncId, SyncingData>,

    // Owns and polls all spill/sync upload tasks.
    task_manager: TaskManager,
    // Next value handed out as a `SyncId`.
    next_sync_id: usize,
}
1193
impl UploaderData {
    /// Aborts all uploading tasks and fails every pending sync request with
    /// the error produced by `err`. Consumes the data.
    fn abort(self, err: impl Fn() -> HummockError) {
        self.task_manager.abort_all_tasks();
        for syncing_data in self.syncing_data.into_values() {
            send_sync_result(syncing_data.sync_result_sender, Err(err()));
        }
    }

    /// Removes all uploader state of the given tables (invoked from
    /// `HummockUploader::clear`): clears their unsync data, then drops every
    /// in-flight sync that touches them, aborting its remaining tasks.
    ///
    /// Asserts that a sync either touches none of the tables or only tables
    /// inside `table_ids` — partial overlap is an invariant violation.
    fn clear_tables(&mut self, table_ids: HashSet<TableId>) {
        if table_ids.is_empty() {
            return;
        }
        self.unsync_data
            .clear_tables(&table_ids, &mut self.task_manager);
        self.syncing_data.retain(|sync_id, syncing_data| {
            if syncing_data
                .sync_table_epochs
                .iter()
                .any(|(_, sync_table_ids)| !sync_table_ids.is_disjoint(&table_ids))
            {
                assert!(
                    syncing_data
                        .sync_table_epochs
                        .iter()
                        .all(|(_, sync_table_ids)| sync_table_ids.is_subset(&table_ids))
                );
                // Abort the sync's outstanding tasks; each must be either a spill
                // over the cleared tables or a sync task belonging to this sync id.
                for task_id in &syncing_data.remaining_uploading_tasks {
                    match self
                        .task_manager
                        .abort_task(*task_id)
                        .expect("should exist")
                    {
                        UploadingTaskStatus::Spilling(spill_table_ids) => {
                            assert!(spill_table_ids.is_subset(&table_ids));
                        }
                        UploadingTaskStatus::Sync(task_sync_id) => {
                            assert_eq!(sync_id, &task_sync_id);
                        }
                    }
                }
                false
            } else {
                true
            }
        });

        self.check_upload_task_consistency();
    }

    /// Minimum object id among all uploaded-but-uncommitted ssts (spilled data
    /// plus ssts of in-flight syncs), or `None` when there are none.
    fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        self.unsync_data
            .spilled_data
            .values()
            .map(|(s, _)| s)
            .chain(self.syncing_data.values().flat_map(|s| s.uploaded.iter()))
            .filter_map(|s| {
                s.sstable_infos()
                    .iter()
                    .chain(s.old_value_sstable_infos())
                    .map(|s| s.sst_info.object_id)
                    .min()
            })
            .min()
            .map(|object_id| object_id.as_raw())
    }
}
1260
/// Details of the sync failure that moved the uploader into `UploaderState::Err`.
struct ErrState {
    // The `(epoch, table_ids)` pairs of the sync that failed.
    failed_sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    // Human-readable failure reason, echoed to later sync callers.
    reason: String,
}
1265
/// Overall uploader state: normal operation, or failed after a sync error
/// (in which case subsequent sync requests are rejected until reset).
enum UploaderState {
    Working(UploaderData),
    Err(ErrState),
}
1270
/// An uploader for hummock data.
///
/// Data have 3 sequential stages: unsync (inside each local instance, data can be unsealed, sealed), syncing, synced.
///
/// The 3 stages are divided by 2 marginal epochs: `max_syncing_epoch`,
/// `max_synced_epoch` in each `TableUnSyncData`. Epochs satisfy the following inequality.
///
/// (epochs of `synced_data`) <= `max_synced_epoch` < (epochs of `syncing_data`) <=
/// `max_syncing_epoch` < (epochs of `unsync_data`)
///
/// NOTE(review): `max_syncing_epoch`/`max_synced_epoch` are not visible in this
/// file — confirm these field names still exist in `TableUnsyncData` and update
/// the paragraph above if stale.
///
/// Data are mostly stored in `VecDeque`, and the order stored in the `VecDeque` indicates the data
/// order. Data at the front represents ***newer*** data.
pub struct HummockUploader {
    // Working data, or the error that poisoned the uploader.
    state: UploaderState,

    // Shared dependencies (pinned version, task spawner, buffer tracker, metrics).
    context: UploaderContext,
}
1288
impl HummockUploader {
    /// Creates an uploader in the `Working` state with empty data.
    pub(super) fn new(
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        pinned_version: PinnedVersion,
        spawn_upload_task: SpawnUploadTask,
        buffer_tracker: BufferTracker,
    ) -> Self {
        Self {
            state: UploaderState::Working(UploaderData::default()),
            context: UploaderContext::new(
                pinned_version,
                spawn_upload_task,
                buffer_tracker,
                state_store_metrics,
            ),
        }
    }

    /// The buffer tracker shared with local instances.
    pub(super) fn buffer_tracker(&self) -> &BufferTracker {
        &self.context.buffer_tracker
    }

    /// The currently pinned hummock version.
    pub(super) fn hummock_version(&self) -> &PinnedVersion {
        &self.context.pinned_version
    }

    /// Adds imms of a local instance. No-op when the uploader is in the
    /// `Err` state.
    pub(super) fn add_imms(&mut self, instance_id: LocalInstanceId, imms: Vec<ImmutableMemtable>) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        for imm in imms {
            let imm = UploaderImm::new(imm, &self.context);
            data.unsync_data.add_imm(instance_id, imm);
        }
    }

    /// Registers a new local instance of `table_id` at `init_epoch`. No-op in
    /// the `Err` state.
    pub(super) fn init_instance(
        &mut self,
        instance_id: LocalInstanceId,
        table_id: TableId,
        init_epoch: HummockEpoch,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data
            .init_instance(table_id, instance_id, init_epoch);
    }

    /// Seals the current epoch of a local instance and advances it to
    /// `next_epoch`. No-op in the `Err` state.
    pub(super) fn local_seal_epoch(
        &mut self,
        instance_id: LocalInstanceId,
        next_epoch: HummockEpoch,
        opts: SealCurrentEpochOptions,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data
            .local_seal_epoch(instance_id, next_epoch, opts);
    }

    /// Registers a vector index writer on `table_id` starting at `init_epoch`.
    /// Panics on duplicate registration; no-op in the `Err` state.
    pub(super) fn register_vector_writer(&mut self, table_id: TableId, init_epoch: HummockEpoch) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        assert!(
            data.unsync_vector_index_data
                .try_insert(
                    table_id,
                    UnsyncVectorIndexData {
                        sealed_epoch_data: Default::default(),
                        curr_epoch: init_epoch,
                        is_dropped: false,
                    }
                )
                .is_ok(),
            "duplicate vector writer on {}",
            table_id
        )
    }

    /// Seals the current epoch of a vector writer, recording `add` for the
    /// sealed epoch, and advances the writer to `next_epoch`. No-op in the
    /// `Err` state; panics if the writer is unknown, dropped, or `next_epoch`
    /// does not advance.
    pub(super) fn vector_writer_seal_epoch(
        &mut self,
        table_id: TableId,
        next_epoch: HummockEpoch,
        add: Option<VectorIndexAdd>,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        let data = data
            .unsync_vector_index_data
            .get_mut(&table_id)
            .expect("should exist");
        assert!(!data.is_dropped);
        assert!(
            data.curr_epoch < next_epoch,
            "next epoch {} should be greater than current epoch {}",
            next_epoch,
            data.curr_epoch
        );
        data.sealed_epoch_data
            .try_insert(data.curr_epoch, add)
            .expect("non-duplicate");
        data.curr_epoch = next_epoch;
    }

    /// Marks a vector writer as dropped, removing it immediately when it has
    /// no sealed data left to sync. No-op in the `Err` state.
    pub(super) fn drop_vector_writer(&mut self, table_id: TableId) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        let hash_map::Entry::Occupied(mut entry) = data.unsync_vector_index_data.entry(table_id)
        else {
            panic!("vector writer {} should exist", table_id);
        };
        let data = entry.get_mut();
        data.is_dropped = true;
        if data.sealed_epoch_data.is_empty() {
            entry.remove();
        }
    }

    /// Starts `epoch` on all of `table_ids`, creating table entries on first
    /// sight and recording the epoch's table set in `unsync_epochs`. No-op in
    /// the `Err` state.
    pub(super) fn start_epoch(&mut self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        debug!(epoch, ?table_ids, "start epoch");
        for table_id in &table_ids {
            let table_data = data
                .unsync_data
                .table_data
                .entry(*table_id)
                .or_insert_with(|| {
                    TableUnsyncData::new(
                        *table_id,
                        self.context.pinned_version.table_committed_epoch(*table_id),
                    )
                });
            table_data.new_epoch(epoch);
        }
        if let Some(unsync_epoch_id) = get_unsync_epoch_id(epoch, &table_ids) {
            assert!(
                data.unsync_data
                    .unsync_epochs
                    .insert(unsync_epoch_id, table_ids)
                    .is_none()
            );
        }
    }

    /// Starts syncing the given `(epoch, table_ids)` pairs; the result is
    /// delivered through `sync_result_sender`. In the `Err` state the request
    /// is immediately failed with the recorded failure reason.
    pub(super) fn start_sync_epoch(
        &mut self,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) {
        let data = match &mut self.state {
            UploaderState::Working(data) => data,
            UploaderState::Err(ErrState {
                failed_sync_table_epochs,
                reason,
            }) => {
                let result = Err(HummockError::other(format!(
                    "previous sync epoch {:?} failed due to [{}]",
                    failed_sync_table_epochs, reason
                )));
                send_sync_result(sync_result_sender, result);
                return;
            }
        };
        debug!(?sync_table_epochs, "start sync epoch");

        data.sync(&self.context, sync_result_sender, sync_table_epochs);

        // The sync may already be complete (no remaining tasks); notify eagerly.
        data.may_notify_sync_task(&self.context);

        self.context
            .stats
            .uploader_syncing_epoch_count
            .set(data.syncing_data.len() as _);
    }

    /// Installs a newly pinned version and acknowledges each table's committed
    /// epoch on its unsync data.
    pub(crate) fn update_pinned_version(&mut self, pinned_version: PinnedVersion) {
        if let UploaderState::Working(data) = &mut self.state {
            // TODO: may only `ack_committed` on table whose `committed_epoch` is changed.
            for (table_id, info) in pinned_version.state_table_info.info() {
                if let Some(table_data) = data.unsync_data.table_data.get_mut(table_id) {
                    table_data.ack_committed(info.committed_epoch);
                }
            }
        }
        self.context.pinned_version = pinned_version;
    }

    /// Spills sealed data when the buffer tracker asks for a flush, from older
    /// epochs to newer ones, until enough bytes are in flight or no payload is
    /// left. Returns whether any spill task was started.
    pub(crate) fn may_flush(&mut self) -> bool {
        let UploaderState::Working(data) = &mut self.state else {
            return false;
        };
        if self.context.buffer_tracker.need_flush() {
            let mut spiller = Spiller::new(&mut data.unsync_data);
            let mut curr_batch_flush_size = 0;
            // iterate from older epoch to newer epoch
            while self
                .context
                .buffer_tracker
                .need_more_flush(curr_batch_flush_size)
                && let Some((epoch, payload, spilled_table_ids)) = spiller.next_spilled_payload()
            {
                assert!(!payload.is_empty());
                {
                    let (task_id, task_size, spilled_table_ids) =
                        data.task_manager
                            .spill(&self.context, spilled_table_ids, payload);
                    // Record the new spill task on every table it covers, newest first.
                    for table_id in spilled_table_ids {
                        spiller
                            .unsync_data()
                            .table_data
                            .get_mut(table_id)
                            .expect("should exist")
                            .spill_tasks
                            .entry(epoch)
                            .or_default()
                            .push_front(task_id);
                    }
                    curr_batch_flush_size += task_size;
                }
            }
            data.check_upload_task_consistency();
            curr_batch_flush_size > 0
        } else {
            false
        }
    }

    /// Clears uploader state. With `Some(table_ids)`, only those tables are
    /// cleared; with `None`, the whole uploader is reset (all tasks aborted,
    /// pending syncs failed, and an `Err` state replaced by a fresh one).
    pub(crate) fn clear(&mut self, table_ids: Option<HashSet<TableId>>) {
        if let Some(table_ids) = table_ids {
            if let UploaderState::Working(data) = &mut self.state {
                data.clear_tables(table_ids);
            }
        } else {
            if let UploaderState::Working(data) = replace(
                &mut self.state,
                UploaderState::Working(UploaderData::default()),
            ) {
                data.abort(|| HummockError::other("uploader is reset"));
            }

            self.context.stats.uploader_syncing_epoch_count.set(0);
        }
    }

    /// Marks a local instance as destroyed. No-op in the `Err` state.
    pub(crate) fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data.may_destroy_instance(instance_id);
    }

    /// Minimum uploaded-but-uncommitted object id, or `None` when there is
    /// none or the uploader is in the `Err` state.
    pub(crate) fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        if let UploaderState::Working(ref u) = self.state {
            u.min_uncommitted_object_id()
        } else {
            None
        }
    }
}
1555
impl UploaderData {
    /// Completes finished syncs in sync-id order.
    ///
    /// Pops entries from the front of `syncing_data` for as long as the oldest
    /// one has no remaining uploading task, acknowledging the synced epochs on
    /// the involved tables (removing tables that become empty) and sending the
    /// `SyncedData` result. Stops at the first entry that is still waiting, so
    /// results are always delivered in order.
    fn may_notify_sync_task(&mut self, context: &UploaderContext) {
        while let Some((_, syncing_data)) = self.syncing_data.first_key_value()
            && syncing_data.remaining_uploading_tasks.is_empty()
        {
            let (_, syncing_data) = self.syncing_data.pop_first().expect("non-empty");
            let SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: _,
                uploaded,
                table_watermarks,
                vector_index_adds,
                sync_result_sender,
            } = syncing_data;
            context
                .stats
                .uploader_syncing_epoch_count
                .set(self.syncing_data.len() as _);

            for (sync_epoch, table_ids) in sync_table_epochs {
                for table_id in table_ids {
                    if let Some(table_data) = self.unsync_data.table_data.get_mut(&table_id) {
                        table_data.ack_synced(sync_epoch);
                        if table_data.is_empty() {
                            self.unsync_data.table_data.remove(&table_id);
                        }
                    }
                }
            }

            send_sync_result(
                sync_result_sender,
                Ok(SyncedData {
                    uploaded_ssts: uploaded,
                    table_watermarks,
                    vector_index_adds,
                }),
            )
        }
    }

    /// Debug-build invariant check: the task bookkeeping derived from the data
    /// side (per-table spill task lists, per-sync remaining task sets) must
    /// exactly match the task manager's and `spilled_data`'s view.
    fn check_upload_task_consistency(&self) {
        #[cfg(debug_assertions)]
        {
            // Spill tasks and their table sets, as recorded on each table's data.
            let mut spill_task_table_id_from_data: HashMap<_, HashSet<_>> = HashMap::new();
            for table_data in self.unsync_data.table_data.values() {
                for task_id in table_data
                    .spill_tasks
                    .iter()
                    .flat_map(|(_, tasks)| tasks.iter())
                {
                    assert!(
                        spill_task_table_id_from_data
                            .entry(*task_id)
                            .or_default()
                            .insert(table_data.table_id)
                    );
                }
            }
            // Remaining task sets of in-flight syncs (completed ones are skipped).
            let syncing_task_id_from_data: HashMap<_, HashSet<_>> = self
                .syncing_data
                .iter()
                .filter_map(|(sync_id, data)| {
                    if data.remaining_uploading_tasks.is_empty() {
                        None
                    } else {
                        Some((*sync_id, data.remaining_uploading_tasks.clone()))
                    }
                })
                .collect();

            // The manager-side view: finished spills live in `spilled_data`,
            // running tasks are reported by the task manager.
            let mut spill_task_table_id_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
            for (task_id, (_, table_ids)) in &self.unsync_data.spilled_data {
                spill_task_table_id_from_manager.insert(*task_id, table_ids.clone());
            }
            let mut syncing_task_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
            for (task_id, status) in self.task_manager.tasks() {
                match status {
                    UploadingTaskStatus::Spilling(table_ids) => {
                        assert!(
                            spill_task_table_id_from_manager
                                .insert(task_id, table_ids.clone())
                                .is_none()
                        );
                    }
                    UploadingTaskStatus::Sync(sync_id) => {
                        assert!(
                            syncing_task_from_manager
                                .entry(*sync_id)
                                .or_default()
                                .insert(task_id)
                        );
                    }
                }
            }
            assert_eq!(
                spill_task_table_id_from_data,
                spill_task_table_id_from_manager
            );
            assert_eq!(syncing_task_id_from_data, syncing_task_from_manager);
        }
    }
}
1659
impl HummockUploader {
    /// Returns a future that resolves with the [`StagingSstableInfo`] of the next
    /// finished upload task.
    ///
    /// On task success, uploader bookkeeping is updated before the future resolves:
    /// flushed imms are acked, and the sst is recorded either under its owning
    /// syncing task (possibly notifying sync completion) or in the spilled data map.
    ///
    /// On task failure (only sync tasks carry an error here), the uploader
    /// transitions into `UploaderState::Err`, the sync result sender is notified of
    /// the failure, all remaining data is aborted, and the future never resolves:
    /// this poll returns `Poll::Pending`, and later polls short-circuit on the
    /// non-`Working` state below.
    pub(super) fn next_uploaded_sst(
        &mut self,
    ) -> impl Future<Output = Arc<StagingSstableInfo>> + '_ {
        poll_fn(|cx| {
            // Only a working uploader makes progress; in `Err` state we pend forever.
            let UploaderState::Working(data) = &mut self.state else {
                return Poll::Pending;
            };

            if let Some((task_id, status, result)) = ready!(data.task_manager.poll_task_result(cx))
            {
                match result {
                    Ok(sst) => {
                        // Mark the imms covered by this sst as flushed before
                        // attributing the sst to its sync/spill destination.
                        data.unsync_data.ack_flushed(&sst);
                        match status {
                            UploadingTaskStatus::Sync(sync_id) => {
                                let syncing_data =
                                    data.syncing_data.get_mut(&sync_id).expect("should exist");
                                // `push_front` keeps `uploaded` ordered from
                                // newest to oldest finished task.
                                syncing_data.uploaded.push_front(sst.clone());
                                assert!(syncing_data.remaining_uploading_tasks.remove(&task_id));
                                // The sync may now be complete if this was its
                                // last remaining task.
                                data.may_notify_sync_task(&self.context);
                            }
                            UploadingTaskStatus::Spilling(table_ids) => {
                                data.unsync_data
                                    .spilled_data
                                    .insert(task_id, (sst.clone(), table_ids));
                            }
                        }
                        // Debug-only invariant check between data and task manager.
                        data.check_upload_task_consistency();
                        Poll::Ready(sst)
                    }
                    Err((sync_id, e)) => {
                        let syncing_data =
                            data.syncing_data.remove(&sync_id).expect("should exist");
                        let failed_epochs = syncing_data.sync_table_epochs.clone();
                        // Swap the uploader into `Err` state while reclaiming the
                        // previous working data so it can be aborted below.
                        let data = must_match!(replace(
                            &mut self.state,
                            UploaderState::Err(ErrState {
                                failed_sync_table_epochs: syncing_data.sync_table_epochs,
                                reason: e.as_report().to_string(),
                            }),
                        ), UploaderState::Working(data) => data);

                        // Best-effort notification: the receiver may already be dropped.
                        let _ = syncing_data
                            .sync_result_sender
                            .send(Err(HummockError::other(format!(
                                "failed to sync: {:?}",
                                e.as_report()
                            ))));

                        data.abort(|| {
                            HummockError::other(format!(
                                "previous epoch {:?} failed to sync",
                                failed_epochs
                            ))
                        });
                        Poll::Pending
                    }
                }
            } else {
                Poll::Pending
            }
        })
    }
}
1725
1726#[cfg(test)]
1727pub(crate) mod tests {
1728    use std::collections::{HashMap, HashSet};
1729    use std::future::{Future, poll_fn};
1730    use std::ops::Deref;
1731    use std::pin::pin;
1732    use std::sync::Arc;
1733    use std::sync::atomic::AtomicUsize;
1734    use std::sync::atomic::Ordering::SeqCst;
1735    use std::task::Poll;
1736
1737    use futures::FutureExt;
1738    use risingwave_common::catalog::TableId;
1739    use risingwave_common::util::epoch::EpochExt;
1740    use risingwave_hummock_sdk::HummockEpoch;
1741    use risingwave_hummock_sdk::vector_index::{
1742        FlatIndexAdd, VectorFileInfo, VectorIndexAdd, VectorStoreInfoDelta,
1743    };
1744    use tokio::sync::oneshot;
1745
1746    use super::test_utils::*;
1747    use crate::hummock::event_handler::uploader::{
1748        HummockUploader, SyncedData, UploadingTask, get_payload_imm_ids,
1749    };
1750    use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID};
1751    use crate::hummock::{HummockError, HummockResult};
1752    use crate::mem_table::ImmutableMemtable;
1753    use crate::opts::StorageOpts;
1754
    impl HummockUploader {
        /// Test convenience: adds a single imm by wrapping it in a one-element
        /// batch for `add_imms`.
        pub(super) fn add_imm(&mut self, instance_id: LocalInstanceId, imm: ImmutableMemtable) {
            self.add_imms(instance_id, vec![imm]);
        }

        /// Test convenience: starts a sync covering exactly one epoch for the
        /// given tables, forwarding to `start_sync_epoch`.
        pub(super) fn start_single_epoch_sync(
            &mut self,
            epoch: HummockEpoch,
            sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
            table_ids: HashSet<TableId>,
        ) {
            self.start_sync_epoch(sync_result_sender, vec![(epoch, table_ids)]);
        }
    }
1769
1770    #[tokio::test]
1771    pub async fn test_uploading_task_future() {
1772        let uploader_context = test_uploader_context(dummy_success_upload_future);
1773
1774        let imm = gen_imm(INITIAL_EPOCH).await;
1775        let imm_size = imm.size();
1776        let imm_ids = get_imm_ids(vec![&imm]);
1777        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
1778        assert_eq!(imm_size, task.task_info.task_size);
1779        assert_eq!(imm_ids, task.task_info.imm_ids);
1780        assert_eq!(vec![INITIAL_EPOCH], task.task_info.epochs);
1781        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
1782        assert_eq!(
1783            output.sstable_infos(),
1784            &dummy_success_upload_output().new_value_ssts
1785        );
1786        assert_eq!(imm_size, output.imm_size());
1787        assert_eq!(&imm_ids, output.imm_ids());
1788        assert_eq!(&vec![INITIAL_EPOCH], output.epochs());
1789
1790        let uploader_context = test_uploader_context(dummy_fail_upload_future);
1791        let imm = gen_imm(INITIAL_EPOCH).await;
1792        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
1793        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
1794    }
1795
1796    #[tokio::test]
1797    pub async fn test_uploading_task_poll_result() {
1798        let uploader_context = test_uploader_context(dummy_success_upload_future);
1799        let mut task =
1800            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
1801        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
1802        assert_eq!(
1803            output.sstable_infos(),
1804            &dummy_success_upload_output().new_value_ssts
1805        );
1806
1807        let uploader_context = test_uploader_context(dummy_fail_upload_future);
1808        let mut task =
1809            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
1810        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
1811    }
1812
1813    #[tokio::test]
1814    async fn test_uploading_task_poll_ok_with_retry() {
1815        let run_count = Arc::new(AtomicUsize::new(0));
1816        let fail_num = 10;
1817        let run_count_clone = run_count.clone();
1818        let uploader_context = test_uploader_context(move |payload, info| {
1819            let run_count = run_count.clone();
1820            async move {
1821                // fail in the first `fail_num` run, and success at the end
1822                let ret = if run_count.load(SeqCst) < fail_num {
1823                    Err(HummockError::other("fail"))
1824                } else {
1825                    dummy_success_upload_future(payload, info).await
1826                };
1827                run_count.fetch_add(1, SeqCst);
1828                ret
1829            }
1830        });
1831        let mut task =
1832            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
1833        let output = poll_fn(|cx| task.poll_ok_with_retry(cx)).await;
1834        assert_eq!(fail_num + 1, run_count_clone.load(SeqCst));
1835        assert_eq!(
1836            output.sstable_infos(),
1837            &dummy_success_upload_output().new_value_ssts
1838        );
1839    }
1840
    #[tokio::test]
    async fn test_uploader_basic() {
        // End-to-end happy path: ingest one imm plus one vector index add,
        // sync a single epoch, and verify both the uploaded sst and the synced
        // vector index data, then advance the pinned version.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        const VECTOR_INDEX_TABLE_ID: TableId = TableId::new(234);
        uploader.start_epoch(
            epoch1,
            HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
        );
        // Ingest one imm on the test instance and seal its epoch.
        let imm = gen_imm(epoch1).await;
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);

        // Register a vector writer and seal epoch1 with one flat-index add.
        uploader.register_vector_writer(VECTOR_INDEX_TABLE_ID, epoch1);
        let vector_info_file = VectorFileInfo {
            object_id: 1.into(),
            vector_count: 1,
            file_size: 0,
            start_vector_id: 0,
            meta_offset: 20,
        };
        let vector_index_add = VectorIndexAdd::Flat(FlatIndexAdd {
            vector_store_info_delta: VectorStoreInfoDelta {
                next_vector_id: 1,
                added_vector_files: vec![vector_info_file.clone()],
            },
        });
        uploader.vector_writer_seal_epoch(
            VECTOR_INDEX_TABLE_ID,
            epoch1.next_epoch(),
            Some(vector_index_add.clone()),
        );

        // Start syncing epoch1 for both tables; syncing state should be
        // populated with the pending upload task.
        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(
            epoch1,
            sync_tx,
            HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
        );
        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
        assert_eq!(1, uploader.data().syncing_data.len());
        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
        assert!(syncing_data.uploaded.is_empty());
        assert!(!syncing_data.remaining_uploading_tasks.is_empty());

        // The upload task completes; the staging sst must cover exactly the
        // ingested imm and match the dummy upload output.
        let staging_sst = uploader.next_uploaded_sst().await;
        assert_eq!(&vec![epoch1], staging_sst.epochs());
        assert_eq!(
            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
            staging_sst.imm_ids()
        );
        assert_eq!(
            &dummy_success_upload_output().new_value_ssts,
            staging_sst.sstable_infos()
        );

        // The sync result should carry the same sst plus the vector index add
        // sealed above, with no table watermarks.
        match sync_rx.await {
            Ok(Ok(data)) => {
                let SyncedData {
                    uploaded_ssts,
                    table_watermarks,
                    vector_index_adds,
                } = data;
                assert_eq!(1, uploaded_ssts.len());
                let staging_sst = &uploaded_ssts[0];
                assert_eq!(&vec![epoch1], staging_sst.epochs());
                assert_eq!(
                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
                    staging_sst.imm_ids()
                );
                assert_eq!(
                    &dummy_success_upload_output().new_value_ssts,
                    staging_sst.sstable_infos()
                );
                assert!(table_watermarks.is_empty());
                assert_eq!(vector_index_adds.len(), 1);
                let (table_id, vector_index_adds) = vector_index_adds.into_iter().next().unwrap();
                assert_eq!(table_id, VECTOR_INDEX_TABLE_ID);
                assert_eq!(vector_index_adds.len(), 1);
                let synced_vector_index_add = vector_index_adds[0].clone();
                assert_eq!(vector_index_add, synced_vector_index_add);
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());

        // Advancing the pinned version to epoch1 should be reflected in the
        // table committed epoch.
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
1944
    #[tokio::test]
    async fn test_uploader_destroy_instance_before_sync() {
        // Destroying the local instance before sync must not lose its data:
        // the sealed imm should still be uploaded and included in the sync
        // result, and the table's unsync data should be cleaned up afterwards.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        uploader.start_epochs_for_test([epoch1]);
        let imm = gen_imm(epoch1).await;
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        // Destroy the instance before the sync is started.
        uploader.may_destroy_instance(TEST_LOCAL_INSTANCE_ID);

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
        assert_eq!(1, uploader.data().syncing_data.len());
        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
        assert!(syncing_data.uploaded.is_empty());
        assert!(!syncing_data.remaining_uploading_tasks.is_empty());

        // The destroyed instance's imm is still uploaded as part of the sync.
        let staging_sst = uploader.next_uploaded_sst().await;
        assert_eq!(&vec![epoch1], staging_sst.epochs());
        assert_eq!(
            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
            staging_sst.imm_ids()
        );
        assert_eq!(
            &dummy_success_upload_output().new_value_ssts,
            staging_sst.sstable_infos()
        );

        match sync_rx.await {
            Ok(Ok(data)) => {
                let SyncedData {
                    uploaded_ssts,
                    table_watermarks,
                    ..
                } = data;
                assert_eq!(1, uploaded_ssts.len());
                let staging_sst = &uploaded_ssts[0];
                assert_eq!(&vec![epoch1], staging_sst.epochs());
                assert_eq!(
                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
                    staging_sst.imm_ids()
                );
                assert_eq!(
                    &dummy_success_upload_output().new_value_ssts,
                    staging_sst.sstable_infos()
                );
                assert!(table_watermarks.is_empty());
            }
            _ => unreachable!(),
        };
        // After sync completes, the table's unsync data must have been removed.
        assert!(
            !uploader
                .data()
                .unsync_data
                .table_data
                .contains_key(&TEST_TABLE_ID)
        );
    }
2006
    #[tokio::test]
    async fn test_empty_uploader_sync() {
        // Syncing an epoch with no buffered data: no sst should be uploaded,
        // the sync result should be empty, and syncing/synced epochs still
        // advance correctly.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_epochs_for_test([epoch1]);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        // No data was added, so no upload task should ever finish.
        assert_uploader_pending(&mut uploader).await;

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.uploaded_ssts.is_empty());
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        // Advancing the pinned version clears the syncing data and commits epoch1.
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert!(uploader.data().syncing_data.is_empty());
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
2044
    #[tokio::test]
    async fn test_uploader_empty_epoch() {
        // Sync an empty epoch while a later epoch holds data: the sync of the
        // empty epoch1 must complete with no ssts and must not touch epoch2's
        // buffered imm.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        uploader.start_epochs_for_test([epoch1, epoch2]);
        let imm = gen_imm(epoch2).await;
        // epoch1 is empty while epoch2 is not. Going to seal empty epoch1.
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm);

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        // epoch2's imm must not be flushed by the sync of the empty epoch1.
        assert_uploader_pending(&mut uploader).await;

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.uploaded_ssts.is_empty());
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        // Committing epoch1 via a new pinned version clears the syncing data.
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert!(uploader.data().syncing_data.is_empty());
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
2086
2087    #[tokio::test]
2088    async fn test_uploader_poll_empty() {
2089        let mut uploader = test_uploader(dummy_success_upload_future);
2090        let fut = uploader.next_uploaded_sst();
2091        let mut fut = pin!(fut);
2092        assert!(poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx).is_pending())).await);
2093    }
2094
    #[tokio::test]
    async fn test_uploader_empty_advance_mce() {
        // While local data only exists at epoch6, repeatedly advancing the
        // pinned version through epoch1..=epoch5 should advance both the max
        // synced and max syncing epochs without requiring explicit syncs,
        // until an explicit sync of epoch6 takes over.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let initial_pinned_version = uploader.context.pinned_version.clone();
        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        let epoch3 = epoch2.next_epoch();
        let epoch4 = epoch3.next_epoch();
        let epoch5 = epoch4.next_epoch();
        let epoch6 = epoch5.next_epoch();
        // Pre-build pinned versions committing epoch1..=epoch5 respectively.
        let version1 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        let version2 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch2))
            .unwrap();
        let version3 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch3))
            .unwrap();
        let version4 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch4))
            .unwrap();
        let version5 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch5))
            .unwrap();

        uploader.start_epochs_for_test([epoch6]);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch6);

        // Each version update advances both epochs, since no local data exists
        // at those epochs yet.
        uploader.update_pinned_version(version1);
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        let imm = gen_imm(epoch6).await;
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
        uploader.update_pinned_version(version2);
        assert_eq!(epoch2, uploader.test_max_synced_epoch());
        assert_eq!(epoch2, uploader.test_max_syncing_epoch());

        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch6);
        uploader.update_pinned_version(version3);
        assert_eq!(epoch3, uploader.test_max_synced_epoch());
        assert_eq!(epoch3, uploader.test_max_syncing_epoch());

        // Start syncing epoch6: syncing epoch jumps to epoch6 while the synced
        // epoch still follows the pinned version.
        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch6, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
        uploader.update_pinned_version(version4);
        assert_eq!(epoch4, uploader.test_max_synced_epoch());
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());

        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_imm_ids([&imm]), sst.imm_ids());

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.table_watermarks.is_empty());
                assert_eq!(1, data.uploaded_ssts.len());
                assert_eq!(&get_imm_ids([&imm]), data.uploaded_ssts[0].imm_ids());
            }
            _ => unreachable!(),
        }

        // After epoch6 is synced, an older pinned version (epoch5) must not
        // move the epochs backwards.
        uploader.update_pinned_version(version5);
        assert_eq!(epoch6, uploader.test_max_synced_epoch());
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
    }
2162
2163    #[tokio::test]
2164    async fn test_uploader_finish_in_order() {
2165        let config = StorageOpts {
2166            shared_buffer_capacity_mb: 1024 * 1024,
2167            shared_buffer_flush_ratio: 0.0,
2168            ..Default::default()
2169        };
2170        let (buffer_tracker, mut uploader, new_task_notifier) =
2171            prepare_uploader_order_test(&config, false);
2172
2173        let epoch1 = INITIAL_EPOCH.next_epoch();
2174        let epoch2 = epoch1.next_epoch();
2175        let epoch3 = epoch2.next_epoch();
2176        let epoch4 = epoch3.next_epoch();
2177        uploader.start_epochs_for_test([epoch1, epoch2, epoch3, epoch4]);
2178        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2179        let memory_limiter = Some(memory_limiter.deref());
2180
2181        let instance_id1 = 1;
2182        let instance_id2 = 2;
2183
2184        uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2185        uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2186
2187        // imm2 contains data in newer epoch, but added first
2188        let imm2 = gen_imm_with_limiter(epoch2, memory_limiter).await;
2189        uploader.add_imm(instance_id2, imm2.clone());
2190        let imm1_1 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2191        uploader.add_imm(instance_id1, imm1_1.clone());
2192        let imm1_2 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2193        uploader.add_imm(instance_id1, imm1_2.clone());
2194
2195        // imm1 will be spilled first
2196        let epoch1_spill_payload12 =
2197            HashMap::from_iter([(instance_id1, vec![imm1_2.clone(), imm1_1.clone()])]);
2198        let epoch2_spill_payload = HashMap::from_iter([(instance_id2, vec![imm2.clone()])]);
2199        let (await_start1, finish_tx1) =
2200            new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload12));
2201        let (await_start2, finish_tx2) =
2202            new_task_notifier(get_payload_imm_ids(&epoch2_spill_payload));
2203        uploader.may_flush();
2204        await_start1.await;
2205        await_start2.await;
2206
2207        assert_uploader_pending(&mut uploader).await;
2208
2209        finish_tx2.send(()).unwrap();
2210        assert_uploader_pending(&mut uploader).await;
2211
2212        finish_tx1.send(()).unwrap();
2213        let sst = uploader.next_uploaded_sst().await;
2214        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload12), sst.imm_ids());
2215        assert_eq!(&vec![epoch1], sst.epochs());
2216
2217        let sst = uploader.next_uploaded_sst().await;
2218        assert_eq!(&get_payload_imm_ids(&epoch2_spill_payload), sst.imm_ids());
2219        assert_eq!(&vec![epoch2], sst.epochs());
2220
2221        let imm1_3 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2222        uploader.add_imm(instance_id1, imm1_3.clone());
2223        let epoch1_spill_payload3 = HashMap::from_iter([(instance_id1, vec![imm1_3.clone()])]);
2224        let (await_start1_3, finish_tx1_3) =
2225            new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload3));
2226        uploader.may_flush();
2227        await_start1_3.await;
2228        let imm1_4 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2229        uploader.add_imm(instance_id1, imm1_4.clone());
2230        let epoch1_sync_payload = HashMap::from_iter([(instance_id1, vec![imm1_4.clone()])]);
2231        let (await_start1_4, finish_tx1_4) =
2232            new_task_notifier(get_payload_imm_ids(&epoch1_sync_payload));
2233        uploader.local_seal_epoch_for_test(instance_id1, epoch1);
2234        let (sync_tx1, mut sync_rx1) = oneshot::channel();
2235        uploader.start_single_epoch_sync(epoch1, sync_tx1, HashSet::from_iter([TEST_TABLE_ID]));
2236        await_start1_4.await;
2237
2238        uploader.local_seal_epoch_for_test(instance_id1, epoch2);
2239        uploader.local_seal_epoch_for_test(instance_id2, epoch2);
2240
2241        // current uploader state:
2242        // unsealed: empty
2243        // sealed: epoch2: uploaded sst([imm2])
2244        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])
2245
2246        let imm3_1 = gen_imm_with_limiter(epoch3, memory_limiter).await;
2247        let epoch3_spill_payload1 = HashMap::from_iter([(instance_id1, vec![imm3_1.clone()])]);
2248        uploader.add_imm(instance_id1, imm3_1.clone());
2249        let (await_start3_1, finish_tx3_1) =
2250            new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload1));
2251        uploader.may_flush();
2252        await_start3_1.await;
2253        let imm3_2 = gen_imm_with_limiter(epoch3, memory_limiter).await;
2254        let epoch3_spill_payload2 = HashMap::from_iter([(instance_id2, vec![imm3_2.clone()])]);
2255        uploader.add_imm(instance_id2, imm3_2.clone());
2256        let (await_start3_2, finish_tx3_2) =
2257            new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload2));
2258        uploader.may_flush();
2259        await_start3_2.await;
2260        let imm3_3 = gen_imm_with_limiter(epoch3, memory_limiter).await;
2261        uploader.add_imm(instance_id1, imm3_3.clone());
2262
2263        // current uploader state:
2264        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
2265        // sealed: uploaded sst([imm2])
2266        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])
2267
2268        uploader.local_seal_epoch_for_test(instance_id1, epoch3);
2269        let imm4 = gen_imm_with_limiter(epoch4, memory_limiter).await;
2270        uploader.add_imm(instance_id1, imm4.clone());
2271        assert_uploader_pending(&mut uploader).await;
2272
2273        // current uploader state:
2274        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
2275        //           epoch4: imm: imm4
2276        // sealed: uploaded sst([imm2])
2277        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])
2278
2279        finish_tx3_1.send(()).unwrap();
2280        assert_uploader_pending(&mut uploader).await;
2281        finish_tx1_4.send(()).unwrap();
2282        assert_uploader_pending(&mut uploader).await;
2283        finish_tx1_3.send(()).unwrap();
2284
2285        let sst = uploader.next_uploaded_sst().await;
2286        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload3), sst.imm_ids());
2287
2288        assert!(poll_fn(|cx| Poll::Ready(sync_rx1.poll_unpin(cx).is_pending())).await);
2289
2290        let sst = uploader.next_uploaded_sst().await;
2291        assert_eq!(&get_payload_imm_ids(&epoch1_sync_payload), sst.imm_ids());
2292
2293        match sync_rx1.await {
2294            Ok(Ok(data)) => {
2295                assert_eq!(3, data.uploaded_ssts.len());
2296                assert_eq!(
2297                    &get_payload_imm_ids(&epoch1_sync_payload),
2298                    data.uploaded_ssts[0].imm_ids()
2299                );
2300                assert_eq!(
2301                    &get_payload_imm_ids(&epoch1_spill_payload3),
2302                    data.uploaded_ssts[1].imm_ids()
2303                );
2304                assert_eq!(
2305                    &get_payload_imm_ids(&epoch1_spill_payload12),
2306                    data.uploaded_ssts[2].imm_ids()
2307                );
2308            }
2309            _ => {
2310                unreachable!()
2311            }
2312        }
2313
2314        // current uploader state:
2315        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
2316        //           epoch4: imm: imm4
2317        // sealed: uploaded sst([imm2])
2318        // syncing: empty
2319        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2320
2321        let (sync_tx2, sync_rx2) = oneshot::channel();
2322        uploader.start_single_epoch_sync(epoch2, sync_tx2, HashSet::from_iter([TEST_TABLE_ID]));
2323        uploader.local_seal_epoch_for_test(instance_id2, epoch3);
2324        let sst = uploader.next_uploaded_sst().await;
2325        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids());
2326
2327        match sync_rx2.await {
2328            Ok(Ok(data)) => {
2329                assert_eq!(data.uploaded_ssts.len(), 1);
2330                assert_eq!(
2331                    &get_payload_imm_ids(&epoch2_spill_payload),
2332                    data.uploaded_ssts[0].imm_ids()
2333                );
2334            }
2335            _ => {
2336                unreachable!("should be sync finish");
2337            }
2338        }
2339        assert_eq!(epoch2, uploader.test_max_synced_epoch());
2340
2341        // current uploader state:
2342        // unsealed: epoch4: imm: imm4
2343        // sealed: imm: imm3_3, uploading: [imm3_2], uploaded: sst([imm3_1])
2344        // syncing: empty
2345        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2346        //         epoch2: sst([imm2])
2347
2348        uploader.local_seal_epoch_for_test(instance_id1, epoch4);
2349        uploader.local_seal_epoch_for_test(instance_id2, epoch4);
2350        let epoch4_sync_payload = HashMap::from_iter([(instance_id1, vec![imm4, imm3_3])]);
2351        let (await_start4_with_3_3, finish_tx4_with_3_3) =
2352            new_task_notifier(get_payload_imm_ids(&epoch4_sync_payload));
2353        let (sync_tx4, mut sync_rx4) = oneshot::channel();
2354        uploader.start_single_epoch_sync(epoch4, sync_tx4, HashSet::from_iter([TEST_TABLE_ID]));
2355        await_start4_with_3_3.await;
2356
2357        // current uploader state:
2358        // unsealed: empty
2359        // sealed: empty
2360        // syncing: epoch4: uploading: [imm4, imm3_3], [imm3_2], uploaded: sst([imm3_1])
2361        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2362        //         epoch2: sst([imm2])
2363
2364        assert_uploader_pending(&mut uploader).await;
2365
2366        finish_tx3_2.send(()).unwrap();
2367        let sst = uploader.next_uploaded_sst().await;
2368        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload2), sst.imm_ids());
2369
2370        finish_tx4_with_3_3.send(()).unwrap();
2371        assert!(poll_fn(|cx| Poll::Ready(sync_rx4.poll_unpin(cx).is_pending())).await);
2372
2373        let sst = uploader.next_uploaded_sst().await;
2374        assert_eq!(&get_payload_imm_ids(&epoch4_sync_payload), sst.imm_ids());
2375
2376        match sync_rx4.await {
2377            Ok(Ok(data)) => {
2378                assert_eq!(3, data.uploaded_ssts.len());
2379                assert_eq!(
2380                    &get_payload_imm_ids(&epoch4_sync_payload),
2381                    data.uploaded_ssts[0].imm_ids()
2382                );
2383                assert_eq!(
2384                    &get_payload_imm_ids(&epoch3_spill_payload2),
2385                    data.uploaded_ssts[1].imm_ids()
2386                );
2387                assert_eq!(
2388                    &get_payload_imm_ids(&epoch3_spill_payload1),
2389                    data.uploaded_ssts[2].imm_ids(),
2390                )
2391            }
2392            _ => {
2393                unreachable!("should be sync finish");
2394            }
2395        }
2396        assert_eq!(epoch4, uploader.test_max_synced_epoch());
2397
2398        // current uploader state:
2399        // unsealed: empty
2400        // sealed: empty
2401        // syncing: empty
2402        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2403        //         epoch2: sst([imm2])
2404        //         epoch4: sst([imm4, imm3_3]), sst([imm3_2]), sst([imm3_1])
2405    }
2406
2407    #[tokio::test]
2408    async fn test_uploader_frequently_flush() {
2409        let config = StorageOpts {
2410            shared_buffer_capacity_mb: 10,
2411            shared_buffer_flush_ratio: 0.12,
2412            // This test will fail when we set it to 0
2413            shared_buffer_min_batch_flush_size_mb: 1,
2414            ..Default::default()
2415        };
2416        let (buffer_tracker, mut uploader, _new_task_notifier) =
2417            prepare_uploader_order_test(&config, true);
2418
2419        let epoch1 = INITIAL_EPOCH.next_epoch();
2420        let epoch2 = epoch1.next_epoch();
2421        uploader.start_epochs_for_test([epoch1, epoch2]);
2422        let instance_id1 = 1;
2423        let instance_id2 = 2;
2424        let flush_threshold = buffer_tracker.flush_threshold();
2425        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2426
2427        uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2428        uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2429
2430        // imm2 contains data in newer epoch, but added first
2431        let mut total_memory = 0;
2432        while total_memory < flush_threshold {
2433            let imm = gen_imm_with_limiter(epoch2, Some(memory_limiter.as_ref())).await;
2434            total_memory += imm.size();
2435            if total_memory > flush_threshold {
2436                break;
2437            }
2438            uploader.add_imm(instance_id2, imm);
2439        }
2440        let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await;
2441        uploader.add_imm(instance_id1, imm);
2442        assert!(uploader.may_flush());
2443
2444        for _ in 0..10 {
2445            let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await;
2446            uploader.add_imm(instance_id1, imm);
2447            assert!(!uploader.may_flush());
2448        }
2449    }
2450}