risingwave_storage/hummock/event_handler/uploader/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod spiller;
16mod task_manager;
17pub(crate) mod test_utils;
18
19use std::cmp::Ordering;
20use std::collections::btree_map::Entry;
21use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque, hash_map};
22use std::fmt::{Debug, Display, Formatter};
23use std::future::{Future, poll_fn};
24use std::mem::{replace, swap, take};
25use std::sync::Arc;
26use std::task::{Context, Poll, ready};
27
28use futures::FutureExt;
29use itertools::Itertools;
30use more_asserts::assert_gt;
31use prometheus::{HistogramTimer, IntGauge};
32use risingwave_common::bitmap::BitmapBuilder;
33use risingwave_common::catalog::TableId;
34use risingwave_common::metrics::UintGauge;
35use risingwave_common::must_match;
36use risingwave_hummock_sdk::table_watermark::{
37    TableWatermarks, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
38};
39use risingwave_hummock_sdk::vector_index::VectorIndexAdd;
40use risingwave_hummock_sdk::{HummockEpoch, HummockRawObjectId, LocalSstableInfo};
41use task_manager::{TaskManager, UploadingTaskStatus};
42use thiserror_ext::AsReport;
43use tokio::sync::oneshot;
44use tokio::task::JoinHandle;
45use tracing::{debug, error, info, warn};
46
47use crate::hummock::event_handler::LocalInstanceId;
48use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, send_sync_result};
49use crate::hummock::event_handler::uploader::spiller::Spiller;
50use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm;
51use crate::hummock::local_version::pinned_version::PinnedVersion;
52use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchId;
53use crate::hummock::store::version::StagingSstableInfo;
54use crate::hummock::{HummockError, HummockResult, ImmutableMemtable};
55use crate::mem_table::ImmId;
56use crate::monitor::HummockStateStoreMetrics;
57use crate::store::SealCurrentEpochOptions;
58
59/// Take epoch data inclusively before `epoch` out from `data`
60fn take_before_epoch<T>(
61    data: &mut BTreeMap<HummockEpoch, T>,
62    epoch: HummockEpoch,
63) -> BTreeMap<HummockEpoch, T> {
64    let mut before_epoch_data = data.split_off(&(epoch + 1));
65    swap(&mut before_epoch_data, data);
66    before_epoch_data
67}
68
// Per-instance imms handed to the uploader, wrapped in `UploaderImm` so their
// size is tracked by the uploader metrics while in flight.
type UploadTaskInput = HashMap<LocalInstanceId, Vec<UploaderImm>>;
// Per-instance raw immutable memtables passed to the spawned upload task.
pub type UploadTaskPayload = HashMap<LocalInstanceId, Vec<ImmutableMemtable>>;
71
/// Result of a finished upload task.
#[derive(Debug)]
pub struct UploadTaskOutput {
    /// SSTs built from the new-value part of the payload.
    pub new_value_ssts: Vec<LocalSstableInfo>,
    /// SSTs built from the old-value part of the payload.
    pub old_value_ssts: Vec<LocalSstableInfo>,
    /// Timer observing how long the output waits before being polled, if any.
    pub wait_poll_timer: Option<HistogramTimer>,
}
/// Callback that spawns the actual upload work for a payload and returns the
/// join handle of the spawned task.
pub type SpawnUploadTask = Arc<
    dyn Fn(UploadTaskPayload, UploadTaskInfo) -> JoinHandle<HummockResult<UploadTaskOutput>>
        + Send
        + Sync
        + 'static,
>;
84
/// Metadata describing an upload task; cloned into the spawned task and kept
/// for logging and retry.
#[derive(Clone)]
pub struct UploadTaskInfo {
    /// Total byte size of all imms in the task.
    pub task_size: usize,
    /// Epochs covered by the task, newer epochs first.
    pub epochs: Vec<HummockEpoch>,
    /// Ids of the imms in the task, keyed by local instance.
    pub imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
}
91
92impl Display for UploadTaskInfo {
93    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
94        f.debug_struct("UploadTaskInfo")
95            .field("task_size", &self.task_size)
96            .field("epochs", &self.epochs)
97            .field("len(imm_ids)", &self.imm_ids.len())
98            .finish()
99    }
100}
101
102impl Debug for UploadTaskInfo {
103    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
104        f.debug_struct("UploadTaskInfo")
105            .field("task_size", &self.task_size)
106            .field("epochs", &self.epochs)
107            .field("imm_ids", &self.imm_ids)
108            .finish()
109    }
110}
111
/// Private module so that `UploaderImm` can only be constructed through the
/// metric-tracking constructors below.
mod uploader_imm {
    use std::fmt::Formatter;
    use std::ops::Deref;

    use risingwave_common::metrics::UintGauge;

    use crate::hummock::event_handler::uploader::UploaderContext;
    use crate::mem_table::ImmutableMemtable;

    /// An `ImmutableMemtable` whose size is accounted in the
    /// `uploader_imm_size` gauge for as long as the wrapper is alive.
    pub(super) struct UploaderImm {
        inner: ImmutableMemtable,
        // Gauge incremented by `imm.size()` in `new` and decremented in `Drop`.
        size_guard: UintGauge,
    }

    impl UploaderImm {
        /// Wraps `imm` and adds its size to the uploader imm-size gauge.
        pub(super) fn new(imm: ImmutableMemtable, context: &UploaderContext) -> Self {
            let size = imm.size();
            let size_guard = context.stats.uploader_imm_size.clone();
            size_guard.add(size as _);
            Self {
                inner: imm,
                size_guard,
            }
        }

        /// Test-only constructor that uses a throwaway gauge instead of the
        /// shared uploader metrics.
        #[cfg(test)]
        pub(super) fn for_test(imm: ImmutableMemtable) -> Self {
            Self {
                inner: imm,
                size_guard: UintGauge::new("test", "test").unwrap(),
            }
        }
    }

    impl std::fmt::Debug for UploaderImm {
        // Delegate to the inner imm; the gauge carries no useful debug info.
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            self.inner.fmt(f)
        }
    }

    impl Deref for UploaderImm {
        type Target = ImmutableMemtable;

        fn deref(&self) -> &Self::Target {
            &self.inner
        }
    }

    impl Drop for UploaderImm {
        // Release the size accounted in `new` when the wrapper goes away.
        fn drop(&mut self) {
            self.size_guard.sub(self.inner.size() as _);
        }
    }
}
166
/// Identifier of an uploading task.
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct UploadingTaskId(usize);
169
/// A wrapper for a uploading task that compacts and uploads the imm payload. Task context are
/// stored so that when the task fails, it can be re-tried.
struct UploadingTask {
    task_id: UploadingTaskId,
    // newer data at the front
    input: UploadTaskInput,
    // Handle of the currently running attempt; replaced on retry.
    join_handle: JoinHandle<HummockResult<UploadTaskOutput>>,
    task_info: UploadTaskInfo,
    // Kept so a failed attempt can be re-spawned with the same payload.
    spawn_upload_task: SpawnUploadTask,
    // Gauges released in `Drop` (global upload size / running task count).
    task_size_guard: UintGauge,
    task_count_guard: IntGauge,
}
182
183impl Drop for UploadingTask {
184    fn drop(&mut self) {
185        self.task_size_guard.sub(self.task_info.task_size as u64);
186        self.task_count_guard.dec();
187    }
188}
189
190impl Debug for UploadingTask {
191    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
192        f.debug_struct("UploadingTask")
193            .field("input", &self.input)
194            .field("task_info", &self.task_info)
195            .finish()
196    }
197}
198
199fn get_payload_imm_ids(
200    payload: &UploadTaskPayload,
201) -> HashMap<LocalInstanceId, Vec<SharedBufferBatchId>> {
202    payload
203        .iter()
204        .map(|(instance_id, imms)| {
205            (
206                *instance_id,
207                imms.iter().map(|imm| imm.batch_id()).collect_vec(),
208            )
209        })
210        .collect()
211}
212
impl UploadingTask {
    // INFO logs will be enabled for task with size exceeding 50MB.
    const LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE: usize = 50 * (1 << 20);

    /// Clones the raw imms out of the metric-tracked input so they can be
    /// handed to the spawned upload task.
    fn input_to_payload(input: &UploadTaskInput) -> UploadTaskPayload {
        input
            .iter()
            .map(|(instance_id, imms)| {
                (
                    *instance_id,
                    imms.iter().map(|imm| (**imm).clone()).collect(),
                )
            })
            .collect()
    }

    /// Builds the task info (deduped epochs newest-first, imm ids, total size)
    /// from `input`, registers the size/count in the buffer tracker and stats,
    /// and immediately spawns the upload via `context.spawn_upload_task`.
    ///
    /// Panics if `input` is empty.
    fn new(task_id: UploadingTaskId, input: UploadTaskInput, context: &UploaderContext) -> Self {
        assert!(!input.is_empty());
        let mut epochs = input
            .iter()
            .flat_map(|(_, imms)| imms.iter().flat_map(|imm| imm.epochs().iter().cloned()))
            .sorted()
            .dedup()
            .collect_vec();

        // reverse to make newer epochs comes first
        epochs.reverse();
        let payload = Self::input_to_payload(&input);
        let imm_ids = get_payload_imm_ids(&payload);
        let task_size = input
            .values()
            .map(|imms| imms.iter().map(|imm| imm.size()).sum::<usize>())
            .sum();
        let task_info = UploadTaskInfo {
            task_size,
            epochs,
            imm_ids,
        };
        context
            .buffer_tracker
            .global_upload_task_size()
            .add(task_size as u64);
        // Large tasks are logged at INFO so they are visible in production logs.
        if task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
            info!("start upload task: {:?}", task_info);
        } else {
            debug!("start upload task: {:?}", task_info);
        }
        let join_handle = (context.spawn_upload_task)(payload, task_info.clone());
        context.stats.uploader_uploading_task_count.inc();
        Self {
            task_id,
            input,
            join_handle,
            task_info,
            spawn_upload_task: context.spawn_upload_task.clone(),
            task_size_guard: context.buffer_tracker.global_upload_task_size().clone(),
            task_count_guard: context.stats.uploader_uploading_task_count.clone(),
        }
    }

    /// Poll the result of the uploading task
    fn poll_result(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<HummockResult<Arc<StagingSstableInfo>>> {
        Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) {
            Ok(task_result) => task_result
                .inspect(|_| {
                    if self.task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
                        info!(task_info = ?self.task_info, "upload task finish");
                    } else {
                        debug!(task_info = ?self.task_info, "upload task finish");
                    }
                })
                .inspect_err(|e| error!(task_info = ?self.task_info, err = ?e.as_report(), "upload task failed"))
                .map(|output| {
                    Arc::new(StagingSstableInfo::new(
                        output.new_value_ssts,
                        output.old_value_ssts,
                        self.task_info.epochs.clone(),
                        self.task_info.imm_ids.clone(),
                        self.task_info.task_size,
                    ))
                }),

            // The spawned task itself panicked or was cancelled.
            Err(err) => Err(HummockError::other(format!(
                "fail to join upload join handle: {}",
                err.as_report()
            ))),
        })
    }

    /// Poll the uploading task until it succeeds. If it fails, we will retry it.
    fn poll_ok_with_retry(&mut self, cx: &mut Context<'_>) -> Poll<Arc<StagingSstableInfo>> {
        loop {
            let result = ready!(self.poll_result(cx));
            match result {
                Ok(sstables) => return Poll::Ready(sstables),
                Err(e) => {
                    error!(
                        error = %e.as_report(),
                        task_info = ?self.task_info,
                        "a flush task failed, start retry",
                    );
                    // Re-spawn with the retained input; the payload is rebuilt
                    // because the previous one was consumed by the failed task.
                    self.join_handle = (self.spawn_upload_task)(
                        Self::input_to_payload(&self.input),
                        self.task_info.clone(),
                    );
                    // It is important not to return Poll::pending here immediately, because the new
                    // join_handle is not polled yet, and will not awake the current task when
                    // succeed. It will be polled in the next loop iteration.
                }
            }
        }
    }

    /// Returns the metadata of this task.
    pub fn get_task_info(&self) -> &UploadTaskInfo {
        &self.task_info
    }
}
333
impl TableUnsyncData {
    /// Records the watermarks written at `epoch` for this table.
    ///
    /// Panics if the direction or serde type differs from previously recorded
    /// watermarks, if the vnode counts of the given watermarks disagree, or if
    /// a vnode receives more than one watermark within the same epoch.
    fn add_table_watermarks(
        &mut self,
        epoch: HummockEpoch,
        table_watermarks: Vec<VnodeWatermark>,
        direction: WatermarkDirection,
        watermark_type: WatermarkSerdeType,
    ) {
        if table_watermarks.is_empty() {
            return;
        }
        // All watermarks in one call must agree on the vnode count.
        let vnode_count = table_watermarks[0].vnode_count();
        for watermark in &table_watermarks {
            assert_eq!(vnode_count, watermark.vnode_count());
        }

        // Marks the vnodes covered by `vnode_watermarks` in `vnode_bitmap`,
        // asserting that no vnode is written twice for the same epoch.
        fn apply_new_vnodes(
            vnode_bitmap: &mut BitmapBuilder,
            vnode_watermarks: &Vec<VnodeWatermark>,
        ) {
            for vnode_watermark in vnode_watermarks {
                for vnode in vnode_watermark.vnode_bitmap().iter_ones() {
                    assert!(
                        !vnode_bitmap.is_set(vnode),
                        "vnode {} write multiple table watermarks",
                        vnode
                    );
                    vnode_bitmap.set(vnode, true);
                }
            }
        }
        match &mut self.table_watermarks {
            Some((prev_direction, prev_watermarks, prev_watermark_type)) => {
                assert_eq!(
                    *prev_direction, direction,
                    "table id {} new watermark direction not match with previous",
                    self.table_id
                );
                assert_eq!(
                    *prev_watermark_type, watermark_type,
                    "table id {} new watermark watermark_type not match with previous",
                    self.table_id
                );
                match prev_watermarks.entry(epoch) {
                    // Same epoch seen before: merge into the existing entry.
                    Entry::Occupied(mut entry) => {
                        let (prev_watermarks, vnode_bitmap) = entry.get_mut();
                        apply_new_vnodes(vnode_bitmap, &table_watermarks);
                        prev_watermarks.extend(table_watermarks);
                    }
                    // First watermarks for this epoch.
                    Entry::Vacant(entry) => {
                        let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                        apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                        entry.insert((table_watermarks, vnode_bitmap));
                    }
                }
            }
            // First watermarks ever for this table.
            None => {
                let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                self.table_watermarks = Some((
                    direction,
                    BTreeMap::from_iter([(epoch, (table_watermarks, vnode_bitmap))]),
                    watermark_type,
                ));
            }
        }
    }
}
402
impl UploaderData {
    /// Folds per-epoch watermarks of one table into a single `TableWatermarks`
    /// and inserts it into `all_table_watermarks`.
    ///
    /// `watermarks` is consumed in iteration order; the first item seeds the
    /// accumulator and later epochs are appended. Panics if the table already
    /// has an entry in `all_table_watermarks`, or if the direction/serde type
    /// changes between epochs.
    fn add_table_watermarks(
        all_table_watermarks: &mut HashMap<TableId, TableWatermarks>,
        table_id: TableId,
        direction: WatermarkDirection,
        watermarks: impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)>,
        watermark_type: WatermarkSerdeType,
    ) {
        let mut table_watermarks: Option<TableWatermarks> = None;
        for (epoch, watermarks) in watermarks {
            match &mut table_watermarks {
                Some(prev_watermarks) => {
                    assert_eq!(prev_watermarks.direction, direction);
                    assert_eq!(prev_watermarks.watermark_type, watermark_type);
                    prev_watermarks.add_new_epoch_watermarks(
                        epoch,
                        Arc::from(watermarks),
                        direction,
                        watermark_type,
                    );
                }
                None => {
                    table_watermarks = Some(TableWatermarks::single_epoch(
                        epoch,
                        watermarks,
                        direction,
                        watermark_type,
                    ));
                }
            }
        }
        // Empty iterator => nothing to record for this table.
        if let Some(table_watermarks) = table_watermarks {
            assert!(
                all_table_watermarks
                    .insert(table_id, table_watermarks)
                    .is_none()
            );
        }
    }
}
443
/// Unsynced imms of a single local instance within one epoch.
struct LocalInstanceEpochData {
    epoch: HummockEpoch,
    // newer data comes first.
    imms: VecDeque<UploaderImm>,
    // Whether some imm of this epoch has already been taken by a spill.
    has_spilled: bool,
}
450
451impl LocalInstanceEpochData {
452    fn new(epoch: HummockEpoch) -> Self {
453        Self {
454            epoch,
455            imms: VecDeque::new(),
456            has_spilled: false,
457        }
458    }
459
460    fn epoch(&self) -> HummockEpoch {
461        self.epoch
462    }
463
464    fn add_imm(&mut self, imm: UploaderImm) {
465        assert_eq!(imm.max_epoch(), imm.min_epoch());
466        assert_eq!(self.epoch, imm.min_epoch());
467        if let Some(prev_imm) = self.imms.front() {
468            assert_gt!(imm.batch_id(), prev_imm.batch_id());
469        }
470        self.imms.push_front(imm);
471    }
472
473    fn is_empty(&self) -> bool {
474        self.imms.is_empty()
475    }
476}
477
/// All unsynced state of one local instance of a table.
struct LocalInstanceUnsyncData {
    table_id: TableId,
    instance_id: LocalInstanceId,
    // None means that the current instance should have stopped advancing
    current_epoch_data: Option<LocalInstanceEpochData>,
    // newer data comes first.
    sealed_data: VecDeque<LocalInstanceEpochData>,
    // newer data comes first
    flushing_imms: VecDeque<SharedBufferBatchId>,
    // Set when the instance is destroyed; no further imms may be added.
    is_destroyed: bool,
}
489
impl LocalInstanceUnsyncData {
    /// Creates the unsync state of one instance, starting at `init_epoch`.
    fn new(table_id: TableId, instance_id: LocalInstanceId, init_epoch: HummockEpoch) -> Self {
        Self {
            table_id,
            instance_id,
            current_epoch_data: Some(LocalInstanceEpochData::new(init_epoch)),
            sealed_data: VecDeque::new(),
            flushing_imms: Default::default(),
            is_destroyed: false,
        }
    }

    /// Adds an imm of the current epoch. Panics if the instance is destroyed,
    /// the imm belongs to another table, or the current epoch is archived.
    fn add_imm(&mut self, imm: UploaderImm) {
        assert!(!self.is_destroyed);
        assert_eq!(self.table_id, imm.table_id);
        self.current_epoch_data
            .as_mut()
            .expect("should be Some when adding new imm")
            .add_imm(imm);
    }

    /// Seals the current epoch and advances to `next_epoch` (which must be
    /// strictly greater). Non-empty sealed data is queued for sync/spill.
    /// Returns the epoch that was just sealed.
    fn local_seal_epoch(&mut self, next_epoch: HummockEpoch) -> HummockEpoch {
        assert!(!self.is_destroyed);
        let data = self
            .current_epoch_data
            .as_mut()
            .expect("should be Some when seal new epoch");
        let current_epoch = data.epoch;
        debug!(
            instance_id = self.instance_id,
            next_epoch, current_epoch, "local seal epoch"
        );
        assert_gt!(next_epoch, current_epoch);
        let epoch_data = replace(data, LocalInstanceEpochData::new(next_epoch));
        // Empty epochs leave no trace in `sealed_data`.
        if !epoch_data.is_empty() {
            self.sealed_data.push_front(epoch_data);
        }
        current_epoch
    }

    // imm_ids from old to new, which means in ascending order
    fn ack_flushed(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        // Oldest flushing imms are at the back; they must be acked in order.
        for imm_id in imm_ids {
            assert_eq!(self.flushing_imms.pop_back().expect("should exist"), imm_id);
        }
    }

    /// Takes the imms of exactly `epoch` out for spilling and marks them as
    /// flushing. Spilling must proceed from the oldest unspilled epoch, which
    /// the branches below assert.
    fn spill(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        let imms = if let Some(oldest_sealed_epoch) = self.sealed_data.back() {
            match oldest_sealed_epoch.epoch.cmp(&epoch) {
                Ordering::Less => {
                    // An older sealed epoch still holds data; spilling `epoch`
                    // would skip it.
                    unreachable!(
                        "should not spill at this epoch because there \
                    is unspilled data in previous epoch: prev epoch {}, spill epoch {}",
                        oldest_sealed_epoch.epoch, epoch
                    );
                }
                Ordering::Equal => {
                    let epoch_data = self.sealed_data.pop_back().unwrap();
                    assert_eq!(epoch, epoch_data.epoch);
                    epoch_data.imms
                }
                Ordering::Greater => VecDeque::new(),
            }
        } else {
            // No sealed data: the spill can only hit the current epoch.
            let Some(current_epoch_data) = &mut self.current_epoch_data else {
                return Vec::new();
            };
            match current_epoch_data.epoch.cmp(&epoch) {
                Ordering::Less => {
                    assert!(
                        current_epoch_data.imms.is_empty(),
                        "should not spill at this epoch because there \
                    is unspilled data in current epoch epoch {}, spill epoch {}",
                        current_epoch_data.epoch,
                        epoch
                    );
                    VecDeque::new()
                }
                Ordering::Equal => {
                    if !current_epoch_data.imms.is_empty() {
                        current_epoch_data.has_spilled = true;
                        take(&mut current_epoch_data.imms)
                    } else {
                        VecDeque::new()
                    }
                }
                Ordering::Greater => VecDeque::new(),
            }
        };
        // Record the taken imms as flushing, oldest first.
        self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
        imms.into_iter().collect()
    }

    /// Registers imm ids (given old to new) as flushing; ids must be strictly
    /// increasing relative to those already registered.
    fn add_flushing_imm(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            if let Some(prev_imm_id) = self.flushing_imms.front() {
                assert_gt!(imm_id, *prev_imm_id);
            }
            self.flushing_imms.push_front(imm_id);
        }
    }

    // start syncing the imm inclusively before the `epoch`
    // returning data with newer data coming first
    fn sync(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        // firstly added from old to new
        let mut ret = Vec::new();
        while let Some(epoch_data) = self.sealed_data.back()
            && epoch_data.epoch() <= epoch
        {
            let imms = self.sealed_data.pop_back().expect("checked exist").imms;
            self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
            ret.extend(imms.into_iter().rev());
        }
        // reverse so that newer data comes first
        ret.reverse();
        // A current epoch at or below the sync epoch is unexpected: the
        // instance should have been archived. Panic in debug builds, otherwise
        // warn and archive it here.
        if let Some(latest_epoch_data) = &self.current_epoch_data
            && latest_epoch_data.epoch <= epoch
        {
            assert!(self.sealed_data.is_empty());
            assert!(latest_epoch_data.is_empty());
            assert!(!latest_epoch_data.has_spilled);
            if cfg!(debug_assertions) {
                panic!(
                    "sync epoch exceeds latest epoch, and the current instance should have been archived, table_id = {}, latest_epoch_data = {}, epoch = {}",
                    self.table_id.table_id,
                    latest_epoch_data.epoch(),
                    epoch
                );
            }
            warn!(
                instance_id = self.instance_id,
                table_id = self.table_id.table_id,
                "sync epoch exceeds latest epoch, and the current instance should have be archived"
            );
            self.current_epoch_data = None;
        }
        ret
    }

    /// Asserts that no unsynced data remains at or before `epoch`.
    fn assert_after_epoch(&self, epoch: HummockEpoch) {
        if let Some(oldest_sealed_data) = self.sealed_data.back() {
            assert!(!oldest_sealed_data.imms.is_empty());
            assert_gt!(oldest_sealed_data.epoch, epoch);
        } else if let Some(current_data) = &self.current_epoch_data
            && current_data.epoch <= epoch
        {
            assert!(current_data.imms.is_empty() && !current_data.has_spilled);
        }
    }

    /// Whether the instance is destroyed and all its sealed data is drained.
    fn is_finished(&self) -> bool {
        self.is_destroyed && self.sealed_data.is_empty()
    }
}
646
/// All unsynced state of one table, aggregated over its local instances.
struct TableUnsyncData {
    table_id: TableId,
    // Per-instance unsynced imms.
    instance_data: HashMap<LocalInstanceId, LocalInstanceUnsyncData>,
    #[expect(clippy::type_complexity)]
    // Watermark direction/serde type plus per-epoch watermarks and the bitmap
    // of vnodes they cover.
    table_watermarks: Option<(
        WatermarkDirection,
        BTreeMap<HummockEpoch, (Vec<VnodeWatermark>, BitmapBuilder)>,
        WatermarkSerdeType,
    )>,
    // Spill tasks issued per epoch.
    spill_tasks: BTreeMap<HummockEpoch, VecDeque<UploadingTaskId>>,
    // Epochs started but not yet synced (used as an ordered set).
    unsync_epochs: BTreeMap<HummockEpoch, ()>,
    // Initialized to be `None`. Transform to `Some(_)` when called
    // `local_seal_epoch` with a non-existing epoch, to mark that
    // the fragment of the table has stopped.
    stopped_next_epoch: Option<HummockEpoch>,
    // newer epoch at the front
    syncing_epochs: VecDeque<HummockEpoch>,
    max_synced_epoch: Option<HummockEpoch>,
}
666
667impl TableUnsyncData {
668    fn new(table_id: TableId, committed_epoch: Option<HummockEpoch>) -> Self {
669        Self {
670            table_id,
671            instance_data: Default::default(),
672            table_watermarks: None,
673            spill_tasks: Default::default(),
674            unsync_epochs: Default::default(),
675            stopped_next_epoch: None,
676            syncing_epochs: Default::default(),
677            max_synced_epoch: committed_epoch,
678        }
679    }
680
681    fn new_epoch(&mut self, epoch: HummockEpoch) {
682        debug!(table_id = ?self.table_id, epoch, "table new epoch");
683        if let Some(latest_epoch) = self.max_epoch() {
684            assert_gt!(epoch, latest_epoch);
685        }
686        self.unsync_epochs.insert(epoch, ());
687    }
688
689    #[expect(clippy::type_complexity)]
690    fn sync(
691        &mut self,
692        epoch: HummockEpoch,
693    ) -> (
694        impl Iterator<Item = (LocalInstanceId, Vec<UploaderImm>)> + '_,
695        Option<(
696            WatermarkDirection,
697            impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)> + use<>,
698            WatermarkSerdeType,
699        )>,
700        impl Iterator<Item = UploadingTaskId> + use<>,
701        BTreeMap<HummockEpoch, ()>,
702    ) {
703        if let Some(prev_epoch) = self.max_sync_epoch() {
704            assert_gt!(epoch, prev_epoch)
705        }
706        let epochs = take_before_epoch(&mut self.unsync_epochs, epoch);
707        assert_eq!(
708            *epochs.last_key_value().expect("non-empty").0,
709            epoch,
710            "{epochs:?} {epoch} {:?}",
711            self.table_id
712        );
713        self.syncing_epochs.push_front(epoch);
714        (
715            self.instance_data
716                .iter_mut()
717                .map(move |(instance_id, data)| (*instance_id, data.sync(epoch))),
718            self.table_watermarks
719                .as_mut()
720                .map(|(direction, watermarks, watermark_type)| {
721                    let watermarks = take_before_epoch(watermarks, epoch)
722                        .into_iter()
723                        .map(|(epoch, (watermarks, _))| (epoch, watermarks));
724                    (*direction, watermarks, *watermark_type)
725                }),
726            take_before_epoch(&mut self.spill_tasks, epoch)
727                .into_values()
728                .flat_map(|tasks| tasks.into_iter()),
729            epochs,
730        )
731    }
732
733    fn ack_synced(&mut self, sync_epoch: HummockEpoch) {
734        let min_sync_epoch = self.syncing_epochs.pop_back().expect("should exist");
735        assert_eq!(sync_epoch, min_sync_epoch);
736        self.max_synced_epoch = Some(sync_epoch);
737    }
738
739    fn ack_committed(&mut self, committed_epoch: HummockEpoch) {
740        let synced_epoch_advanced = {
741            if let Some(max_synced_epoch) = self.max_synced_epoch
742                && max_synced_epoch >= committed_epoch
743            {
744                false
745            } else {
746                true
747            }
748        };
749        if synced_epoch_advanced {
750            self.max_synced_epoch = Some(committed_epoch);
751            if let Some(min_syncing_epoch) = self.syncing_epochs.back() {
752                assert_gt!(*min_syncing_epoch, committed_epoch);
753            }
754            self.assert_after_epoch(committed_epoch);
755        }
756    }
757
758    fn assert_after_epoch(&self, epoch: HummockEpoch) {
759        self.instance_data
760            .values()
761            .for_each(|instance_data| instance_data.assert_after_epoch(epoch));
762        if let Some((_, watermarks, _)) = &self.table_watermarks
763            && let Some((oldest_epoch, _)) = watermarks.first_key_value()
764        {
765            assert_gt!(*oldest_epoch, epoch);
766        }
767    }
768
769    fn max_sync_epoch(&self) -> Option<HummockEpoch> {
770        self.syncing_epochs
771            .front()
772            .cloned()
773            .or(self.max_synced_epoch)
774    }
775
776    fn max_epoch(&self) -> Option<HummockEpoch> {
777        self.unsync_epochs
778            .last_key_value()
779            .map(|(epoch, _)| *epoch)
780            .or_else(|| self.max_sync_epoch())
781    }
782
783    fn is_empty(&self) -> bool {
784        self.instance_data.is_empty()
785            && self.syncing_epochs.is_empty()
786            && self.unsync_epochs.is_empty()
787    }
788}
789
/// Identifies one epoch of a sync group by the epoch and a representative
/// table id (see `get_unsync_epoch_id`).
#[derive(Eq, Hash, PartialEq, Copy, Clone)]
struct UnsyncEpochId(HummockEpoch, TableId);
792
793impl UnsyncEpochId {
794    fn epoch(&self) -> HummockEpoch {
795        self.0
796    }
797}
798
799fn get_unsync_epoch_id(epoch: HummockEpoch, table_ids: &HashSet<TableId>) -> Option<UnsyncEpochId> {
800    table_ids
801        .iter()
802        .min()
803        .map(|table_id| UnsyncEpochId(epoch, *table_id))
804}
805
#[derive(Default)]
/// Unsync data, can be either imm or spilled sst, and some aggregated epoch information.
///
/// `instance_data` holds the imm of each individual local instance, and data are first added here.
/// The aggregated epoch information (table watermarks, etc.) and the spilled sst will be added to `epoch_data`.
struct UnsyncData {
    table_data: HashMap<TableId, TableUnsyncData>,
    // An index as a mapping from instance id to its table id
    instance_table_id: HashMap<LocalInstanceId, TableId>,
    // Tables participating in each unsynced epoch.
    unsync_epochs: HashMap<UnsyncEpochId, HashSet<TableId>>,
    // Spilled ssts keyed by the task that produced them, together with the
    // tables the sst covers.
    spilled_data: HashMap<UploadingTaskId, (Arc<StagingSstableInfo>, HashSet<TableId>)>,
}
818
impl UnsyncData {
    /// Register a new local instance of `table_id` starting at `init_epoch`.
    ///
    /// Panics if the table is unknown (`start_epoch` has not been called for it),
    /// if the instance id is already registered, or if `init_epoch` has not been
    /// started on the table.
    fn init_instance(
        &mut self,
        table_id: TableId,
        instance_id: LocalInstanceId,
        init_epoch: HummockEpoch,
    ) {
        debug!(
            table_id = table_id.table_id,
            instance_id, init_epoch, "init epoch"
        );
        let table_data = self
            .table_data
            .get_mut(&table_id)
            .unwrap_or_else(|| panic!("should exist. {table_id:?}"));
        assert!(
            table_data
                .instance_data
                .insert(
                    instance_id,
                    LocalInstanceUnsyncData::new(table_id, instance_id, init_epoch)
                )
                .is_none()
        );
        assert!(
            self.instance_table_id
                .insert(instance_id, table_id)
                .is_none()
        );
        // `start_epoch` must have been called with `init_epoch` on this table beforehand.
        assert!(table_data.unsync_epochs.contains_key(&init_epoch));
    }

    /// Look up the unsync data of a local instance via the `instance_table_id` index.
    ///
    /// Returns `None` when the instance id is unknown (e.g. the instance has
    /// already been removed after finishing).
    fn instance_data(
        &mut self,
        instance_id: LocalInstanceId,
    ) -> Option<&mut LocalInstanceUnsyncData> {
        self.instance_table_id
            .get_mut(&instance_id)
            .cloned()
            .map(move |table_id| {
                self.table_data
                    .get_mut(&table_id)
                    .expect("should exist")
                    .instance_data
                    .get_mut(&instance_id)
                    .expect("should exist")
            })
    }

    /// Add an imm to a local instance. Panics if the instance is unknown.
    fn add_imm(&mut self, instance_id: LocalInstanceId, imm: UploaderImm) {
        self.instance_data(instance_id)
            .expect("should exist")
            .add_imm(imm);
    }

    /// Seal the current epoch of a local instance, advancing it to `next_epoch`,
    /// and record the table watermarks carried in `opts` (if any) on the sealed epoch.
    fn local_seal_epoch(
        &mut self,
        instance_id: LocalInstanceId,
        next_epoch: HummockEpoch,
        opts: SealCurrentEpochOptions,
    ) {
        let table_id = self.instance_table_id[&instance_id];
        let table_data = self.table_data.get_mut(&table_id).expect("should exist");
        let instance_data = table_data
            .instance_data
            .get_mut(&instance_id)
            .expect("should exist");
        let epoch = instance_data.local_seal_epoch(next_epoch);
        // When drop/cancel a streaming job, for the barrier to stop actor, the
        // local instance will call `local_seal_epoch`, but the `next_epoch` won't be
        // called `start_epoch` because we have stopped writing on it.
        if !table_data.unsync_epochs.contains_key(&next_epoch) {
            if let Some(stopped_next_epoch) = table_data.stopped_next_epoch {
                if stopped_next_epoch != next_epoch {
                    // All stopping instances of a table are expected to stop at the
                    // same epoch; a mismatch indicates an inconsistency, so panic
                    // in debug builds and only warn in release builds.
                    let table_id = table_data.table_id.table_id;
                    let unsync_epochs = table_data.unsync_epochs.keys().collect_vec();
                    if cfg!(debug_assertions) {
                        panic!(
                            "table_id {} stop epoch {} different to prev stop epoch {}. unsync epochs: {:?}, syncing epochs {:?}, max_synced_epoch {:?}",
                            table_id,
                            next_epoch,
                            stopped_next_epoch,
                            unsync_epochs,
                            table_data.syncing_epochs,
                            table_data.max_synced_epoch
                        );
                    } else {
                        warn!(
                            table_id,
                            stopped_next_epoch,
                            next_epoch,
                            ?unsync_epochs,
                            syncing_epochs = ?table_data.syncing_epochs,
                            max_synced_epoch = ?table_data.max_synced_epoch,
                            "different stop epoch"
                        );
                    }
                }
            } else {
                // First time this table stops: the stop epoch must be newer than
                // any epoch the table has seen so far.
                if let Some(max_epoch) = table_data.max_epoch() {
                    assert_gt!(next_epoch, max_epoch);
                }
                debug!(?table_id, epoch, next_epoch, "table data has stopped");
                table_data.stopped_next_epoch = Some(next_epoch);
            }
        }
        if let Some((direction, table_watermarks, watermark_type)) = opts.table_watermarks {
            table_data.add_table_watermarks(epoch, table_watermarks, direction, watermark_type);
        }
    }

    /// Mark a local instance as destroyed. The instance data itself is kept
    /// around until its remaining data is synced; unknown instance ids are ignored.
    fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
        if let Some(table_id) = self.instance_table_id.get(&instance_id) {
            debug!(instance_id, "destroy instance");
            let table_data = self.table_data.get_mut(table_id).expect("should exist");
            let instance_data = table_data
                .instance_data
                .get_mut(&instance_id)
                .expect("should exist");
            assert!(
                !instance_data.is_destroyed,
                "cannot destroy an instance for twice"
            );
            instance_data.is_destroyed = true;
        }
    }

    /// Remove all unsync state of the given (dropped) tables, aborting their
    /// pending spill tasks and discarding their spilled ssts.
    ///
    /// Asserts that the tables form a closed group: any spill task or unsync
    /// epoch touching one of `table_ids` must only touch tables within
    /// `table_ids`, and all their instances must already be destroyed.
    fn clear_tables(&mut self, table_ids: &HashSet<TableId>, task_manager: &mut TaskManager) {
        for table_id in table_ids {
            if let Some(table_unsync_data) = self.table_data.remove(table_id) {
                for task_id in table_unsync_data.spill_tasks.into_values().flatten() {
                    if let Some(task_status) = task_manager.abort_task(task_id) {
                        must_match!(task_status, UploadingTaskStatus::Spilling(spill_table_ids) => {
                            assert!(spill_table_ids.is_subset(table_ids));
                        });
                    }
                    if let Some((_, spill_table_ids)) = self.spilled_data.remove(&task_id) {
                        assert!(spill_table_ids.is_subset(table_ids));
                    }
                }
                assert!(
                    table_unsync_data
                        .instance_data
                        .values()
                        .all(|instance| instance.is_destroyed),
                    "should be clear when dropping the read version instance"
                );
                // Drop the instance-id -> table-id index entries of the removed table.
                for instance_id in table_unsync_data.instance_data.keys() {
                    assert_eq!(
                        *table_id,
                        self.instance_table_id
                            .remove(instance_id)
                            .expect("should exist")
                    );
                }
            }
        }
        // No remaining spilled sst may reference any of the cleared tables.
        debug_assert!(
            self.spilled_data
                .values()
                .all(|(_, spill_table_ids)| spill_table_ids.is_disjoint(table_ids))
        );
        self.unsync_epochs.retain(|_, unsync_epoch_table_ids| {
            if !unsync_epoch_table_ids.is_disjoint(table_ids) {
                assert!(unsync_epoch_table_ids.is_subset(table_ids));
                false
            } else {
                true
            }
        });
        assert!(
            self.instance_table_id
                .values()
                .all(|table_id| !table_ids.contains(table_id))
        );
    }
}
996
impl UploaderData {
    /// Start syncing the given `(epoch, table_ids)` pairs.
    ///
    /// For each table, this collects the unflushed imms (flushed as one extra
    /// upload task), the already-spilled ssts, the table watermarks, and the
    /// vector index adds up to the sync epoch, then registers a `SyncingData`
    /// entry that completes once all remaining upload tasks finish.
    fn sync(
        &mut self,
        context: &UploaderContext,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) {
        let mut all_table_watermarks = HashMap::new();
        let mut uploading_tasks = HashSet::new();
        let mut spilled_tasks = BTreeSet::new();
        let mut all_table_ids = HashSet::new();
        let mut vector_index_adds = HashMap::new();

        let mut flush_payload = HashMap::new();

        for (epoch, table_ids) in &sync_table_epochs {
            let epoch = *epoch;
            // A table may appear in at most one (epoch, table_ids) pair per sync.
            for table_id in table_ids {
                assert!(
                    all_table_ids.insert(*table_id),
                    "duplicate sync table epoch: {:?} {:?}",
                    all_table_ids,
                    sync_table_epochs
                );
            }
            if let Some(UnsyncEpochId(_, min_table_id)) = get_unsync_epoch_id(epoch, table_ids) {
                let min_table_id_data = self
                    .unsync_data
                    .table_data
                    .get_mut(&min_table_id)
                    .expect("should exist");
                // NOTE: `take_before_epoch` is applied on a clone on purpose, so the
                // representative table's own `unsync_epochs` is not modified here.
                // The actual removal happens for every table (including this one)
                // inside `table_data.sync(epoch)` below, which is asserted to remove
                // exactly the same set of epochs.
                let epochs = take_before_epoch(&mut min_table_id_data.unsync_epochs.clone(), epoch);
                for epoch in epochs.keys() {
                    assert_eq!(
                        &self
                            .unsync_data
                            .unsync_epochs
                            .remove(&UnsyncEpochId(*epoch, min_table_id))
                            .expect("should exist"),
                        table_ids
                    );
                }
                for table_id in table_ids {
                    let table_data = self
                        .unsync_data
                        .table_data
                        .get_mut(table_id)
                        .expect("should exist");
                    let (unflushed_payload, table_watermarks, task_ids, table_unsync_epochs) =
                        table_data.sync(epoch);
                    assert_eq!(table_unsync_epochs, epochs);
                    for (instance_id, payload) in unflushed_payload {
                        if !payload.is_empty() {
                            flush_payload.insert(instance_id, payload);
                        }
                    }
                    table_data.instance_data.retain(|instance_id, data| {
                        // remove the finished instances
                        if data.is_finished() {
                            assert_eq!(
                                self.unsync_data.instance_table_id.remove(instance_id),
                                Some(*table_id)
                            );
                            false
                        } else {
                            true
                        }
                    });
                    if let Some((direction, watermarks, watermark_type)) = table_watermarks {
                        Self::add_table_watermarks(
                            &mut all_table_watermarks,
                            *table_id,
                            direction,
                            watermarks,
                            watermark_type,
                        );
                    }
                    // Tasks that have already finished spilling contribute their ssts
                    // directly; still-running tasks must be awaited before the sync
                    // can complete.
                    for task_id in task_ids {
                        if self.unsync_data.spilled_data.contains_key(&task_id) {
                            spilled_tasks.insert(task_id);
                        } else {
                            uploading_tasks.insert(task_id);
                        }
                    }

                    if let hash_map::Entry::Occupied(mut entry) =
                        self.unsync_vector_index_data.entry(*table_id)
                    {
                        let data = entry.get_mut();
                        let adds = take_before_epoch(&mut data.sealed_epoch_data, epoch)
                            .into_values()
                            .flatten()
                            .collect_vec();
                        // A dropped writer's entry is kept only until its sealed data is drained.
                        if data.is_dropped && data.sealed_epoch_data.is_empty() {
                            entry.remove();
                        }
                        if !adds.is_empty() {
                            vector_index_adds
                                .try_insert(*table_id, adds)
                                .expect("non-duplicate");
                        }
                    }
                }
            }
        }

        let sync_id = {
            let sync_id = self.next_sync_id;
            self.next_sync_id += 1;
            SyncId(sync_id)
        };

        // Flush the remaining unflushed imms as one extra upload task bound to this sync.
        if let Some(extra_flush_task_id) = self.task_manager.sync(
            context,
            sync_id,
            flush_payload,
            uploading_tasks.iter().cloned(),
            &all_table_ids,
        ) {
            uploading_tasks.insert(extra_flush_task_id);
        }

        // iter from large task_id to small one so that newer data at the front
        let uploaded = spilled_tasks
            .iter()
            .rev()
            .map(|task_id| {
                let (sst, spill_table_ids) = self
                    .unsync_data
                    .spilled_data
                    .remove(task_id)
                    .expect("should exist");
                assert!(
                    spill_table_ids.is_subset(&all_table_ids),
                    "spilled tabled ids {:?} not a subset of sync table id {:?}",
                    spill_table_ids,
                    all_table_ids
                );
                sst
            })
            .collect();

        self.syncing_data.insert(
            sync_id,
            SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: uploading_tasks,
                uploaded,
                table_watermarks: all_table_watermarks,
                vector_index_adds,
                sync_result_sender,
            },
        );

        self.check_upload_task_consistency();
    }
}
1154
1155impl UnsyncData {
1156    fn ack_flushed(&mut self, sstable_info: &StagingSstableInfo) {
1157        for (instance_id, imm_ids) in sstable_info.imm_ids() {
1158            if let Some(instance_data) = self.instance_data(*instance_id) {
1159                // take `rev` to let old imm id goes first
1160                instance_data.ack_flushed(imm_ids.iter().rev().cloned());
1161            }
1162        }
1163    }
1164}
1165
/// State of one in-flight sync request.
struct SyncingData {
    // The (epoch, table ids) pairs this sync request covers.
    sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    // Upload tasks that must finish before the sync can complete.
    remaining_uploading_tasks: HashSet<UploadingTaskId>,
    // newer data at the front
    uploaded: VecDeque<Arc<StagingSstableInfo>>,
    // Aggregated table watermarks collected at sync time.
    table_watermarks: HashMap<TableId, TableWatermarks>,
    // Vector index adds collected at sync time.
    vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
    // Notified with the result once the sync finishes or is aborted.
    sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
}
1175
/// The outcome of a completed sync, delivered through the sync result sender.
#[derive(Debug)]
pub struct SyncedData {
    // newer data at the front, same order as `SyncingData::uploaded`
    pub uploaded_ssts: VecDeque<Arc<StagingSstableInfo>>,
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
    pub vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
}
1182
/// Shared context of the uploader that outlives the `Working`/`Err` state transitions.
struct UploaderContext {
    pinned_version: PinnedVersion,
    /// When called, it will spawn a task to flush the imm into sst and return the join handle.
    spawn_upload_task: SpawnUploadTask,
    buffer_tracker: BufferTracker,

    stats: Arc<HummockStateStoreMetrics>,
}
1191
1192impl UploaderContext {
1193    fn new(
1194        pinned_version: PinnedVersion,
1195        spawn_upload_task: SpawnUploadTask,
1196        buffer_tracker: BufferTracker,
1197        stats: Arc<HummockStateStoreMetrics>,
1198    ) -> Self {
1199        UploaderContext {
1200            pinned_version,
1201            spawn_upload_task,
1202            buffer_tracker,
1203            stats,
1204        }
1205    }
1206}
1207
/// Monotonically increasing id assigned to each sync request
/// (see `UploaderData::next_sync_id`).
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct SyncId(usize);
1210
1211struct UnsyncVectorIndexData {
1212    sealed_epoch_data: BTreeMap<HummockEpoch, Option<VectorIndexAdd>>,
1213    curr_epoch: u64,
1214    is_dropped: bool,
1215}
1216
/// All mutable state of the uploader while in the `Working` state.
#[derive(Default)]
struct UploaderData {
    unsync_data: UnsyncData,
    // Per-table vector index state, keyed by table id.
    unsync_vector_index_data: HashMap<TableId, UnsyncVectorIndexData>,

    // In-flight sync requests, ordered by sync id (older sync first).
    syncing_data: BTreeMap<SyncId, SyncingData>,

    task_manager: TaskManager,
    // Next sync id to assign; incremented on each `sync` call.
    next_sync_id: usize,
}
1227
impl UploaderData {
    /// Abort all upload tasks and fail every pending sync request with the
    /// error produced by `err`. Consumes the data, ending the `Working` state.
    fn abort(self, err: impl Fn() -> HummockError) {
        self.task_manager.abort_all_tasks();
        for syncing_data in self.syncing_data.into_values() {
            send_sync_result(syncing_data.sync_result_sender, Err(err()));
        }
    }

    /// Clear all state of the given (dropped) tables: their unsync data and any
    /// sync request that covers them, aborting the related upload tasks.
    fn clear_tables(&mut self, table_ids: HashSet<TableId>) {
        if table_ids.is_empty() {
            return;
        }
        self.unsync_data
            .clear_tables(&table_ids, &mut self.task_manager);
        self.syncing_data.retain(|sync_id, syncing_data| {
            if syncing_data
                .sync_table_epochs
                .iter()
                .any(|(_, sync_table_ids)| !sync_table_ids.is_disjoint(&table_ids))
            {
                // A sync request overlapping the cleared tables must be fully
                // contained in them; drop it and abort its remaining tasks.
                assert!(
                    syncing_data
                        .sync_table_epochs
                        .iter()
                        .all(|(_, sync_table_ids)| sync_table_ids.is_subset(&table_ids))
                );
                for task_id in &syncing_data.remaining_uploading_tasks {
                    match self
                        .task_manager
                        .abort_task(*task_id)
                        .expect("should exist")
                    {
                        UploadingTaskStatus::Spilling(spill_table_ids) => {
                            assert!(spill_table_ids.is_subset(&table_ids));
                        }
                        UploadingTaskStatus::Sync(task_sync_id) => {
                            assert_eq!(sync_id, &task_sync_id);
                        }
                    }
                }
                false
            } else {
                true
            }
        });

        self.check_upload_task_consistency();
    }

    /// The minimum object id among spilled ssts and ssts of in-flight syncs,
    /// i.e. objects that have been uploaded but not yet committed.
    fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        self.unsync_data
            .spilled_data
            .values()
            .map(|(s, _)| s)
            .chain(self.syncing_data.values().flat_map(|s| s.uploaded.iter()))
            .filter_map(|s| {
                s.sstable_infos()
                    .iter()
                    .chain(s.old_value_sstable_infos())
                    .map(|s| s.sst_info.object_id)
                    .min()
            })
            .min()
            .map(|object_id| object_id.as_raw())
    }
}
1294
/// Details of the failure that moved the uploader into the `Err` state.
struct ErrState {
    // The (epoch, table ids) pairs of the sync that failed.
    failed_sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    reason: String,
}
1299
/// Overall state of the uploader: normal operation, or failed until reset.
enum UploaderState {
    Working(UploaderData),
    Err(ErrState),
}
1304
/// An uploader for hummock data.
///
/// Data have 3 sequential stages: unsync (inside each local instance, data can be unsealed, sealed), syncing, synced.
///
/// The 3 stages are divided by 2 marginal epochs: `max_syncing_epoch`,
/// `max_synced_epoch` in each `TableUnSyncData`. Epochs satisfy the following inequality.
///
/// (epochs of `synced_data`) <= `max_synced_epoch` < (epochs of `syncing_data`) <=
/// `max_syncing_epoch` < (epochs of `unsync_data`)
///
/// Data are mostly stored in `VecDeque`, and the order stored in the `VecDeque` indicates the data
/// order. Data at the front represents ***newer*** data.
pub struct HummockUploader {
    // Current state; most operations are no-ops while in `Err` state.
    state: UploaderState,

    // Shared context that survives state transitions.
    context: UploaderContext,
}
1322
1323impl HummockUploader {
1324    pub(super) fn new(
1325        state_store_metrics: Arc<HummockStateStoreMetrics>,
1326        pinned_version: PinnedVersion,
1327        spawn_upload_task: SpawnUploadTask,
1328        buffer_tracker: BufferTracker,
1329    ) -> Self {
1330        Self {
1331            state: UploaderState::Working(UploaderData::default()),
1332            context: UploaderContext::new(
1333                pinned_version,
1334                spawn_upload_task,
1335                buffer_tracker,
1336                state_store_metrics,
1337            ),
1338        }
1339    }
1340
1341    pub(super) fn buffer_tracker(&self) -> &BufferTracker {
1342        &self.context.buffer_tracker
1343    }
1344
1345    pub(super) fn hummock_version(&self) -> &PinnedVersion {
1346        &self.context.pinned_version
1347    }
1348
1349    pub(super) fn add_imms(&mut self, instance_id: LocalInstanceId, imms: Vec<ImmutableMemtable>) {
1350        let UploaderState::Working(data) = &mut self.state else {
1351            return;
1352        };
1353        for imm in imms {
1354            let imm = UploaderImm::new(imm, &self.context);
1355            data.unsync_data.add_imm(instance_id, imm);
1356        }
1357    }
1358
1359    pub(super) fn init_instance(
1360        &mut self,
1361        instance_id: LocalInstanceId,
1362        table_id: TableId,
1363        init_epoch: HummockEpoch,
1364    ) {
1365        let UploaderState::Working(data) = &mut self.state else {
1366            return;
1367        };
1368        data.unsync_data
1369            .init_instance(table_id, instance_id, init_epoch);
1370    }
1371
1372    pub(super) fn local_seal_epoch(
1373        &mut self,
1374        instance_id: LocalInstanceId,
1375        next_epoch: HummockEpoch,
1376        opts: SealCurrentEpochOptions,
1377    ) {
1378        let UploaderState::Working(data) = &mut self.state else {
1379            return;
1380        };
1381        data.unsync_data
1382            .local_seal_epoch(instance_id, next_epoch, opts);
1383    }
1384
1385    pub(super) fn register_vector_writer(&mut self, table_id: TableId, init_epoch: HummockEpoch) {
1386        let UploaderState::Working(data) = &mut self.state else {
1387            return;
1388        };
1389        assert!(
1390            data.unsync_vector_index_data
1391                .try_insert(
1392                    table_id,
1393                    UnsyncVectorIndexData {
1394                        sealed_epoch_data: Default::default(),
1395                        curr_epoch: init_epoch,
1396                        is_dropped: false,
1397                    }
1398                )
1399                .is_ok(),
1400            "duplicate vector writer on {}",
1401            table_id
1402        )
1403    }
1404
1405    pub(super) fn vector_writer_seal_epoch(
1406        &mut self,
1407        table_id: TableId,
1408        next_epoch: HummockEpoch,
1409        add: Option<VectorIndexAdd>,
1410    ) {
1411        let UploaderState::Working(data) = &mut self.state else {
1412            return;
1413        };
1414        let data = data
1415            .unsync_vector_index_data
1416            .get_mut(&table_id)
1417            .expect("should exist");
1418        assert!(!data.is_dropped);
1419        assert!(
1420            data.curr_epoch < next_epoch,
1421            "next epoch {} should be greater than current epoch {}",
1422            next_epoch,
1423            data.curr_epoch
1424        );
1425        data.sealed_epoch_data
1426            .try_insert(data.curr_epoch, add)
1427            .expect("non-duplicate");
1428        data.curr_epoch = next_epoch;
1429    }
1430
1431    pub(super) fn drop_vector_writer(&mut self, table_id: TableId) {
1432        let UploaderState::Working(data) = &mut self.state else {
1433            return;
1434        };
1435        let hash_map::Entry::Occupied(mut entry) = data.unsync_vector_index_data.entry(table_id)
1436        else {
1437            panic!("vector writer {} should exist", table_id);
1438        };
1439        let data = entry.get_mut();
1440        data.is_dropped = true;
1441        if data.sealed_epoch_data.is_empty() {
1442            entry.remove();
1443        }
1444    }
1445
1446    pub(super) fn start_epoch(&mut self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
1447        let UploaderState::Working(data) = &mut self.state else {
1448            return;
1449        };
1450        debug!(epoch, ?table_ids, "start epoch");
1451        for table_id in &table_ids {
1452            let table_data = data
1453                .unsync_data
1454                .table_data
1455                .entry(*table_id)
1456                .or_insert_with(|| {
1457                    TableUnsyncData::new(
1458                        *table_id,
1459                        self.context.pinned_version.table_committed_epoch(*table_id),
1460                    )
1461                });
1462            table_data.new_epoch(epoch);
1463        }
1464        if let Some(unsync_epoch_id) = get_unsync_epoch_id(epoch, &table_ids) {
1465            assert!(
1466                data.unsync_data
1467                    .unsync_epochs
1468                    .insert(unsync_epoch_id, table_ids)
1469                    .is_none()
1470            );
1471        }
1472    }
1473
1474    pub(super) fn start_sync_epoch(
1475        &mut self,
1476        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
1477        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
1478    ) {
1479        let data = match &mut self.state {
1480            UploaderState::Working(data) => data,
1481            UploaderState::Err(ErrState {
1482                failed_sync_table_epochs,
1483                reason,
1484            }) => {
1485                let result = Err(HummockError::other(format!(
1486                    "previous sync epoch {:?} failed due to [{}]",
1487                    failed_sync_table_epochs, reason
1488                )));
1489                send_sync_result(sync_result_sender, result);
1490                return;
1491            }
1492        };
1493        debug!(?sync_table_epochs, "start sync epoch");
1494
1495        data.sync(&self.context, sync_result_sender, sync_table_epochs);
1496
1497        data.may_notify_sync_task(&self.context);
1498
1499        self.context
1500            .stats
1501            .uploader_syncing_epoch_count
1502            .set(data.syncing_data.len() as _);
1503    }
1504
1505    pub(crate) fn update_pinned_version(&mut self, pinned_version: PinnedVersion) {
1506        if let UploaderState::Working(data) = &mut self.state {
1507            // TODO: may only `ack_committed` on table whose `committed_epoch` is changed.
1508            for (table_id, info) in pinned_version.state_table_info.info() {
1509                if let Some(table_data) = data.unsync_data.table_data.get_mut(table_id) {
1510                    table_data.ack_committed(info.committed_epoch);
1511                }
1512            }
1513        }
1514        self.context.pinned_version = pinned_version;
1515    }
1516
1517    pub(crate) fn may_flush(&mut self) -> bool {
1518        let UploaderState::Working(data) = &mut self.state else {
1519            return false;
1520        };
1521        if self.context.buffer_tracker.need_flush() {
1522            let mut spiller = Spiller::new(&mut data.unsync_data);
1523            let mut curr_batch_flush_size = 0;
1524            // iterate from older epoch to newer epoch
1525            while self
1526                .context
1527                .buffer_tracker
1528                .need_more_flush(curr_batch_flush_size)
1529                && let Some((epoch, payload, spilled_table_ids)) = spiller.next_spilled_payload()
1530            {
1531                assert!(!payload.is_empty());
1532                {
1533                    let (task_id, task_size, spilled_table_ids) =
1534                        data.task_manager
1535                            .spill(&self.context, spilled_table_ids, payload);
1536                    for table_id in spilled_table_ids {
1537                        spiller
1538                            .unsync_data()
1539                            .table_data
1540                            .get_mut(table_id)
1541                            .expect("should exist")
1542                            .spill_tasks
1543                            .entry(epoch)
1544                            .or_default()
1545                            .push_front(task_id);
1546                    }
1547                    curr_batch_flush_size += task_size;
1548                }
1549            }
1550            data.check_upload_task_consistency();
1551            curr_batch_flush_size > 0
1552        } else {
1553            false
1554        }
1555    }
1556
1557    pub(crate) fn clear(&mut self, table_ids: Option<HashSet<TableId>>) {
1558        if let Some(table_ids) = table_ids {
1559            if let UploaderState::Working(data) = &mut self.state {
1560                data.clear_tables(table_ids);
1561            }
1562        } else {
1563            if let UploaderState::Working(data) = replace(
1564                &mut self.state,
1565                UploaderState::Working(UploaderData::default()),
1566            ) {
1567                data.abort(|| HummockError::other("uploader is reset"));
1568            }
1569
1570            self.context.stats.uploader_syncing_epoch_count.set(0);
1571        }
1572    }
1573
1574    pub(crate) fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
1575        let UploaderState::Working(data) = &mut self.state else {
1576            return;
1577        };
1578        data.unsync_data.may_destroy_instance(instance_id);
1579    }
1580
1581    pub(crate) fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
1582        if let UploaderState::Working(ref u) = self.state {
1583            u.min_uncommitted_object_id()
1584        } else {
1585            None
1586        }
1587    }
1588}
1589
1590impl UploaderData {
1591    fn may_notify_sync_task(&mut self, context: &UploaderContext) {
1592        while let Some((_, syncing_data)) = self.syncing_data.first_key_value()
1593            && syncing_data.remaining_uploading_tasks.is_empty()
1594        {
1595            let (_, syncing_data) = self.syncing_data.pop_first().expect("non-empty");
1596            let SyncingData {
1597                sync_table_epochs,
1598                remaining_uploading_tasks: _,
1599                uploaded,
1600                table_watermarks,
1601                vector_index_adds,
1602                sync_result_sender,
1603            } = syncing_data;
1604            context
1605                .stats
1606                .uploader_syncing_epoch_count
1607                .set(self.syncing_data.len() as _);
1608
1609            for (sync_epoch, table_ids) in sync_table_epochs {
1610                for table_id in table_ids {
1611                    if let Some(table_data) = self.unsync_data.table_data.get_mut(&table_id) {
1612                        table_data.ack_synced(sync_epoch);
1613                        if table_data.is_empty() {
1614                            self.unsync_data.table_data.remove(&table_id);
1615                        }
1616                    }
1617                }
1618            }
1619
1620            send_sync_result(
1621                sync_result_sender,
1622                Ok(SyncedData {
1623                    uploaded_ssts: uploaded,
1624                    table_watermarks,
1625                    vector_index_adds,
1626                }),
1627            )
1628        }
1629    }
1630
1631    fn check_upload_task_consistency(&self) {
1632        #[cfg(debug_assertions)]
1633        {
1634            let mut spill_task_table_id_from_data: HashMap<_, HashSet<_>> = HashMap::new();
1635            for table_data in self.unsync_data.table_data.values() {
1636                for task_id in table_data
1637                    .spill_tasks
1638                    .iter()
1639                    .flat_map(|(_, tasks)| tasks.iter())
1640                {
1641                    assert!(
1642                        spill_task_table_id_from_data
1643                            .entry(*task_id)
1644                            .or_default()
1645                            .insert(table_data.table_id)
1646                    );
1647                }
1648            }
1649            let syncing_task_id_from_data: HashMap<_, HashSet<_>> = self
1650                .syncing_data
1651                .iter()
1652                .filter_map(|(sync_id, data)| {
1653                    if data.remaining_uploading_tasks.is_empty() {
1654                        None
1655                    } else {
1656                        Some((*sync_id, data.remaining_uploading_tasks.clone()))
1657                    }
1658                })
1659                .collect();
1660
1661            let mut spill_task_table_id_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
1662            for (task_id, (_, table_ids)) in &self.unsync_data.spilled_data {
1663                spill_task_table_id_from_manager.insert(*task_id, table_ids.clone());
1664            }
1665            let mut syncing_task_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
1666            for (task_id, status) in self.task_manager.tasks() {
1667                match status {
1668                    UploadingTaskStatus::Spilling(table_ids) => {
1669                        assert!(
1670                            spill_task_table_id_from_manager
1671                                .insert(task_id, table_ids.clone())
1672                                .is_none()
1673                        );
1674                    }
1675                    UploadingTaskStatus::Sync(sync_id) => {
1676                        assert!(
1677                            syncing_task_from_manager
1678                                .entry(*sync_id)
1679                                .or_default()
1680                                .insert(task_id)
1681                        );
1682                    }
1683                }
1684            }
1685            assert_eq!(
1686                spill_task_table_id_from_data,
1687                spill_task_table_id_from_manager
1688            );
1689            assert_eq!(syncing_task_id_from_data, syncing_task_from_manager);
1690        }
1691    }
1692}
1693
impl HummockUploader {
    /// Returns a future resolving to the next successfully uploaded
    /// [`StagingSstableInfo`], driving the task manager's pending tasks.
    ///
    /// The future stays pending when the uploader is not in the `Working`
    /// state or when no task has completed yet. When a sync task fails, the
    /// uploader transitions into the `Err` state, reports the failure to the
    /// sync caller, aborts the remaining data, and keeps returning pending.
    pub(super) fn next_uploaded_sst(
        &mut self,
    ) -> impl Future<Output = Arc<StagingSstableInfo>> + '_ {
        poll_fn(|cx| {
            // Only the working state makes progress; the error state never
            // yields an sst.
            let UploaderState::Working(data) = &mut self.state else {
                return Poll::Pending;
            };

            if let Some((task_id, status, result)) = ready!(data.task_manager.poll_task_result(cx))
            {
                match result {
                    Ok(sst) => {
                        data.unsync_data.ack_flushed(&sst);
                        match status {
                            UploadingTaskStatus::Sync(sync_id) => {
                                // Attach the uploaded sst to its syncing epoch and
                                // notify the sync waiter once all tasks are done.
                                let syncing_data =
                                    data.syncing_data.get_mut(&sync_id).expect("should exist");
                                syncing_data.uploaded.push_front(sst.clone());
                                assert!(syncing_data.remaining_uploading_tasks.remove(&task_id));
                                data.may_notify_sync_task(&self.context);
                            }
                            UploadingTaskStatus::Spilling(table_ids) => {
                                data.unsync_data
                                    .spilled_data
                                    .insert(task_id, (sst.clone(), table_ids));
                            }
                        }
                        data.check_upload_task_consistency();
                        Poll::Ready(sst)
                    }
                    Err((sync_id, e)) => {
                        // A sync upload failed: drop its syncing data and swap the
                        // whole uploader into the error state, taking ownership of
                        // the previous working-state data.
                        let syncing_data =
                            data.syncing_data.remove(&sync_id).expect("should exist");
                        let failed_epochs = syncing_data.sync_table_epochs.clone();
                        let data = must_match!(replace(
                            &mut self.state,
                            UploaderState::Err(ErrState {
                                failed_sync_table_epochs: syncing_data.sync_table_epochs,
                                reason: e.as_report().to_string(),
                            }),
                        ), UploaderState::Working(data) => data);

                        // Report the failure to the sync caller; the receiver may
                        // already be dropped, so the send result is ignored.
                        let _ = syncing_data
                            .sync_result_sender
                            .send(Err(HummockError::other(format!(
                                "failed to sync: {:?}",
                                e.as_report()
                            ))));

                        // Abort everything the previous working state still held.
                        data.abort(|| {
                            HummockError::other(format!(
                                "previous epoch {:?} failed to sync",
                                failed_epochs
                            ))
                        });
                        Poll::Pending
                    }
                }
            } else {
                Poll::Pending
            }
        })
    }
}
1759
1760#[cfg(test)]
1761pub(crate) mod tests {
1762    use std::collections::{HashMap, HashSet};
1763    use std::future::{Future, poll_fn};
1764    use std::ops::Deref;
1765    use std::pin::pin;
1766    use std::sync::Arc;
1767    use std::sync::atomic::AtomicUsize;
1768    use std::sync::atomic::Ordering::SeqCst;
1769    use std::task::Poll;
1770
1771    use futures::FutureExt;
1772    use risingwave_common::catalog::TableId;
1773    use risingwave_common::util::epoch::EpochExt;
1774    use risingwave_hummock_sdk::HummockEpoch;
1775    use risingwave_hummock_sdk::vector_index::{
1776        FlatIndexAdd, VectorFileInfo, VectorIndexAdd, VectorStoreInfoDelta,
1777    };
1778    use tokio::sync::oneshot;
1779
1780    use super::test_utils::*;
1781    use crate::hummock::event_handler::uploader::{
1782        HummockUploader, SyncedData, UploadingTask, get_payload_imm_ids,
1783    };
1784    use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID};
1785    use crate::hummock::{HummockError, HummockResult};
1786    use crate::mem_table::ImmutableMemtable;
1787    use crate::opts::StorageOpts;
1788
1789    impl HummockUploader {
1790        pub(super) fn add_imm(&mut self, instance_id: LocalInstanceId, imm: ImmutableMemtable) {
1791            self.add_imms(instance_id, vec![imm]);
1792        }
1793
1794        pub(super) fn start_single_epoch_sync(
1795            &mut self,
1796            epoch: HummockEpoch,
1797            sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
1798            table_ids: HashSet<TableId>,
1799        ) {
1800            self.start_sync_epoch(sync_result_sender, vec![(epoch, table_ids)]);
1801        }
1802    }
1803
1804    #[tokio::test]
1805    pub async fn test_uploading_task_future() {
1806        let uploader_context = test_uploader_context(dummy_success_upload_future);
1807
1808        let imm = gen_imm(INITIAL_EPOCH).await;
1809        let imm_size = imm.size();
1810        let imm_ids = get_imm_ids(vec![&imm]);
1811        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
1812        assert_eq!(imm_size, task.task_info.task_size);
1813        assert_eq!(imm_ids, task.task_info.imm_ids);
1814        assert_eq!(vec![INITIAL_EPOCH], task.task_info.epochs);
1815        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
1816        assert_eq!(
1817            output.sstable_infos(),
1818            &dummy_success_upload_output().new_value_ssts
1819        );
1820        assert_eq!(imm_size, output.imm_size());
1821        assert_eq!(&imm_ids, output.imm_ids());
1822        assert_eq!(&vec![INITIAL_EPOCH], output.epochs());
1823
1824        let uploader_context = test_uploader_context(dummy_fail_upload_future);
1825        let imm = gen_imm(INITIAL_EPOCH).await;
1826        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
1827        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
1828    }
1829
1830    #[tokio::test]
1831    pub async fn test_uploading_task_poll_result() {
1832        let uploader_context = test_uploader_context(dummy_success_upload_future);
1833        let mut task =
1834            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
1835        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
1836        assert_eq!(
1837            output.sstable_infos(),
1838            &dummy_success_upload_output().new_value_ssts
1839        );
1840
1841        let uploader_context = test_uploader_context(dummy_fail_upload_future);
1842        let mut task =
1843            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
1844        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
1845    }
1846
1847    #[tokio::test]
1848    async fn test_uploading_task_poll_ok_with_retry() {
1849        let run_count = Arc::new(AtomicUsize::new(0));
1850        let fail_num = 10;
1851        let run_count_clone = run_count.clone();
1852        let uploader_context = test_uploader_context(move |payload, info| {
1853            let run_count = run_count.clone();
1854            async move {
1855                // fail in the first `fail_num` run, and success at the end
1856                let ret = if run_count.load(SeqCst) < fail_num {
1857                    Err(HummockError::other("fail"))
1858                } else {
1859                    dummy_success_upload_future(payload, info).await
1860                };
1861                run_count.fetch_add(1, SeqCst);
1862                ret
1863            }
1864        });
1865        let mut task =
1866            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
1867        let output = poll_fn(|cx| task.poll_ok_with_retry(cx)).await;
1868        assert_eq!(fail_num + 1, run_count_clone.load(SeqCst));
1869        assert_eq!(
1870            output.sstable_infos(),
1871            &dummy_success_upload_output().new_value_ssts
1872        );
1873    }
1874
    #[tokio::test]
    async fn test_uploader_basic() {
        // Happy path: write an imm and a vector index add in epoch1, sync the
        // epoch, and verify both the uploaded sst and the vector index add are
        // reported through the sync result.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        const VECTOR_INDEX_TABLE_ID: TableId = TableId::new(234);
        uploader.start_epoch(
            epoch1,
            HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
        );
        let imm = gen_imm(epoch1).await;
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);

        // Register a vector writer, write one vector index add in epoch1 and
        // seal it at the next epoch.
        uploader.register_vector_writer(VECTOR_INDEX_TABLE_ID, epoch1);
        let vector_info_file = VectorFileInfo {
            object_id: 1.into(),
            vector_count: 1,
            file_size: 0,
            start_vector_id: 0,
            meta_offset: 20,
        };
        let vector_index_add = VectorIndexAdd::Flat(FlatIndexAdd {
            vector_store_info_delta: VectorStoreInfoDelta {
                next_vector_id: 1,
                added_vector_files: vec![vector_info_file.clone()],
            },
        });
        uploader.vector_writer_seal_epoch(
            VECTOR_INDEX_TABLE_ID,
            epoch1.next_epoch(),
            Some(vector_index_add.clone()),
        );

        // Start syncing epoch1: it enters the syncing state with one pending
        // uploading task and nothing uploaded yet.
        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(
            epoch1,
            sync_tx,
            HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
        );
        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
        assert_eq!(1, uploader.data().syncing_data.len());
        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
        assert!(syncing_data.uploaded.is_empty());
        assert!(!syncing_data.remaining_uploading_tasks.is_empty());

        // Drive the upload to completion and check the produced staging sst.
        let staging_sst = uploader.next_uploaded_sst().await;
        assert_eq!(&vec![epoch1], staging_sst.epochs());
        assert_eq!(
            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
            staging_sst.imm_ids()
        );
        assert_eq!(
            &dummy_success_upload_output().new_value_ssts,
            staging_sst.sstable_infos()
        );

        // The sync result must carry the same sst plus the vector index add.
        match sync_rx.await {
            Ok(Ok(data)) => {
                let SyncedData {
                    uploaded_ssts,
                    table_watermarks,
                    vector_index_adds,
                } = data;
                assert_eq!(1, uploaded_ssts.len());
                let staging_sst = &uploaded_ssts[0];
                assert_eq!(&vec![epoch1], staging_sst.epochs());
                assert_eq!(
                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
                    staging_sst.imm_ids()
                );
                assert_eq!(
                    &dummy_success_upload_output().new_value_ssts,
                    staging_sst.sstable_infos()
                );
                assert!(table_watermarks.is_empty());
                assert_eq!(vector_index_adds.len(), 1);
                let (table_id, vector_index_adds) = vector_index_adds.into_iter().next().unwrap();
                assert_eq!(table_id, VECTOR_INDEX_TABLE_ID);
                assert_eq!(vector_index_adds.len(), 1);
                let synced_vector_index_add = vector_index_adds[0].clone();
                assert_eq!(vector_index_add, synced_vector_index_add);
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());

        // Pinning a version committed at epoch1 finalizes the epoch.
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
1978
    #[tokio::test]
    async fn test_uploader_destroy_instance_before_sync() {
        // Destroying an instance before sync must not lose its sealed data:
        // the sync still uploads the imm, and the table's unsync data is
        // removed once the sync completes.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        uploader.start_epochs_for_test([epoch1]);
        let imm = gen_imm(epoch1).await;
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.may_destroy_instance(TEST_LOCAL_INSTANCE_ID);

        // Sync after the instance is destroyed: syncing state still tracks the
        // epoch and has a pending uploading task.
        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
        assert_eq!(1, uploader.data().syncing_data.len());
        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
        assert!(syncing_data.uploaded.is_empty());
        assert!(!syncing_data.remaining_uploading_tasks.is_empty());

        // The destroyed instance's sealed imm is still uploaded.
        let staging_sst = uploader.next_uploaded_sst().await;
        assert_eq!(&vec![epoch1], staging_sst.epochs());
        assert_eq!(
            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
            staging_sst.imm_ids()
        );
        assert_eq!(
            &dummy_success_upload_output().new_value_ssts,
            staging_sst.sstable_infos()
        );

        match sync_rx.await {
            Ok(Ok(data)) => {
                let SyncedData {
                    uploaded_ssts,
                    table_watermarks,
                    ..
                } = data;
                assert_eq!(1, uploaded_ssts.len());
                let staging_sst = &uploaded_ssts[0];
                assert_eq!(&vec![epoch1], staging_sst.epochs());
                assert_eq!(
                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
                    staging_sst.imm_ids()
                );
                assert_eq!(
                    &dummy_success_upload_output().new_value_ssts,
                    staging_sst.sstable_infos()
                );
                assert!(table_watermarks.is_empty());
            }
            _ => unreachable!(),
        };
        // After the sync is acked, the table's unsync data should be gone.
        assert!(
            !uploader
                .data()
                .unsync_data
                .table_data
                .contains_key(&TEST_TABLE_ID)
        );
    }
2040
    #[tokio::test]
    async fn test_empty_uploader_sync() {
        // Syncing an epoch with no written data completes with zero uploaded
        // ssts, and pinning the committed version clears the syncing data.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_epochs_for_test([epoch1]);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        // No upload task should be spawned for an empty epoch.
        assert_uploader_pending(&mut uploader).await;

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.uploaded_ssts.is_empty());
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert!(uploader.data().syncing_data.is_empty());
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
2078
    #[tokio::test]
    async fn test_uploader_empty_epoch() {
        // Syncing an epoch whose instance only holds data for a later epoch
        // behaves like an empty sync: nothing is uploaded for epoch1.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        uploader.start_epochs_for_test([epoch1, epoch2]);
        let imm = gen_imm(epoch2).await;
        // epoch1 is empty while epoch2 is not. Going to seal empty epoch1.
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm);

        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        // epoch2's imm must not be uploaded by the epoch1 sync.
        assert_uploader_pending(&mut uploader).await;

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.uploaded_ssts.is_empty());
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert!(uploader.data().syncing_data.is_empty());
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
2120
2121    #[tokio::test]
2122    async fn test_uploader_poll_empty() {
2123        let mut uploader = test_uploader(dummy_success_upload_future);
2124        let fut = uploader.next_uploaded_sst();
2125        let mut fut = pin!(fut);
2126        assert!(poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx).is_pending())).await);
2127    }
2128
    #[tokio::test]
    async fn test_uploader_empty_advance_mce() {
        // Pinning newer committed versions advances the max synced/syncing
        // epochs for epochs the uploader holds no data for, while an ongoing
        // sync of epoch6 keeps its own syncing progress independently.
        let mut uploader = test_uploader(dummy_success_upload_future);
        let initial_pinned_version = uploader.context.pinned_version.clone();
        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        let epoch3 = epoch2.next_epoch();
        let epoch4 = epoch3.next_epoch();
        let epoch5 = epoch4.next_epoch();
        let epoch6 = epoch5.next_epoch();
        let version1 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        let version2 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch2))
            .unwrap();
        let version3 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch3))
            .unwrap();
        let version4 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch4))
            .unwrap();
        let version5 = initial_pinned_version
            .new_pin_version(test_hummock_version(epoch5))
            .unwrap();

        uploader.start_epochs_for_test([epoch6]);
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch6);

        // Each pinned version advances both epochs to its committed epoch,
        // since the uploader has no data below epoch6.
        uploader.update_pinned_version(version1);
        assert_eq!(epoch1, uploader.test_max_synced_epoch());
        assert_eq!(epoch1, uploader.test_max_syncing_epoch());

        let imm = gen_imm(epoch6).await;
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
        uploader.update_pinned_version(version2);
        assert_eq!(epoch2, uploader.test_max_synced_epoch());
        assert_eq!(epoch2, uploader.test_max_syncing_epoch());

        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch6);
        uploader.update_pinned_version(version3);
        assert_eq!(epoch3, uploader.test_max_synced_epoch());
        assert_eq!(epoch3, uploader.test_max_syncing_epoch());

        // Starting the epoch6 sync jumps syncing to epoch6, while the synced
        // epoch still follows the pinned versions until the sync completes.
        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch6, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
        uploader.update_pinned_version(version4);
        assert_eq!(epoch4, uploader.test_max_synced_epoch());
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());

        let sst = uploader.next_uploaded_sst().await;
        assert_eq!(&get_imm_ids([&imm]), sst.imm_ids());

        match sync_rx.await {
            Ok(Ok(data)) => {
                assert!(data.table_watermarks.is_empty());
                assert_eq!(1, data.uploaded_ssts.len());
                assert_eq!(&get_imm_ids([&imm]), data.uploaded_ssts[0].imm_ids());
            }
            _ => unreachable!(),
        }

        // Once epoch6 is synced, a lower committed version (epoch5) no longer
        // lowers the tracked epochs.
        uploader.update_pinned_version(version5);
        assert_eq!(epoch6, uploader.test_max_synced_epoch());
        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
    }
2196
2197    #[tokio::test]
2198    async fn test_uploader_finish_in_order() {
2199        let config = StorageOpts {
2200            shared_buffer_capacity_mb: 1024 * 1024,
2201            shared_buffer_flush_ratio: 0.0,
2202            ..Default::default()
2203        };
2204        let (buffer_tracker, mut uploader, new_task_notifier) =
2205            prepare_uploader_order_test(&config, false);
2206
2207        let epoch1 = INITIAL_EPOCH.next_epoch();
2208        let epoch2 = epoch1.next_epoch();
2209        let epoch3 = epoch2.next_epoch();
2210        let epoch4 = epoch3.next_epoch();
2211        uploader.start_epochs_for_test([epoch1, epoch2, epoch3, epoch4]);
2212        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2213        let memory_limiter = Some(memory_limiter.deref());
2214
2215        let instance_id1 = 1;
2216        let instance_id2 = 2;
2217
2218        uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2219        uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2220
2221        // imm2 contains data in newer epoch, but added first
2222        let imm2 = gen_imm_with_limiter(epoch2, memory_limiter).await;
2223        uploader.add_imm(instance_id2, imm2.clone());
2224        let imm1_1 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2225        uploader.add_imm(instance_id1, imm1_1.clone());
2226        let imm1_2 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2227        uploader.add_imm(instance_id1, imm1_2.clone());
2228
2229        // imm1 will be spilled first
2230        let epoch1_spill_payload12 =
2231            HashMap::from_iter([(instance_id1, vec![imm1_2.clone(), imm1_1.clone()])]);
2232        let epoch2_spill_payload = HashMap::from_iter([(instance_id2, vec![imm2.clone()])]);
2233        let (await_start1, finish_tx1) =
2234            new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload12));
2235        let (await_start2, finish_tx2) =
2236            new_task_notifier(get_payload_imm_ids(&epoch2_spill_payload));
2237        uploader.may_flush();
2238        await_start1.await;
2239        await_start2.await;
2240
2241        assert_uploader_pending(&mut uploader).await;
2242
2243        finish_tx2.send(()).unwrap();
2244        assert_uploader_pending(&mut uploader).await;
2245
2246        finish_tx1.send(()).unwrap();
2247        let sst = uploader.next_uploaded_sst().await;
2248        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload12), sst.imm_ids());
2249        assert_eq!(&vec![epoch1], sst.epochs());
2250
2251        let sst = uploader.next_uploaded_sst().await;
2252        assert_eq!(&get_payload_imm_ids(&epoch2_spill_payload), sst.imm_ids());
2253        assert_eq!(&vec![epoch2], sst.epochs());
2254
2255        let imm1_3 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2256        uploader.add_imm(instance_id1, imm1_3.clone());
2257        let epoch1_spill_payload3 = HashMap::from_iter([(instance_id1, vec![imm1_3.clone()])]);
2258        let (await_start1_3, finish_tx1_3) =
2259            new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload3));
2260        uploader.may_flush();
2261        await_start1_3.await;
2262        let imm1_4 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2263        uploader.add_imm(instance_id1, imm1_4.clone());
2264        let epoch1_sync_payload = HashMap::from_iter([(instance_id1, vec![imm1_4.clone()])]);
2265        let (await_start1_4, finish_tx1_4) =
2266            new_task_notifier(get_payload_imm_ids(&epoch1_sync_payload));
2267        uploader.local_seal_epoch_for_test(instance_id1, epoch1);
2268        let (sync_tx1, mut sync_rx1) = oneshot::channel();
2269        uploader.start_single_epoch_sync(epoch1, sync_tx1, HashSet::from_iter([TEST_TABLE_ID]));
2270        await_start1_4.await;
2271
2272        uploader.local_seal_epoch_for_test(instance_id1, epoch2);
2273        uploader.local_seal_epoch_for_test(instance_id2, epoch2);
2274
2275        // current uploader state:
2276        // unsealed: empty
2277        // sealed: epoch2: uploaded sst([imm2])
2278        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])
2279
2280        let imm3_1 = gen_imm_with_limiter(epoch3, memory_limiter).await;
2281        let epoch3_spill_payload1 = HashMap::from_iter([(instance_id1, vec![imm3_1.clone()])]);
2282        uploader.add_imm(instance_id1, imm3_1.clone());
2283        let (await_start3_1, finish_tx3_1) =
2284            new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload1));
2285        uploader.may_flush();
2286        await_start3_1.await;
2287        let imm3_2 = gen_imm_with_limiter(epoch3, memory_limiter).await;
2288        let epoch3_spill_payload2 = HashMap::from_iter([(instance_id2, vec![imm3_2.clone()])]);
2289        uploader.add_imm(instance_id2, imm3_2.clone());
2290        let (await_start3_2, finish_tx3_2) =
2291            new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload2));
2292        uploader.may_flush();
2293        await_start3_2.await;
2294        let imm3_3 = gen_imm_with_limiter(epoch3, memory_limiter).await;
2295        uploader.add_imm(instance_id1, imm3_3.clone());
2296
2297        // current uploader state:
2298        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
2299        // sealed: uploaded sst([imm2])
2300        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])
2301
2302        uploader.local_seal_epoch_for_test(instance_id1, epoch3);
2303        let imm4 = gen_imm_with_limiter(epoch4, memory_limiter).await;
2304        uploader.add_imm(instance_id1, imm4.clone());
2305        assert_uploader_pending(&mut uploader).await;
2306
2307        // current uploader state:
2308        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
2309        //           epoch4: imm: imm4
2310        // sealed: uploaded sst([imm2])
2311        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])
2312
2313        finish_tx3_1.send(()).unwrap();
2314        assert_uploader_pending(&mut uploader).await;
2315        finish_tx1_4.send(()).unwrap();
2316        assert_uploader_pending(&mut uploader).await;
2317        finish_tx1_3.send(()).unwrap();
2318
2319        let sst = uploader.next_uploaded_sst().await;
2320        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload3), sst.imm_ids());
2321
2322        assert!(poll_fn(|cx| Poll::Ready(sync_rx1.poll_unpin(cx).is_pending())).await);
2323
2324        let sst = uploader.next_uploaded_sst().await;
2325        assert_eq!(&get_payload_imm_ids(&epoch1_sync_payload), sst.imm_ids());
2326
2327        match sync_rx1.await {
2328            Ok(Ok(data)) => {
2329                assert_eq!(3, data.uploaded_ssts.len());
2330                assert_eq!(
2331                    &get_payload_imm_ids(&epoch1_sync_payload),
2332                    data.uploaded_ssts[0].imm_ids()
2333                );
2334                assert_eq!(
2335                    &get_payload_imm_ids(&epoch1_spill_payload3),
2336                    data.uploaded_ssts[1].imm_ids()
2337                );
2338                assert_eq!(
2339                    &get_payload_imm_ids(&epoch1_spill_payload12),
2340                    data.uploaded_ssts[2].imm_ids()
2341                );
2342            }
2343            _ => {
2344                unreachable!()
2345            }
2346        }
2347
2348        // current uploader state:
2349        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
2350        //           epoch4: imm: imm4
2351        // sealed: uploaded sst([imm2])
2352        // syncing: empty
2353        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2354
2355        let (sync_tx2, sync_rx2) = oneshot::channel();
2356        uploader.start_single_epoch_sync(epoch2, sync_tx2, HashSet::from_iter([TEST_TABLE_ID]));
2357        uploader.local_seal_epoch_for_test(instance_id2, epoch3);
2358        let sst = uploader.next_uploaded_sst().await;
2359        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids());
2360
2361        match sync_rx2.await {
2362            Ok(Ok(data)) => {
2363                assert_eq!(data.uploaded_ssts.len(), 1);
2364                assert_eq!(
2365                    &get_payload_imm_ids(&epoch2_spill_payload),
2366                    data.uploaded_ssts[0].imm_ids()
2367                );
2368            }
2369            _ => {
2370                unreachable!("should be sync finish");
2371            }
2372        }
2373        assert_eq!(epoch2, uploader.test_max_synced_epoch());
2374
2375        // current uploader state:
2376        // unsealed: epoch4: imm: imm4
2377        // sealed: imm: imm3_3, uploading: [imm3_2], uploaded: sst([imm3_1])
2378        // syncing: empty
2379        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2380        //         epoch2: sst([imm2])
2381
2382        uploader.local_seal_epoch_for_test(instance_id1, epoch4);
2383        uploader.local_seal_epoch_for_test(instance_id2, epoch4);
2384        let epoch4_sync_payload = HashMap::from_iter([(instance_id1, vec![imm4, imm3_3])]);
2385        let (await_start4_with_3_3, finish_tx4_with_3_3) =
2386            new_task_notifier(get_payload_imm_ids(&epoch4_sync_payload));
2387        let (sync_tx4, mut sync_rx4) = oneshot::channel();
2388        uploader.start_single_epoch_sync(epoch4, sync_tx4, HashSet::from_iter([TEST_TABLE_ID]));
2389        await_start4_with_3_3.await;
2390
2391        // current uploader state:
2392        // unsealed: empty
2393        // sealed: empty
2394        // syncing: epoch4: uploading: [imm4, imm3_3], [imm3_2], uploaded: sst([imm3_1])
2395        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2396        //         epoch2: sst([imm2])
2397
2398        assert_uploader_pending(&mut uploader).await;
2399
2400        finish_tx3_2.send(()).unwrap();
2401        let sst = uploader.next_uploaded_sst().await;
2402        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload2), sst.imm_ids());
2403
2404        finish_tx4_with_3_3.send(()).unwrap();
2405        assert!(poll_fn(|cx| Poll::Ready(sync_rx4.poll_unpin(cx).is_pending())).await);
2406
2407        let sst = uploader.next_uploaded_sst().await;
2408        assert_eq!(&get_payload_imm_ids(&epoch4_sync_payload), sst.imm_ids());
2409
2410        match sync_rx4.await {
2411            Ok(Ok(data)) => {
2412                assert_eq!(3, data.uploaded_ssts.len());
2413                assert_eq!(
2414                    &get_payload_imm_ids(&epoch4_sync_payload),
2415                    data.uploaded_ssts[0].imm_ids()
2416                );
2417                assert_eq!(
2418                    &get_payload_imm_ids(&epoch3_spill_payload2),
2419                    data.uploaded_ssts[1].imm_ids()
2420                );
2421                assert_eq!(
2422                    &get_payload_imm_ids(&epoch3_spill_payload1),
2423                    data.uploaded_ssts[2].imm_ids(),
2424                )
2425            }
2426            _ => {
2427                unreachable!("should be sync finish");
2428            }
2429        }
2430        assert_eq!(epoch4, uploader.test_max_synced_epoch());
2431
2432        // current uploader state:
2433        // unsealed: empty
2434        // sealed: empty
2435        // syncing: empty
2436        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2437        //         epoch2: sst([imm2])
2438        //         epoch4: sst([imm4, imm3_3]), sst([imm3_2]), sst([imm3_1])
2439    }
2440
2441    #[tokio::test]
2442    async fn test_uploader_frequently_flush() {
2443        let config = StorageOpts {
2444            shared_buffer_capacity_mb: 10,
2445            shared_buffer_flush_ratio: 0.12,
2446            // This test will fail when we set it to 0
2447            shared_buffer_min_batch_flush_size_mb: 1,
2448            ..Default::default()
2449        };
2450        let (buffer_tracker, mut uploader, _new_task_notifier) =
2451            prepare_uploader_order_test(&config, true);
2452
2453        let epoch1 = INITIAL_EPOCH.next_epoch();
2454        let epoch2 = epoch1.next_epoch();
2455        uploader.start_epochs_for_test([epoch1, epoch2]);
2456        let instance_id1 = 1;
2457        let instance_id2 = 2;
2458        let flush_threshold = buffer_tracker.flush_threshold();
2459        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2460
2461        uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2462        uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2463
2464        // imm2 contains data in newer epoch, but added first
2465        let mut total_memory = 0;
2466        while total_memory < flush_threshold {
2467            let imm = gen_imm_with_limiter(epoch2, Some(memory_limiter.as_ref())).await;
2468            total_memory += imm.size();
2469            if total_memory > flush_threshold {
2470                break;
2471            }
2472            uploader.add_imm(instance_id2, imm);
2473        }
2474        let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await;
2475        uploader.add_imm(instance_id1, imm);
2476        assert!(uploader.may_flush());
2477
2478        for _ in 0..10 {
2479            let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await;
2480            uploader.add_imm(instance_id1, imm);
2481            assert!(!uploader.may_flush());
2482        }
2483    }
2484}