// risingwave_storage/hummock/event_handler/uploader/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod spiller;
16mod task_manager;
17pub(crate) mod test_utils;
18
19use std::cmp::Ordering;
20use std::collections::btree_map::Entry;
21use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque, hash_map};
22use std::fmt::{Debug, Display, Formatter};
23use std::future::{Future, poll_fn};
24use std::mem::{replace, swap, take};
25use std::sync::Arc;
26use std::task::{Context, Poll, ready};
27
28use futures::FutureExt;
29use itertools::Itertools;
30use more_asserts::assert_gt;
31use prometheus::{HistogramTimer, IntGauge};
32use risingwave_common::bitmap::BitmapBuilder;
33use risingwave_common::catalog::TableId;
34use risingwave_common::metrics::UintGauge;
35use risingwave_common::must_match;
36use risingwave_hummock_sdk::table_watermark::{
37    TableWatermarks, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
38};
39use risingwave_hummock_sdk::vector_index::VectorIndexAdd;
40use risingwave_hummock_sdk::{HummockEpoch, HummockRawObjectId, LocalSstableInfo};
41use task_manager::{TaskManager, UploadingTaskStatus};
42use thiserror_ext::AsReport;
43use tokio::sync::oneshot;
44use tokio::task::JoinHandle;
45use tracing::{debug, error, info, warn};
46
47use crate::hummock::event_handler::LocalInstanceId;
48use crate::hummock::event_handler::hummock_event_handler::{BufferTracker, send_sync_result};
49use crate::hummock::event_handler::uploader::spiller::Spiller;
50use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm;
51use crate::hummock::local_version::pinned_version::PinnedVersion;
52use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchId;
53use crate::hummock::store::version::StagingSstableInfo;
54use crate::hummock::{HummockError, HummockResult, ImmutableMemtable};
55use crate::mem_table::ImmId;
56use crate::monitor::HummockStateStoreMetrics;
57use crate::store::SealCurrentEpochOptions;
58
59/// Take epoch data inclusively before `epoch` out from `data`
60fn take_before_epoch<T>(
61    data: &mut BTreeMap<HummockEpoch, T>,
62    epoch: HummockEpoch,
63) -> BTreeMap<HummockEpoch, T> {
64    let mut before_epoch_data = data.split_off(&(epoch + 1));
65    swap(&mut before_epoch_data, data);
66    before_epoch_data
67}
68
// Upload input keyed by local instance id, holding uploader-owned imms whose
// sizes are tracked by metric guards (see `uploader_imm`).
type UploadTaskInput = HashMap<LocalInstanceId, Vec<UploaderImm>>;
/// The imm payload handed to a spawned upload task, keyed by local instance id.
pub type UploadTaskPayload = HashMap<LocalInstanceId, Vec<ImmutableMemtable>>;
71
/// Result of a finished upload task: the SSTs produced from the imm payload.
#[derive(Debug)]
pub struct UploadTaskOutput {
    /// SSTs containing the new values of the uploaded data.
    pub new_value_ssts: Vec<LocalSstableInfo>,
    /// SSTs containing old values; presumably empty when old-value tracking is
    /// not enabled for the involved tables — TODO confirm with callers.
    pub old_value_ssts: Vec<LocalSstableInfo>,
    /// NOTE(review): looks like this times the gap between task completion and
    /// the output being polled — confirm against where the timer is started.
    pub wait_poll_timer: Option<HistogramTimer>,
}
/// Factory closure that spawns an upload task for the given payload and task
/// info, returning a join handle that resolves to the upload result.
pub type SpawnUploadTask = Arc<
    dyn Fn(UploadTaskPayload, UploadTaskInfo) -> JoinHandle<HummockResult<UploadTaskOutput>>
        + Send
        + Sync
        + 'static,
>;
84
/// Metadata of an upload task, kept alongside the running task so it can be
/// reused for retry and logging.
#[derive(Clone)]
pub struct UploadTaskInfo {
    /// Total byte size of all imms in the payload.
    pub task_size: usize,
    /// Epochs covered by the payload, deduplicated, newer epochs first.
    pub epochs: Vec<HummockEpoch>,
    /// Batch ids of the payload imms, keyed by local instance id.
    pub imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
}
91
92impl Display for UploadTaskInfo {
93    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
94        f.debug_struct("UploadTaskInfo")
95            .field("task_size", &self.task_size)
96            .field("epochs", &self.epochs)
97            .field("len(imm_ids)", &self.imm_ids.len())
98            .finish()
99    }
100}
101
102impl Debug for UploadTaskInfo {
103    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
104        f.debug_struct("UploadTaskInfo")
105            .field("task_size", &self.task_size)
106            .field("epochs", &self.epochs)
107            .field("imm_ids", &self.imm_ids)
108            .finish()
109    }
110}
111
112mod uploader_imm {
113    use std::fmt::Formatter;
114    use std::ops::Deref;
115
116    use risingwave_common::metrics::UintGauge;
117
118    use crate::hummock::event_handler::uploader::UploaderContext;
119    use crate::mem_table::ImmutableMemtable;
120
121    pub(super) struct UploaderImm {
122        inner: ImmutableMemtable,
123        size_guard: UintGauge,
124    }
125
126    impl UploaderImm {
127        pub(super) fn new(imm: ImmutableMemtable, context: &UploaderContext) -> Self {
128            let size = imm.size();
129            let size_guard = context.stats.uploader_imm_size.clone();
130            size_guard.add(size as _);
131            Self {
132                inner: imm,
133                size_guard,
134            }
135        }
136
137        #[cfg(test)]
138        pub(super) fn for_test(imm: ImmutableMemtable) -> Self {
139            Self {
140                inner: imm,
141                size_guard: UintGauge::new("test", "test").unwrap(),
142            }
143        }
144    }
145
146    impl std::fmt::Debug for UploaderImm {
147        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
148            self.inner.fmt(f)
149        }
150    }
151
152    impl Deref for UploaderImm {
153        type Target = ImmutableMemtable;
154
155        fn deref(&self) -> &Self::Target {
156            &self.inner
157        }
158    }
159
160    impl Drop for UploaderImm {
161        fn drop(&mut self) {
162            self.size_guard.sub(self.inner.size() as _);
163        }
164    }
165}
166
/// Identifier of an uploading task; presumably assigned by the `task_manager`
/// module — confirm there.
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct UploadingTaskId(usize);
169
/// A wrapper for a uploading task that compacts and uploads the imm payload. Task context are
/// stored so that when the task fails, it can be re-tried.
struct UploadingTask {
    task_id: UploadingTaskId,
    // newer data at the front
    input: UploadTaskInput,
    // Handle of the currently running attempt; replaced when a failed attempt is retried.
    join_handle: JoinHandle<HummockResult<UploadTaskOutput>>,
    task_info: UploadTaskInfo,
    // Kept so a failed attempt can be respawned with the same payload.
    spawn_upload_task: SpawnUploadTask,
    // Metric guards accounted in `new`; released in `Drop`.
    task_size_guard: UintGauge,
    task_count_guard: IntGauge,
}
182
183impl Drop for UploadingTask {
184    fn drop(&mut self) {
185        self.task_size_guard.sub(self.task_info.task_size as u64);
186        self.task_count_guard.dec();
187    }
188}
189
190impl Debug for UploadingTask {
191    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
192        f.debug_struct("UploadingTask")
193            .field("input", &self.input)
194            .field("task_info", &self.task_info)
195            .finish()
196    }
197}
198
199fn get_payload_imm_ids(
200    payload: &UploadTaskPayload,
201) -> HashMap<LocalInstanceId, Vec<SharedBufferBatchId>> {
202    payload
203        .iter()
204        .map(|(instance_id, imms)| {
205            (
206                *instance_id,
207                imms.iter().map(|imm| imm.batch_id()).collect_vec(),
208            )
209        })
210        .collect()
211}
212
impl UploadingTask {
    // INFO logs will be enabled for task with size exceeding 50MB.
    const LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE: usize = 50 * (1 << 20);

    /// Clone the imm payload out of the uploader-owned `input` so it can be
    /// handed to the spawned upload task while `input` stays retained for retry.
    fn input_to_payload(input: &UploadTaskInput) -> UploadTaskPayload {
        input
            .iter()
            .map(|(instance_id, imms)| {
                (
                    *instance_id,
                    imms.iter().map(|imm| (**imm).clone()).collect(),
                )
            })
            .collect()
    }

    /// Build the task metadata from a non-empty `input`, account for the task
    /// size globally, and spawn the upload via `context.spawn_upload_task`.
    fn new(task_id: UploadingTaskId, input: UploadTaskInput, context: &UploaderContext) -> Self {
        assert!(!input.is_empty());
        // Collect the distinct epochs covered by all imms, ascending.
        let mut epochs = input
            .iter()
            .flat_map(|(_, imms)| imms.iter().flat_map(|imm| imm.epochs().iter().cloned()))
            .sorted()
            .dedup()
            .collect_vec();

        // reverse to make newer epochs comes first
        epochs.reverse();
        let payload = Self::input_to_payload(&input);
        let imm_ids = get_payload_imm_ids(&payload);
        let task_size = input
            .values()
            .map(|imms| imms.iter().map(|imm| imm.size()).sum::<usize>())
            .sum();
        let task_info = UploadTaskInfo {
            task_size,
            epochs,
            imm_ids,
        };
        // Account the in-flight task size globally; released again in `Drop`.
        context
            .buffer_tracker
            .global_upload_task_size()
            .add(task_size as u64);
        if task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
            info!("start upload task: {:?}", task_info);
        } else {
            debug!("start upload task: {:?}", task_info);
        }
        let join_handle = (context.spawn_upload_task)(payload, task_info.clone());
        context.stats.uploader_uploading_task_count.inc();
        Self {
            task_id,
            input,
            join_handle,
            task_info,
            spawn_upload_task: context.spawn_upload_task.clone(),
            task_size_guard: context.buffer_tracker.global_upload_task_size().clone(),
            task_count_guard: context.stats.uploader_uploading_task_count.clone(),
        }
    }

    /// Poll the result of the uploading task
    ///
    /// On success the task output is wrapped into a `StagingSstableInfo`
    /// carrying the task's epochs/imm ids; a join failure (e.g. panicked task)
    /// is converted into a `HummockError`.
    fn poll_result(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<HummockResult<Arc<StagingSstableInfo>>> {
        Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) {
            Ok(task_result) => task_result
                .inspect(|_| {
                    if self.task_info.task_size > Self::LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE {
                        info!(task_info = ?self.task_info, "upload task finish");
                    } else {
                        debug!(task_info = ?self.task_info, "upload task finish");
                    }
                })
                .inspect_err(|e| error!(task_info = ?self.task_info, err = ?e.as_report(), "upload task failed"))
                .map(|output| {
                    Arc::new(StagingSstableInfo::new(
                        output.new_value_ssts,
                        output.old_value_ssts,
                        self.task_info.epochs.clone(),
                        self.task_info.imm_ids.clone(),
                        self.task_info.task_size,
                    ))
                }),

            Err(err) => Err(HummockError::other(format!(
                "fail to join upload join handle: {}",
                err.as_report()
            ))),
        })
    }

    /// Poll the uploading task until it succeeds. If it fails, we will retry it.
    fn poll_ok_with_retry(&mut self, cx: &mut Context<'_>) -> Poll<Arc<StagingSstableInfo>> {
        loop {
            let result = ready!(self.poll_result(cx));
            match result {
                Ok(sstables) => return Poll::Ready(sstables),
                Err(e) => {
                    error!(
                        error = %e.as_report(),
                        task_info = ?self.task_info,
                        "a flush task failed, start retry",
                    );
                    // Respawn the task from the retained input and task info.
                    self.join_handle = (self.spawn_upload_task)(
                        Self::input_to_payload(&self.input),
                        self.task_info.clone(),
                    );
                    // It is important not to return Poll::pending here immediately, because the new
                    // join_handle is not polled yet, and will not awake the current task when
                    // succeed. It will be polled in the next loop iteration.
                }
            }
        }
    }

    /// Metadata of this task (size, epochs, imm ids).
    pub fn get_task_info(&self) -> &UploadTaskInfo {
        &self.task_info
    }
}
333
impl TableUnsyncData {
    /// Record `table_watermarks` for `epoch`, merging them with any watermarks
    /// already buffered for this table.
    ///
    /// Panics if the direction or serde type differs from previously recorded
    /// watermarks, if vnode counts disagree, or if any vnode receives more
    /// than one watermark within the same epoch.
    fn add_table_watermarks(
        &mut self,
        epoch: HummockEpoch,
        table_watermarks: Vec<VnodeWatermark>,
        direction: WatermarkDirection,
        watermark_type: WatermarkSerdeType,
    ) {
        if table_watermarks.is_empty() {
            return;
        }
        // All watermarks of one table must agree on the vnode count.
        let vnode_count = table_watermarks[0].vnode_count();
        for watermark in &table_watermarks {
            assert_eq!(vnode_count, watermark.vnode_count());
        }

        // Mark the vnodes covered by `vnode_watermarks` in `vnode_bitmap`,
        // asserting each vnode is written at most once per epoch.
        fn apply_new_vnodes(
            vnode_bitmap: &mut BitmapBuilder,
            vnode_watermarks: &Vec<VnodeWatermark>,
        ) {
            for vnode_watermark in vnode_watermarks {
                for vnode in vnode_watermark.vnode_bitmap().iter_ones() {
                    assert!(
                        !vnode_bitmap.is_set(vnode),
                        "vnode {} write multiple table watermarks",
                        vnode
                    );
                    vnode_bitmap.set(vnode, true);
                }
            }
        }
        match &mut self.table_watermarks {
            Some((prev_direction, prev_watermarks, prev_watermark_type)) => {
                assert_eq!(
                    *prev_direction, direction,
                    "table id {} new watermark direction not match with previous",
                    self.table_id
                );
                assert_eq!(
                    *prev_watermark_type, watermark_type,
                    "table id {} new watermark watermark_type not match with previous",
                    self.table_id
                );
                match prev_watermarks.entry(epoch) {
                    // The epoch already has watermarks: extend them.
                    Entry::Occupied(mut entry) => {
                        let (prev_watermarks, vnode_bitmap) = entry.get_mut();
                        apply_new_vnodes(vnode_bitmap, &table_watermarks);
                        prev_watermarks.extend(table_watermarks);
                    }
                    // First watermarks for this epoch.
                    Entry::Vacant(entry) => {
                        let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                        apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                        entry.insert((table_watermarks, vnode_bitmap));
                    }
                }
            }
            // First watermarks ever recorded for this table.
            None => {
                let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count);
                apply_new_vnodes(&mut vnode_bitmap, &table_watermarks);
                self.table_watermarks = Some((
                    direction,
                    BTreeMap::from_iter([(epoch, (table_watermarks, vnode_bitmap))]),
                    watermark_type,
                ));
            }
        }
    }
}
402
impl UploaderData {
    /// Fold the per-epoch `watermarks` of `table_id` into a single
    /// [`TableWatermarks`] and insert it into `all_table_watermarks`.
    ///
    /// Panics if the table already has an entry in `all_table_watermarks`, or
    /// if direction/serde type differ between epochs.
    fn add_table_watermarks(
        all_table_watermarks: &mut HashMap<TableId, TableWatermarks>,
        table_id: TableId,
        direction: WatermarkDirection,
        watermarks: impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)>,
        watermark_type: WatermarkSerdeType,
    ) {
        // Accumulator: created on the first epoch, extended by later ones.
        let mut table_watermarks: Option<TableWatermarks> = None;
        for (epoch, watermarks) in watermarks {
            match &mut table_watermarks {
                Some(prev_watermarks) => {
                    assert_eq!(prev_watermarks.direction, direction);
                    assert_eq!(prev_watermarks.watermark_type, watermark_type);
                    prev_watermarks.add_new_epoch_watermarks(
                        epoch,
                        Arc::from(watermarks),
                        direction,
                        watermark_type,
                    );
                }
                None => {
                    table_watermarks = Some(TableWatermarks::single_epoch(
                        epoch,
                        watermarks,
                        direction,
                        watermark_type,
                    ));
                }
            }
        }
        // Nothing inserted when the input iterator was empty.
        if let Some(table_watermarks) = table_watermarks {
            assert!(
                all_table_watermarks
                    .insert(table_id, table_watermarks)
                    .is_none()
            );
        }
    }
}
443
/// Imms buffered for a single epoch within one local instance.
struct LocalInstanceEpochData {
    epoch: HummockEpoch,
    // newer data comes first.
    imms: VecDeque<UploaderImm>,
    // Whether data of this epoch has been taken out by a spill (see `spill`).
    has_spilled: bool,
}
450
451impl LocalInstanceEpochData {
452    fn new(epoch: HummockEpoch) -> Self {
453        Self {
454            epoch,
455            imms: VecDeque::new(),
456            has_spilled: false,
457        }
458    }
459
460    fn epoch(&self) -> HummockEpoch {
461        self.epoch
462    }
463
464    fn add_imm(&mut self, imm: UploaderImm) {
465        assert_eq!(imm.max_epoch(), imm.min_epoch());
466        assert_eq!(self.epoch, imm.min_epoch());
467        if let Some(prev_imm) = self.imms.front() {
468            assert_gt!(imm.batch_id(), prev_imm.batch_id());
469        }
470        self.imms.push_front(imm);
471    }
472
473    fn is_empty(&self) -> bool {
474        self.imms.is_empty()
475    }
476}
477
/// Unsync imm data of a single local instance of a table, staged per epoch.
struct LocalInstanceUnsyncData {
    table_id: TableId,
    instance_id: LocalInstanceId,
    // None means that the current instance should have stopped advancing
    current_epoch_data: Option<LocalInstanceEpochData>,
    // newer data comes first.
    sealed_data: VecDeque<LocalInstanceEpochData>,
    // newer data comes first
    flushing_imms: VecDeque<SharedBufferBatchId>,
    // Set when the instance is destroyed; the instance is removable once
    // sealed data drains (see `is_finished`).
    is_destroyed: bool,
}
489
impl LocalInstanceUnsyncData {
    /// Create an instance that starts accumulating data at `init_epoch`.
    fn new(table_id: TableId, instance_id: LocalInstanceId, init_epoch: HummockEpoch) -> Self {
        Self {
            table_id,
            instance_id,
            current_epoch_data: Some(LocalInstanceEpochData::new(init_epoch)),
            sealed_data: VecDeque::new(),
            flushing_imms: Default::default(),
            is_destroyed: false,
        }
    }

    /// Add an imm to the current (unsealed) epoch. Panics if the instance is
    /// destroyed, the imm belongs to another table, or the epoch was archived.
    fn add_imm(&mut self, imm: UploaderImm) {
        assert!(!self.is_destroyed);
        assert_eq!(self.table_id, imm.table_id);
        self.current_epoch_data
            .as_mut()
            .expect("should be Some when adding new imm")
            .add_imm(imm);
    }

    /// Seal the current epoch and start `next_epoch`; returns the epoch that
    /// was sealed. Sealed epochs with data are queued for later sync/spill.
    fn local_seal_epoch(&mut self, next_epoch: HummockEpoch) -> HummockEpoch {
        assert!(!self.is_destroyed);
        let data = self
            .current_epoch_data
            .as_mut()
            .expect("should be Some when seal new epoch");
        let current_epoch = data.epoch;
        debug!(
            instance_id = self.instance_id,
            next_epoch, current_epoch, "local seal epoch"
        );
        assert_gt!(next_epoch, current_epoch);
        let epoch_data = replace(data, LocalInstanceEpochData::new(next_epoch));
        // Empty epochs are dropped rather than queued.
        if !epoch_data.is_empty() {
            self.sealed_data.push_front(epoch_data);
        }
        current_epoch
    }

    // imm_ids from old to new, which means in ascending order
    /// Pop acknowledged imm ids off the back of `flushing_imms`, asserting
    /// they complete in the same order they were issued.
    fn ack_flushed(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            assert_eq!(self.flushing_imms.pop_back().expect("should exist"), imm_id);
        }
    }

    /// Take out the imms of exactly `epoch` for spilling (empty if the
    /// instance holds no data at that epoch), registering them as flushing.
    ///
    /// Panics if an older epoch still has unspilled data, since spills must
    /// proceed from oldest epoch to newest.
    fn spill(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        let imms = if let Some(oldest_sealed_epoch) = self.sealed_data.back() {
            match oldest_sealed_epoch.epoch.cmp(&epoch) {
                Ordering::Less => {
                    unreachable!(
                        "should not spill at this epoch because there \
                    is unspilled data in previous epoch: prev epoch {}, spill epoch {}",
                        oldest_sealed_epoch.epoch, epoch
                    );
                }
                Ordering::Equal => {
                    let epoch_data = self.sealed_data.pop_back().unwrap();
                    assert_eq!(epoch, epoch_data.epoch);
                    epoch_data.imms
                }
                Ordering::Greater => VecDeque::new(),
            }
        } else {
            // No sealed data: the spill may only hit the current epoch.
            let Some(current_epoch_data) = &mut self.current_epoch_data else {
                return Vec::new();
            };
            match current_epoch_data.epoch.cmp(&epoch) {
                Ordering::Less => {
                    assert!(
                        current_epoch_data.imms.is_empty(),
                        "should not spill at this epoch because there \
                    is unspilled data in current epoch epoch {}, spill epoch {}",
                        current_epoch_data.epoch,
                        epoch
                    );
                    VecDeque::new()
                }
                Ordering::Equal => {
                    if !current_epoch_data.imms.is_empty() {
                        // Mark so later syncs know part of this epoch left early.
                        current_epoch_data.has_spilled = true;
                        take(&mut current_epoch_data.imms)
                    } else {
                        VecDeque::new()
                    }
                }
                Ordering::Greater => VecDeque::new(),
            }
        };
        self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
        imms.into_iter().collect()
    }

    /// Record imm ids (given old to new) as in-flight, keeping `flushing_imms`
    /// ordered with the newest id at the front.
    fn add_flushing_imm(&mut self, imm_ids: impl Iterator<Item = SharedBufferBatchId>) {
        for imm_id in imm_ids {
            if let Some(prev_imm_id) = self.flushing_imms.front() {
                assert_gt!(imm_id, *prev_imm_id);
            }
            self.flushing_imms.push_front(imm_id);
        }
    }

    // start syncing the imm inclusively before the `epoch`
    // returning data with newer data coming first
    fn sync(&mut self, epoch: HummockEpoch) -> Vec<UploaderImm> {
        // firstly added from old to new
        let mut ret = Vec::new();
        while let Some(epoch_data) = self.sealed_data.back()
            && epoch_data.epoch() <= epoch
        {
            let imms = self.sealed_data.pop_back().expect("checked exist").imms;
            self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id()));
            ret.extend(imms.into_iter().rev());
        }
        // reverse so that newer data comes first
        ret.reverse();
        // Syncing past the current epoch means the instance should already have
        // been archived: panic in debug builds, warn and archive in release.
        if let Some(latest_epoch_data) = &self.current_epoch_data
            && latest_epoch_data.epoch <= epoch
        {
            assert!(self.sealed_data.is_empty());
            assert!(latest_epoch_data.is_empty());
            assert!(!latest_epoch_data.has_spilled);
            if cfg!(debug_assertions) {
                panic!(
                    "sync epoch exceeds latest epoch, and the current instance should have been archived"
                );
            }
            warn!(
                instance_id = self.instance_id,
                table_id = self.table_id.table_id,
                "sync epoch exceeds latest epoch, and the current instance should have be archived"
            );
            self.current_epoch_data = None;
        }
        ret
    }

    /// Assert that no data at or before `epoch` remains in this instance.
    fn assert_after_epoch(&self, epoch: HummockEpoch) {
        if let Some(oldest_sealed_data) = self.sealed_data.back() {
            assert!(!oldest_sealed_data.imms.is_empty());
            assert_gt!(oldest_sealed_data.epoch, epoch);
        } else if let Some(current_data) = &self.current_epoch_data
            && current_data.epoch <= epoch
        {
            assert!(current_data.imms.is_empty() && !current_data.has_spilled);
        }
    }

    /// A destroyed instance is finished once all its sealed data is drained.
    fn is_finished(&self) -> bool {
        self.is_destroyed && self.sealed_data.is_empty()
    }
}
643
/// Unsync state of a single table: per-instance imm data plus aggregated
/// epoch-level information (watermarks, spill tasks, epoch progress).
struct TableUnsyncData {
    table_id: TableId,
    // Data of each local instance of this table.
    instance_data: HashMap<LocalInstanceId, LocalInstanceUnsyncData>,
    // Buffered watermarks per epoch, with a bitmap of the vnodes covered so far.
    #[expect(clippy::type_complexity)]
    table_watermarks: Option<(
        WatermarkDirection,
        BTreeMap<HummockEpoch, (Vec<VnodeWatermark>, BitmapBuilder)>,
        WatermarkSerdeType,
    )>,
    // Ids of spill tasks issued per epoch; drained into the sync on `sync`.
    spill_tasks: BTreeMap<HummockEpoch, VecDeque<UploadingTaskId>>,
    // Epochs started but not yet synced (BTreeMap used as an ordered set).
    unsync_epochs: BTreeMap<HummockEpoch, ()>,
    // Initialized to be `None`. Transform to `Some(_)` when called
    // `local_seal_epoch` with a non-existing epoch, to mark that
    // the fragment of the table has stopped.
    stopped_next_epoch: Option<HummockEpoch>,
    // newer epoch at the front
    syncing_epochs: VecDeque<HummockEpoch>,
    max_synced_epoch: Option<HummockEpoch>,
}
663
impl TableUnsyncData {
    /// New table state; `committed_epoch` (if known) seeds the synced frontier.
    fn new(table_id: TableId, committed_epoch: Option<HummockEpoch>) -> Self {
        Self {
            table_id,
            instance_data: Default::default(),
            table_watermarks: None,
            spill_tasks: Default::default(),
            unsync_epochs: Default::default(),
            stopped_next_epoch: None,
            syncing_epochs: Default::default(),
            max_synced_epoch: committed_epoch,
        }
    }

    /// Register a newly started epoch; must be strictly greater than any epoch
    /// seen so far.
    fn new_epoch(&mut self, epoch: HummockEpoch) {
        debug!(table_id = ?self.table_id, epoch, "table new epoch");
        if let Some(latest_epoch) = self.max_epoch() {
            assert_gt!(epoch, latest_epoch);
        }
        self.unsync_epochs.insert(epoch, ());
    }

    /// Start syncing everything at or before `epoch`.
    ///
    /// Returns: per-instance imms to upload (newer first), the buffered
    /// watermarks up to `epoch` (if any), the spill task ids covered by the
    /// sync, and the epochs taken out of `unsync_epochs`.
    #[expect(clippy::type_complexity)]
    fn sync(
        &mut self,
        epoch: HummockEpoch,
    ) -> (
        impl Iterator<Item = (LocalInstanceId, Vec<UploaderImm>)> + '_,
        Option<(
            WatermarkDirection,
            impl Iterator<Item = (HummockEpoch, Vec<VnodeWatermark>)> + use<>,
            WatermarkSerdeType,
        )>,
        impl Iterator<Item = UploadingTaskId> + use<>,
        BTreeMap<HummockEpoch, ()>,
    ) {
        // Syncs must be issued in strictly increasing epoch order.
        if let Some(prev_epoch) = self.max_sync_epoch() {
            assert_gt!(epoch, prev_epoch)
        }
        let epochs = take_before_epoch(&mut self.unsync_epochs, epoch);
        // The sync epoch itself must have been started on this table.
        assert_eq!(
            *epochs.last_key_value().expect("non-empty").0,
            epoch,
            "{epochs:?} {epoch} {:?}",
            self.table_id
        );
        self.syncing_epochs.push_front(epoch);
        (
            self.instance_data
                .iter_mut()
                .map(move |(instance_id, data)| (*instance_id, data.sync(epoch))),
            self.table_watermarks
                .as_mut()
                .map(|(direction, watermarks, watermark_type)| {
                    // The coverage bitmaps are only needed while buffering; drop them.
                    let watermarks = take_before_epoch(watermarks, epoch)
                        .into_iter()
                        .map(|(epoch, (watermarks, _))| (epoch, watermarks));
                    (*direction, watermarks, *watermark_type)
                }),
            take_before_epoch(&mut self.spill_tasks, epoch)
                .into_values()
                .flat_map(|tasks| tasks.into_iter()),
            epochs,
        )
    }

    /// Mark the oldest syncing epoch as synced; must match `sync_epoch`.
    fn ack_synced(&mut self, sync_epoch: HummockEpoch) {
        let min_sync_epoch = self.syncing_epochs.pop_back().expect("should exist");
        assert_eq!(sync_epoch, min_sync_epoch);
        self.max_synced_epoch = Some(sync_epoch);
    }

    /// Advance the synced frontier when a newer epoch is committed, asserting
    /// no stale data at or before it remains.
    fn ack_committed(&mut self, committed_epoch: HummockEpoch) {
        let synced_epoch_advanced = {
            if let Some(max_synced_epoch) = self.max_synced_epoch
                && max_synced_epoch >= committed_epoch
            {
                false
            } else {
                true
            }
        };
        if synced_epoch_advanced {
            self.max_synced_epoch = Some(committed_epoch);
            if let Some(min_syncing_epoch) = self.syncing_epochs.back() {
                assert_gt!(*min_syncing_epoch, committed_epoch);
            }
            self.assert_after_epoch(committed_epoch);
        }
    }

    /// Assert that no instance data or watermarks at or before `epoch` remain.
    fn assert_after_epoch(&self, epoch: HummockEpoch) {
        self.instance_data
            .values()
            .for_each(|instance_data| instance_data.assert_after_epoch(epoch));
        if let Some((_, watermarks, _)) = &self.table_watermarks
            && let Some((oldest_epoch, _)) = watermarks.first_key_value()
        {
            assert_gt!(*oldest_epoch, epoch);
        }
    }

    /// Latest epoch for which a sync has been issued (syncing or synced).
    fn max_sync_epoch(&self) -> Option<HummockEpoch> {
        self.syncing_epochs
            .front()
            .cloned()
            .or(self.max_synced_epoch)
    }

    /// Latest epoch known to this table, whether unsynced or already syncing.
    fn max_epoch(&self) -> Option<HummockEpoch> {
        self.unsync_epochs
            .last_key_value()
            .map(|(epoch, _)| *epoch)
            .or_else(|| self.max_sync_epoch())
    }

    /// True when the table holds no instances and no pending epochs.
    fn is_empty(&self) -> bool {
        self.instance_data.is_empty()
            && self.syncing_epochs.is_empty()
            && self.unsync_epochs.is_empty()
    }
}
786
/// Identifies an epoch shared by a group of tables: the epoch paired with the
/// smallest table id of the group (see `get_unsync_epoch_id`).
#[derive(Eq, Hash, PartialEq, Copy, Clone)]
struct UnsyncEpochId(HummockEpoch, TableId);
789
790impl UnsyncEpochId {
791    fn epoch(&self) -> HummockEpoch {
792        self.0
793    }
794}
795
796fn get_unsync_epoch_id(epoch: HummockEpoch, table_ids: &HashSet<TableId>) -> Option<UnsyncEpochId> {
797    table_ids
798        .iter()
799        .min()
800        .map(|table_id| UnsyncEpochId(epoch, *table_id))
801}
802
#[derive(Default)]
/// Unsync data, can be either imm or spilled sst, and some aggregated epoch information.
///
/// `instance_data` holds the imm of each individual local instance, and data are first added here.
/// The aggregated epoch information (table watermarks, etc.) and the spilled sst will be added to `epoch_data`.
struct UnsyncData {
    table_data: HashMap<TableId, TableUnsyncData>,
    // An index as a mapping from instance id to its table id
    instance_table_id: HashMap<LocalInstanceId, TableId>,
    // Tables sharing each unsynced epoch, keyed by the epoch's group id.
    unsync_epochs: HashMap<UnsyncEpochId, HashSet<TableId>>,
    // Spilled ssts and the set of tables whose data they contain.
    spilled_data: HashMap<UploadingTaskId, (Arc<StagingSstableInfo>, HashSet<TableId>)>,
}
815
816impl UnsyncData {
    /// Register a new local instance of `table_id` starting at `init_epoch`.
    ///
    /// Panics if the table is unknown, the instance id is already registered,
    /// or `init_epoch` has not been started on the table.
    fn init_instance(
        &mut self,
        table_id: TableId,
        instance_id: LocalInstanceId,
        init_epoch: HummockEpoch,
    ) {
        debug!(
            table_id = table_id.table_id,
            instance_id, init_epoch, "init epoch"
        );
        let table_data = self
            .table_data
            .get_mut(&table_id)
            .unwrap_or_else(|| panic!("should exist. {table_id:?}"));
        assert!(
            table_data
                .instance_data
                .insert(
                    instance_id,
                    LocalInstanceUnsyncData::new(table_id, instance_id, init_epoch)
                )
                .is_none()
        );
        assert!(
            self.instance_table_id
                .insert(instance_id, table_id)
                .is_none()
        );
        assert!(table_data.unsync_epochs.contains_key(&init_epoch));
    }
847
848    fn instance_data(
849        &mut self,
850        instance_id: LocalInstanceId,
851    ) -> Option<&mut LocalInstanceUnsyncData> {
852        self.instance_table_id
853            .get_mut(&instance_id)
854            .cloned()
855            .map(move |table_id| {
856                self.table_data
857                    .get_mut(&table_id)
858                    .expect("should exist")
859                    .instance_data
860                    .get_mut(&instance_id)
861                    .expect("should exist")
862            })
863    }
864
865    fn add_imm(&mut self, instance_id: LocalInstanceId, imm: UploaderImm) {
866        self.instance_data(instance_id)
867            .expect("should exist")
868            .add_imm(imm);
869    }
870
871    fn local_seal_epoch(
872        &mut self,
873        instance_id: LocalInstanceId,
874        next_epoch: HummockEpoch,
875        opts: SealCurrentEpochOptions,
876    ) {
877        let table_id = self.instance_table_id[&instance_id];
878        let table_data = self.table_data.get_mut(&table_id).expect("should exist");
879        let instance_data = table_data
880            .instance_data
881            .get_mut(&instance_id)
882            .expect("should exist");
883        let epoch = instance_data.local_seal_epoch(next_epoch);
884        // When drop/cancel a streaming job, for the barrier to stop actor, the
885        // local instance will call `local_seal_epoch`, but the `next_epoch` won't be
886        // called `start_epoch` because we have stopped writing on it.
887        if !table_data.unsync_epochs.contains_key(&next_epoch) {
888            if let Some(stopped_next_epoch) = table_data.stopped_next_epoch {
889                if stopped_next_epoch != next_epoch {
890                    let table_id = table_data.table_id.table_id;
891                    let unsync_epochs = table_data.unsync_epochs.keys().collect_vec();
892                    if cfg!(debug_assertions) {
893                        panic!(
894                            "table_id {} stop epoch {} different to prev stop epoch {}. unsync epochs: {:?}, syncing epochs {:?}, max_synced_epoch {:?}",
895                            table_id,
896                            next_epoch,
897                            stopped_next_epoch,
898                            unsync_epochs,
899                            table_data.syncing_epochs,
900                            table_data.max_synced_epoch
901                        );
902                    } else {
903                        warn!(
904                            table_id,
905                            stopped_next_epoch,
906                            next_epoch,
907                            ?unsync_epochs,
908                            syncing_epochs = ?table_data.syncing_epochs,
909                            max_synced_epoch = ?table_data.max_synced_epoch,
910                            "different stop epoch"
911                        );
912                    }
913                }
914            } else {
915                if let Some(max_epoch) = table_data.max_epoch() {
916                    assert_gt!(next_epoch, max_epoch);
917                }
918                debug!(?table_id, epoch, next_epoch, "table data has stopped");
919                table_data.stopped_next_epoch = Some(next_epoch);
920            }
921        }
922        if let Some((direction, table_watermarks, watermark_type)) = opts.table_watermarks {
923            table_data.add_table_watermarks(epoch, table_watermarks, direction, watermark_type);
924        }
925    }
926
927    fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
928        if let Some(table_id) = self.instance_table_id.get(&instance_id) {
929            debug!(instance_id, "destroy instance");
930            let table_data = self.table_data.get_mut(table_id).expect("should exist");
931            let instance_data = table_data
932                .instance_data
933                .get_mut(&instance_id)
934                .expect("should exist");
935            assert!(
936                !instance_data.is_destroyed,
937                "cannot destroy an instance for twice"
938            );
939            instance_data.is_destroyed = true;
940        }
941    }
942
943    fn clear_tables(&mut self, table_ids: &HashSet<TableId>, task_manager: &mut TaskManager) {
944        for table_id in table_ids {
945            if let Some(table_unsync_data) = self.table_data.remove(table_id) {
946                for task_id in table_unsync_data.spill_tasks.into_values().flatten() {
947                    if let Some(task_status) = task_manager.abort_task(task_id) {
948                        must_match!(task_status, UploadingTaskStatus::Spilling(spill_table_ids) => {
949                            assert!(spill_table_ids.is_subset(table_ids));
950                        });
951                    }
952                    if let Some((_, spill_table_ids)) = self.spilled_data.remove(&task_id) {
953                        assert!(spill_table_ids.is_subset(table_ids));
954                    }
955                }
956                assert!(
957                    table_unsync_data
958                        .instance_data
959                        .values()
960                        .all(|instance| instance.is_destroyed),
961                    "should be clear when dropping the read version instance"
962                );
963                for instance_id in table_unsync_data.instance_data.keys() {
964                    assert_eq!(
965                        *table_id,
966                        self.instance_table_id
967                            .remove(instance_id)
968                            .expect("should exist")
969                    );
970                }
971            }
972        }
973        debug_assert!(
974            self.spilled_data
975                .values()
976                .all(|(_, spill_table_ids)| spill_table_ids.is_disjoint(table_ids))
977        );
978        self.unsync_epochs.retain(|_, unsync_epoch_table_ids| {
979            if !unsync_epoch_table_ids.is_disjoint(table_ids) {
980                assert!(unsync_epoch_table_ids.is_subset(table_ids));
981                false
982            } else {
983                true
984            }
985        });
986        assert!(
987            self.instance_table_id
988                .values()
989                .all(|table_id| !table_ids.contains(table_id))
990        );
991    }
992}
993
impl UploaderData {
    /// Start syncing the given `(epoch, table_ids)` pairs.
    ///
    /// For each pair this drains the covered unsync epochs, collects unflushed
    /// payload, table watermarks, vector index adds and previously spilled ssts of
    /// the affected tables, spawns a final flush task for the remaining payload,
    /// and registers a [`SyncingData`] entry whose result is later delivered
    /// through `sync_result_sender` once all uploading tasks finish.
    fn sync(
        &mut self,
        context: &UploaderContext,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) {
        let mut all_table_watermarks = HashMap::new();
        let mut uploading_tasks = HashSet::new();
        let mut spilled_tasks = BTreeSet::new();
        let mut all_table_ids = HashSet::new();
        let mut vector_index_adds = HashMap::new();

        let mut flush_payload = HashMap::new();

        for (epoch, table_ids) in &sync_table_epochs {
            let epoch = *epoch;
            // A table may appear in at most one `(epoch, table_ids)` pair.
            for table_id in table_ids {
                assert!(
                    all_table_ids.insert(*table_id),
                    "duplicate sync table epoch: {:?} {:?}",
                    all_table_ids,
                    sync_table_epochs
                );
            }
            if let Some(UnsyncEpochId(_, min_table_id)) = get_unsync_epoch_id(epoch, table_ids) {
                let min_table_id_data = self
                    .unsync_data
                    .table_data
                    .get_mut(&min_table_id)
                    .expect("should exist");
                // NOTE(review): `take_before_epoch` is applied to a *clone*, so this
                // only computes the set of epochs being synced and leaves
                // `min_table_id_data.unsync_epochs` untouched; the per-table removal
                // appears to happen inside `table_data.sync(epoch)` below (its return
                // value is asserted equal to `epochs`) — confirm this is intended.
                let epochs = take_before_epoch(&mut min_table_id_data.unsync_epochs.clone(), epoch);
                for epoch in epochs.keys() {
                    // Each drained epoch must have a matching global entry (keyed by
                    // the min table id, as registered in `start_epoch`) covering
                    // exactly the same table set.
                    assert_eq!(
                        &self
                            .unsync_data
                            .unsync_epochs
                            .remove(&UnsyncEpochId(*epoch, min_table_id))
                            .expect("should exist"),
                        table_ids
                    );
                }
                for table_id in table_ids {
                    let table_data = self
                        .unsync_data
                        .table_data
                        .get_mut(table_id)
                        .expect("should exist");
                    let (unflushed_payload, table_watermarks, task_ids, table_unsync_epochs) =
                        table_data.sync(epoch);
                    assert_eq!(table_unsync_epochs, epochs);
                    for (instance_id, payload) in unflushed_payload {
                        if !payload.is_empty() {
                            flush_payload.insert(instance_id, payload);
                        }
                    }
                    table_data.instance_data.retain(|instance_id, data| {
                        // remove the finished instances
                        if data.is_finished() {
                            assert_eq!(
                                self.unsync_data.instance_table_id.remove(instance_id),
                                Some(*table_id)
                            );
                            false
                        } else {
                            true
                        }
                    });
                    if let Some((direction, watermarks, watermark_type)) = table_watermarks {
                        Self::add_table_watermarks(
                            &mut all_table_watermarks,
                            *table_id,
                            direction,
                            watermarks,
                            watermark_type,
                        );
                    }
                    // Tasks whose spill already finished are collected separately
                    // from tasks that are still uploading.
                    for task_id in task_ids {
                        if self.unsync_data.spilled_data.contains_key(&task_id) {
                            spilled_tasks.insert(task_id);
                        } else {
                            uploading_tasks.insert(task_id);
                        }
                    }

                    if let hash_map::Entry::Occupied(mut entry) =
                        self.unsync_vector_index_data.entry(*table_id)
                    {
                        let data = entry.get_mut();
                        let adds = take_before_epoch(&mut data.sealed_epoch_data, epoch)
                            .into_values()
                            .flatten()
                            .collect_vec();
                        // A dropped writer's entry is removed once fully drained.
                        if data.is_dropped && data.sealed_epoch_data.is_empty() {
                            entry.remove();
                        }
                        if !adds.is_empty() {
                            vector_index_adds
                                .try_insert(*table_id, adds)
                                .expect("non-duplicate");
                        }
                    }
                }
            }
        }

        // Allocate a fresh monotonically increasing sync id.
        let sync_id = {
            let sync_id = self.next_sync_id;
            self.next_sync_id += 1;
            SyncId(sync_id)
        };

        // Flush the remaining unflushed payload as one extra task tied to this sync.
        if let Some(extra_flush_task_id) = self.task_manager.sync(
            context,
            sync_id,
            flush_payload,
            uploading_tasks.iter().cloned(),
            &all_table_ids,
        ) {
            uploading_tasks.insert(extra_flush_task_id);
        }

        // iter from large task_id to small one so that newer data at the front
        let uploaded = spilled_tasks
            .iter()
            .rev()
            .map(|task_id| {
                let (sst, spill_table_ids) = self
                    .unsync_data
                    .spilled_data
                    .remove(task_id)
                    .expect("should exist");
                assert!(
                    spill_table_ids.is_subset(&all_table_ids),
                    "spilled tabled ids {:?} not a subset of sync table id {:?}",
                    spill_table_ids,
                    all_table_ids
                );
                sst
            })
            .collect();

        self.syncing_data.insert(
            sync_id,
            SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: uploading_tasks,
                uploaded,
                table_watermarks: all_table_watermarks,
                vector_index_adds,
                sync_result_sender,
            },
        );

        self.check_upload_task_consistency();
    }
}
1151
1152impl UnsyncData {
1153    fn ack_flushed(&mut self, sstable_info: &StagingSstableInfo) {
1154        for (instance_id, imm_ids) in sstable_info.imm_ids() {
1155            if let Some(instance_data) = self.instance_data(*instance_id) {
1156                // take `rev` to let old imm id goes first
1157                instance_data.ack_flushed(imm_ids.iter().rev().cloned());
1158            }
1159        }
1160    }
1161}
1162
/// Bookkeeping for one in-flight sync request, kept until all of its uploading
/// tasks complete and the result is delivered.
struct SyncingData {
    /// The `(epoch, table_ids)` pairs this sync request covers.
    sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    /// Upload tasks that must finish before the sync result can be sent.
    remaining_uploading_tasks: HashSet<UploadingTaskId>,
    // newer data at the front
    uploaded: VecDeque<Arc<StagingSstableInfo>>,
    /// Aggregated table watermarks collected at sync time.
    table_watermarks: HashMap<TableId, TableWatermarks>,
    /// Vector index adds of the synced epochs, per table.
    vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
    /// Channel used to deliver the sync result back to the requester.
    sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
}
1172
/// The output of a completed sync: the uploaded ssts (newer data at the front)
/// together with the aggregated table watermarks and vector index adds.
#[derive(Debug)]
pub struct SyncedData {
    pub uploaded_ssts: VecDeque<Arc<StagingSstableInfo>>,
    pub table_watermarks: HashMap<TableId, TableWatermarks>,
    pub vector_index_adds: HashMap<TableId, Vec<VectorIndexAdd>>,
}
1179
/// Shared context for uploader operations: the pinned hummock version, the
/// upload-task spawner, the buffer tracker and metrics.
struct UploaderContext {
    /// The latest pinned version; replaced by `update_pinned_version`.
    pinned_version: PinnedVersion,
    /// When called, it will spawn a task to flush the imm into sst and return the join handle.
    spawn_upload_task: SpawnUploadTask,
    /// Tracks buffer usage, used to decide when spilling is needed.
    buffer_tracker: BufferTracker,

    /// Hummock state-store metrics.
    stats: Arc<HummockStateStoreMetrics>,
}
1188
1189impl UploaderContext {
1190    fn new(
1191        pinned_version: PinnedVersion,
1192        spawn_upload_task: SpawnUploadTask,
1193        buffer_tracker: BufferTracker,
1194        stats: Arc<HummockStateStoreMetrics>,
1195    ) -> Self {
1196        UploaderContext {
1197            pinned_version,
1198            spawn_upload_task,
1199            buffer_tracker,
1200            stats,
1201        }
1202    }
1203}
1204
/// Monotonically increasing id assigned to each sync request (see `next_sync_id`).
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Clone, Debug)]
struct SyncId(usize);
1207
1208struct UnsyncVectorIndexData {
1209    sealed_epoch_data: BTreeMap<HummockEpoch, Option<VectorIndexAdd>>,
1210    curr_epoch: u64,
1211    is_dropped: bool,
1212}
1213
/// All mutable state of a working uploader.
#[derive(Default)]
struct UploaderData {
    /// Data not yet handed to a sync request.
    unsync_data: UnsyncData,
    /// Per-table vector index state of unsynced epochs.
    unsync_vector_index_data: HashMap<TableId, UnsyncVectorIndexData>,

    /// In-flight sync requests, ordered by their monotonically increasing id.
    syncing_data: BTreeMap<SyncId, SyncingData>,

    /// Owns and tracks all spawned uploading tasks.
    task_manager: TaskManager,
    /// Source of the next `SyncId`.
    next_sync_id: usize,
}
1224
impl UploaderData {
    /// Abort all in-flight upload tasks and fail every pending sync request with
    /// the error produced by `err`. Consumes `self`.
    fn abort(self, err: impl Fn() -> HummockError) {
        self.task_manager.abort_all_tasks();
        for syncing_data in self.syncing_data.into_values() {
            send_sync_result(syncing_data.sync_result_sender, Err(err()));
        }
    }

    /// Drop all state belonging to `table_ids`: unsync data, their spill tasks,
    /// and any sync request that involves them (whose remaining tasks are aborted).
    ///
    /// Relies on the invariant that a sync request's tables are either all inside
    /// or all outside the cleared table set.
    fn clear_tables(&mut self, table_ids: HashSet<TableId>) {
        if table_ids.is_empty() {
            return;
        }
        self.unsync_data
            .clear_tables(&table_ids, &mut self.task_manager);
        self.syncing_data.retain(|sync_id, syncing_data| {
            if syncing_data
                .sync_table_epochs
                .iter()
                .any(|(_, sync_table_ids)| !sync_table_ids.is_disjoint(&table_ids))
            {
                // The sync overlaps the cleared tables, so it must be entirely
                // covered by them; abort its remaining tasks and drop it.
                assert!(
                    syncing_data
                        .sync_table_epochs
                        .iter()
                        .all(|(_, sync_table_ids)| sync_table_ids.is_subset(&table_ids))
                );
                for task_id in &syncing_data.remaining_uploading_tasks {
                    match self
                        .task_manager
                        .abort_task(*task_id)
                        .expect("should exist")
                    {
                        UploadingTaskStatus::Spilling(spill_table_ids) => {
                            assert!(spill_table_ids.is_subset(&table_ids));
                        }
                        UploadingTaskStatus::Sync(task_sync_id) => {
                            assert_eq!(sync_id, &task_sync_id);
                        }
                    }
                }
                false
            } else {
                true
            }
        });

        self.check_upload_task_consistency();
    }

    /// The minimum object id among all uncommitted ssts (both spilled unsync data
    /// and ssts uploaded for in-flight syncs), or `None` if there is none.
    fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        self.unsync_data
            .spilled_data
            .values()
            .map(|(s, _)| s)
            .chain(self.syncing_data.values().flat_map(|s| s.uploaded.iter()))
            .filter_map(|s| {
                s.sstable_infos()
                    .iter()
                    .chain(s.old_value_sstable_infos())
                    .map(|s| s.sst_info.object_id)
                    .min()
            })
            .min()
            .map(|object_id| object_id.as_raw())
    }
}
1291
/// Records why the uploader entered the error state: the sync that failed and a
/// human-readable reason, both reported to any subsequent sync attempt.
struct ErrState {
    failed_sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    reason: String,
}
1296
/// The uploader either holds working data or is latched into an error state
/// after a failed sync; in the error state mutating operations early-return and
/// new sync requests are answered with an error.
enum UploaderState {
    Working(UploaderData),
    Err(ErrState),
}
1301
/// An uploader for hummock data.
///
/// Data have 3 sequential stages: unsync (inside each local instance, data can be unsealed, sealed), syncing, synced.
///
/// The 3 stages are divided by 2 marginal epochs: `max_syncing_epoch`,
/// `max_synced_epoch` in each `TableUnSyncData`. Epochs satisfy the following inequality.
///
/// (epochs of `synced_data`) <= `max_synced_epoch` < (epochs of `syncing_data`) <=
/// `max_syncing_epoch` < (epochs of `unsync_data`)
///
/// Data are mostly stored in `VecDeque`, and the order stored in the `VecDeque` indicates the data
/// order. Data at the front represents ***newer*** data.
pub struct HummockUploader {
    /// Current state: either working data or a sticky error state.
    state: UploaderState,

    /// Context shared by all operations (pinned version, task spawner, buffer
    /// tracker, metrics).
    context: UploaderContext,
}
1319
impl HummockUploader {
    /// Create a new uploader in the working state.
    pub(super) fn new(
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        pinned_version: PinnedVersion,
        spawn_upload_task: SpawnUploadTask,
        buffer_tracker: BufferTracker,
    ) -> Self {
        Self {
            state: UploaderState::Working(UploaderData::default()),
            context: UploaderContext::new(
                pinned_version,
                spawn_upload_task,
                buffer_tracker,
                state_store_metrics,
            ),
        }
    }

    /// The buffer tracker shared with the event handler.
    pub(super) fn buffer_tracker(&self) -> &BufferTracker {
        &self.context.buffer_tracker
    }

    /// The currently pinned hummock version.
    pub(super) fn hummock_version(&self) -> &PinnedVersion {
        &self.context.pinned_version
    }

    /// Buffer imms on a local instance. No-op in the error state.
    pub(super) fn add_imms(&mut self, instance_id: LocalInstanceId, imms: Vec<ImmutableMemtable>) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        for imm in imms {
            let imm = UploaderImm::new(imm, &self.context);
            data.unsync_data.add_imm(instance_id, imm);
        }
    }

    /// Register a new local instance of `table_id` starting at `init_epoch`.
    /// No-op in the error state.
    pub(super) fn init_instance(
        &mut self,
        instance_id: LocalInstanceId,
        table_id: TableId,
        init_epoch: HummockEpoch,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data
            .init_instance(table_id, instance_id, init_epoch);
    }

    /// Seal the current epoch of an instance, advancing it to `next_epoch`.
    /// No-op in the error state.
    pub(super) fn local_seal_epoch(
        &mut self,
        instance_id: LocalInstanceId,
        next_epoch: HummockEpoch,
        opts: SealCurrentEpochOptions,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data
            .local_seal_epoch(instance_id, next_epoch, opts);
    }

    /// Register a vector index writer for `table_id` starting at `init_epoch`.
    /// Panics on duplicate registration. No-op in the error state.
    pub(super) fn register_vector_writer(&mut self, table_id: TableId, init_epoch: HummockEpoch) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        assert!(
            data.unsync_vector_index_data
                .try_insert(
                    table_id,
                    UnsyncVectorIndexData {
                        sealed_epoch_data: Default::default(),
                        curr_epoch: init_epoch,
                        is_dropped: false,
                    }
                )
                .is_ok(),
            "duplicate vector writer on {}",
            table_id
        )
    }

    /// Seal the current epoch of a vector writer, recording its optional
    /// `VectorIndexAdd` and advancing to `next_epoch`. Panics if the writer is
    /// unknown, dropped, or `next_epoch` does not advance. No-op in the error state.
    pub(super) fn vector_writer_seal_epoch(
        &mut self,
        table_id: TableId,
        next_epoch: HummockEpoch,
        add: Option<VectorIndexAdd>,
    ) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        let data = data
            .unsync_vector_index_data
            .get_mut(&table_id)
            .expect("should exist");
        assert!(!data.is_dropped);
        assert!(
            data.curr_epoch < next_epoch,
            "next epoch {} should be greater than current epoch {}",
            next_epoch,
            data.curr_epoch
        );
        data.sealed_epoch_data
            .try_insert(data.curr_epoch, add)
            .expect("non-duplicate");
        data.curr_epoch = next_epoch;
    }

    /// Drop a vector writer. Its entry is removed immediately when no sealed
    /// data is pending; otherwise removal is deferred until sync drains it.
    /// No-op in the error state.
    pub(super) fn drop_vector_writer(&mut self, table_id: TableId) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        let hash_map::Entry::Occupied(mut entry) = data.unsync_vector_index_data.entry(table_id)
        else {
            panic!("vector writer {} should exist", table_id);
        };
        let data = entry.get_mut();
        data.is_dropped = true;
        if data.sealed_epoch_data.is_empty() {
            entry.remove();
        }
    }

    /// Start `epoch` on the given tables, creating their `TableUnsyncData`
    /// lazily and registering the epoch in the global unsync index. No-op in
    /// the error state.
    pub(super) fn start_epoch(&mut self, epoch: HummockEpoch, table_ids: HashSet<TableId>) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        debug!(epoch, ?table_ids, "start epoch");
        for table_id in &table_ids {
            let table_data = data
                .unsync_data
                .table_data
                .entry(*table_id)
                .or_insert_with(|| {
                    TableUnsyncData::new(
                        *table_id,
                        self.context.pinned_version.table_committed_epoch(*table_id),
                    )
                });
            table_data.new_epoch(epoch);
        }
        if let Some(unsync_epoch_id) = get_unsync_epoch_id(epoch, &table_ids) {
            assert!(
                data.unsync_data
                    .unsync_epochs
                    .insert(unsync_epoch_id, table_ids)
                    .is_none()
            );
        }
    }

    /// Start syncing the given `(epoch, table_ids)` pairs; the result is sent
    /// through `sync_result_sender`. In the error state, immediately reply with
    /// an error describing the earlier failure.
    pub(super) fn start_sync_epoch(
        &mut self,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) {
        let data = match &mut self.state {
            UploaderState::Working(data) => data,
            UploaderState::Err(ErrState {
                failed_sync_table_epochs,
                reason,
            }) => {
                let result = Err(HummockError::other(format!(
                    "previous sync epoch {:?} failed due to [{}]",
                    failed_sync_table_epochs, reason
                )));
                send_sync_result(sync_result_sender, result);
                return;
            }
        };
        debug!(?sync_table_epochs, "start sync epoch");

        data.sync(&self.context, sync_result_sender, sync_table_epochs);

        // The sync may already be satisfiable (no remaining tasks); notify now.
        data.may_notify_sync_task(&self.context);

        self.context
            .stats
            .uploader_syncing_epoch_count
            .set(data.syncing_data.len() as _);
    }

    /// Apply a newly pinned version, acking committed epochs on unsync tables.
    pub(crate) fn update_pinned_version(&mut self, pinned_version: PinnedVersion) {
        if let UploaderState::Working(data) = &mut self.state {
            // TODO: may only `ack_committed` on table whose `committed_epoch` is changed.
            for (table_id, info) in pinned_version.state_table_info.info() {
                if let Some(table_data) = data.unsync_data.table_data.get_mut(table_id) {
                    table_data.ack_committed(info.committed_epoch);
                }
            }
        }
        self.context.pinned_version = pinned_version;
    }

    /// Spill unsync data when the buffer tracker demands it, from older epochs
    /// to newer ones. Returns whether any data was spilled.
    pub(crate) fn may_flush(&mut self) -> bool {
        let UploaderState::Working(data) = &mut self.state else {
            return false;
        };
        if self.context.buffer_tracker.need_flush() {
            let mut spiller = Spiller::new(&mut data.unsync_data);
            let mut curr_batch_flush_size = 0;
            // iterate from older epoch to newer epoch
            while self
                .context
                .buffer_tracker
                .need_more_flush(curr_batch_flush_size)
                && let Some((epoch, payload, spilled_table_ids)) = spiller.next_spilled_payload()
            {
                assert!(!payload.is_empty());
                {
                    let (task_id, task_size, spilled_table_ids) =
                        data.task_manager
                            .spill(&self.context, spilled_table_ids, payload);
                    // Record the spill task on every table it covers; push to the
                    // front so newer tasks come first.
                    for table_id in spilled_table_ids {
                        spiller
                            .unsync_data()
                            .table_data
                            .get_mut(table_id)
                            .expect("should exist")
                            .spill_tasks
                            .entry(epoch)
                            .or_default()
                            .push_front(task_id);
                    }
                    curr_batch_flush_size += task_size;
                }
            }
            data.check_upload_task_consistency();
            curr_batch_flush_size > 0
        } else {
            false
        }
    }

    /// Clear uploader state. With `Some(table_ids)`, drop only those tables'
    /// state; with `None`, reset the whole uploader, aborting everything.
    pub(crate) fn clear(&mut self, table_ids: Option<HashSet<TableId>>) {
        if let Some(table_ids) = table_ids {
            if let UploaderState::Working(data) = &mut self.state {
                data.clear_tables(table_ids);
            }
        } else {
            // Swap in fresh working data; abort the old data if there was any.
            if let UploaderState::Working(data) = replace(
                &mut self.state,
                UploaderState::Working(UploaderData::default()),
            ) {
                data.abort(|| HummockError::other("uploader is reset"));
            }

            self.context.stats.uploader_syncing_epoch_count.set(0);
        }
    }

    /// Mark an instance as destroyed. No-op in the error state.
    pub(crate) fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) {
        let UploaderState::Working(data) = &mut self.state else {
            return;
        };
        data.unsync_data.may_destroy_instance(instance_id);
    }

    /// The minimum object id among all uncommitted ssts, or `None` when there
    /// is none or the uploader is in the error state.
    pub(crate) fn min_uncommitted_object_id(&self) -> Option<HummockRawObjectId> {
        if let UploaderState::Working(ref u) = self.state {
            u.min_uncommitted_object_id()
        } else {
            None
        }
    }
}
1586
impl UploaderData {
    /// Pop finished sync requests (in `SyncId` order) from the front of
    /// `syncing_data` and deliver their results.
    ///
    /// Only the front entry is ever completed, so results are delivered in
    /// request order. Also acks the synced epoch on each involved table and
    /// drops table data that has become empty.
    fn may_notify_sync_task(&mut self, context: &UploaderContext) {
        while let Some((_, syncing_data)) = self.syncing_data.first_key_value()
            && syncing_data.remaining_uploading_tasks.is_empty()
        {
            let (_, syncing_data) = self.syncing_data.pop_first().expect("non-empty");
            let SyncingData {
                sync_table_epochs,
                remaining_uploading_tasks: _,
                uploaded,
                table_watermarks,
                vector_index_adds,
                sync_result_sender,
            } = syncing_data;
            context
                .stats
                .uploader_syncing_epoch_count
                .set(self.syncing_data.len() as _);

            for (sync_epoch, table_ids) in sync_table_epochs {
                for table_id in table_ids {
                    if let Some(table_data) = self.unsync_data.table_data.get_mut(&table_id) {
                        table_data.ack_synced(sync_epoch);
                        if table_data.is_empty() {
                            self.unsync_data.table_data.remove(&table_id);
                        }
                    }
                }
            }

            send_sync_result(
                sync_result_sender,
                Ok(SyncedData {
                    uploaded_ssts: uploaded,
                    table_watermarks,
                    vector_index_adds,
                }),
            )
        }
    }

    /// Debug-build-only cross-check that the task bookkeeping in `unsync_data`
    /// and `syncing_data` agrees with the `task_manager`'s view of spilling and
    /// syncing tasks.
    fn check_upload_task_consistency(&self) {
        #[cfg(debug_assertions)]
        {
            // Rebuild the task -> table-ids mapping from the per-table spill lists.
            let mut spill_task_table_id_from_data: HashMap<_, HashSet<_>> = HashMap::new();
            for table_data in self.unsync_data.table_data.values() {
                for task_id in table_data
                    .spill_tasks
                    .iter()
                    .flat_map(|(_, tasks)| tasks.iter())
                {
                    assert!(
                        spill_task_table_id_from_data
                            .entry(*task_id)
                            .or_default()
                            .insert(table_data.table_id)
                    );
                }
            }
            // Syncing tasks as recorded in the pending sync requests.
            let syncing_task_id_from_data: HashMap<_, HashSet<_>> = self
                .syncing_data
                .iter()
                .filter_map(|(sync_id, data)| {
                    if data.remaining_uploading_tasks.is_empty() {
                        None
                    } else {
                        Some((*sync_id, data.remaining_uploading_tasks.clone()))
                    }
                })
                .collect();

            // The task manager's view: finished spills plus in-flight tasks.
            let mut spill_task_table_id_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
            for (task_id, (_, table_ids)) in &self.unsync_data.spilled_data {
                spill_task_table_id_from_manager.insert(*task_id, table_ids.clone());
            }
            let mut syncing_task_from_manager: HashMap<_, HashSet<_>> = HashMap::new();
            for (task_id, status) in self.task_manager.tasks() {
                match status {
                    UploadingTaskStatus::Spilling(table_ids) => {
                        assert!(
                            spill_task_table_id_from_manager
                                .insert(task_id, table_ids.clone())
                                .is_none()
                        );
                    }
                    UploadingTaskStatus::Sync(sync_id) => {
                        assert!(
                            syncing_task_from_manager
                                .entry(*sync_id)
                                .or_default()
                                .insert(task_id)
                        );
                    }
                }
            }
            assert_eq!(
                spill_task_table_id_from_data,
                spill_task_table_id_from_manager
            );
            assert_eq!(syncing_task_id_from_data, syncing_task_from_manager);
        }
    }
}
1690
impl HummockUploader {
    /// Returns a future resolving to the next successfully finished upload
    /// task's staging sstable info.
    ///
    /// The future is pending while the uploader is not in the `Working` state
    /// or while no task result is ready. On a successful task, the output is
    /// routed by the task's status: a sync task's sst is pushed into its
    /// syncing data (possibly completing the sync), a spill task's sst is
    /// recorded in `spilled_data`. On a failed sync task, the uploader
    /// transitions into the `Err` state, the sync's result sender is notified
    /// of the failure, remaining data is aborted, and the future stays
    /// pending.
    pub(super) fn next_uploaded_sst(
        &mut self,
    ) -> impl Future<Output = Arc<StagingSstableInfo>> + '_ {
        poll_fn(|cx| {
            // Task results are only polled while in the working state.
            let UploaderState::Working(data) = &mut self.state else {
                return Poll::Pending;
            };

            if let Some((task_id, status, result)) = ready!(data.task_manager.poll_task_result(cx))
            {
                match result {
                    Ok(sst) => {
                        // Release the imms covered by this flushed sst.
                        data.unsync_data.ack_flushed(&sst);
                        match status {
                            UploadingTaskStatus::Sync(sync_id) => {
                                let syncing_data =
                                    data.syncing_data.get_mut(&sync_id).expect("should exist");
                                // Newer ssts are kept at the front.
                                syncing_data.uploaded.push_front(sst.clone());
                                assert!(syncing_data.remaining_uploading_tasks.remove(&task_id));
                                // This may have been the sync's last task.
                                data.may_notify_sync_task(&self.context);
                            }
                            UploadingTaskStatus::Spilling(table_ids) => {
                                data.unsync_data
                                    .spilled_data
                                    .insert(task_id, (sst.clone(), table_ids));
                            }
                        }
                        data.check_upload_task_consistency();
                        Poll::Ready(sst)
                    }
                    Err((sync_id, e)) => {
                        // Take the failed sync's data out before poisoning the
                        // uploader state.
                        let syncing_data =
                            data.syncing_data.remove(&sync_id).expect("should exist");
                        let failed_epochs = syncing_data.sync_table_epochs.clone();
                        // Swap the uploader into the `Err` state, reclaiming
                        // the working data so it can be aborted below.
                        let data = must_match!(replace(
                            &mut self.state,
                            UploaderState::Err(ErrState {
                                failed_sync_table_epochs: syncing_data.sync_table_epochs,
                                reason: e.as_report().to_string(),
                            }),
                        ), UploaderState::Working(data) => data);

                        // Report the failure to the sync's caller; the receiver
                        // may already be dropped, so the send result is ignored.
                        let _ = syncing_data
                            .sync_result_sender
                            .send(Err(HummockError::other(format!(
                                "failed to sync: {:?}",
                                e.as_report()
                            ))));

                        // Fail all other pending work with a reference to the
                        // epochs that caused it.
                        data.abort(|| {
                            HummockError::other(format!(
                                "previous epoch {:?} failed to sync",
                                failed_epochs
                            ))
                        });
                        Poll::Pending
                    }
                }
            } else {
                Poll::Pending
            }
        })
    }
}
1756
1757#[cfg(test)]
1758pub(crate) mod tests {
1759    use std::collections::{HashMap, HashSet};
1760    use std::future::{Future, poll_fn};
1761    use std::ops::Deref;
1762    use std::pin::pin;
1763    use std::sync::Arc;
1764    use std::sync::atomic::AtomicUsize;
1765    use std::sync::atomic::Ordering::SeqCst;
1766    use std::task::Poll;
1767
1768    use futures::FutureExt;
1769    use risingwave_common::catalog::TableId;
1770    use risingwave_common::util::epoch::EpochExt;
1771    use risingwave_hummock_sdk::HummockEpoch;
1772    use risingwave_hummock_sdk::vector_index::{
1773        FlatIndexAdd, VectorFileInfo, VectorIndexAdd, VectorStoreInfoDelta,
1774    };
1775    use tokio::sync::oneshot;
1776
1777    use super::test_utils::*;
1778    use crate::hummock::event_handler::uploader::{
1779        HummockUploader, SyncedData, UploadingTask, get_payload_imm_ids,
1780    };
1781    use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID};
1782    use crate::hummock::{HummockError, HummockResult};
1783    use crate::mem_table::ImmutableMemtable;
1784    use crate::opts::StorageOpts;
1785
    impl HummockUploader {
        /// Test helper: adds a single imm by delegating to `add_imms`.
        pub(super) fn add_imm(&mut self, instance_id: LocalInstanceId, imm: ImmutableMemtable) {
            self.add_imms(instance_id, vec![imm]);
        }

        /// Test helper: starts a sync covering a single `(epoch, table_ids)`
        /// pair by delegating to `start_sync_epoch`.
        pub(super) fn start_single_epoch_sync(
            &mut self,
            epoch: HummockEpoch,
            sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
            table_ids: HashSet<TableId>,
        ) {
            self.start_sync_epoch(sync_result_sender, vec![(epoch, table_ids)]);
        }
    }
1800
1801    #[tokio::test]
1802    pub async fn test_uploading_task_future() {
1803        let uploader_context = test_uploader_context(dummy_success_upload_future);
1804
1805        let imm = gen_imm(INITIAL_EPOCH).await;
1806        let imm_size = imm.size();
1807        let imm_ids = get_imm_ids(vec![&imm]);
1808        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
1809        assert_eq!(imm_size, task.task_info.task_size);
1810        assert_eq!(imm_ids, task.task_info.imm_ids);
1811        assert_eq!(vec![INITIAL_EPOCH], task.task_info.epochs);
1812        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
1813        assert_eq!(
1814            output.sstable_infos(),
1815            &dummy_success_upload_output().new_value_ssts
1816        );
1817        assert_eq!(imm_size, output.imm_size());
1818        assert_eq!(&imm_ids, output.imm_ids());
1819        assert_eq!(&vec![INITIAL_EPOCH], output.epochs());
1820
1821        let uploader_context = test_uploader_context(dummy_fail_upload_future);
1822        let imm = gen_imm(INITIAL_EPOCH).await;
1823        let mut task = UploadingTask::from_vec(vec![imm], &uploader_context);
1824        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
1825    }
1826
1827    #[tokio::test]
1828    pub async fn test_uploading_task_poll_result() {
1829        let uploader_context = test_uploader_context(dummy_success_upload_future);
1830        let mut task =
1831            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
1832        let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap();
1833        assert_eq!(
1834            output.sstable_infos(),
1835            &dummy_success_upload_output().new_value_ssts
1836        );
1837
1838        let uploader_context = test_uploader_context(dummy_fail_upload_future);
1839        let mut task =
1840            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
1841        let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err();
1842    }
1843
1844    #[tokio::test]
1845    async fn test_uploading_task_poll_ok_with_retry() {
1846        let run_count = Arc::new(AtomicUsize::new(0));
1847        let fail_num = 10;
1848        let run_count_clone = run_count.clone();
1849        let uploader_context = test_uploader_context(move |payload, info| {
1850            let run_count = run_count.clone();
1851            async move {
1852                // fail in the first `fail_num` run, and success at the end
1853                let ret = if run_count.load(SeqCst) < fail_num {
1854                    Err(HummockError::other("fail"))
1855                } else {
1856                    dummy_success_upload_future(payload, info).await
1857                };
1858                run_count.fetch_add(1, SeqCst);
1859                ret
1860            }
1861        });
1862        let mut task =
1863            UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context);
1864        let output = poll_fn(|cx| task.poll_ok_with_retry(cx)).await;
1865        assert_eq!(fail_num + 1, run_count_clone.load(SeqCst));
1866        assert_eq!(
1867            output.sstable_infos(),
1868            &dummy_success_upload_output().new_value_ssts
1869        );
1870    }
1871
    /// End-to-end happy path: write an imm for one table, seal a vector index
    /// add for another, sync a single epoch, and verify that both the
    /// uploaded sst and the vector index adds appear in the sync result;
    /// finally advance the pinned version to the synced epoch.
    #[tokio::test]
    async fn test_uploader_basic() {
        let mut uploader = test_uploader(dummy_success_upload_future);
        let epoch1 = INITIAL_EPOCH.next_epoch();
        const VECTOR_INDEX_TABLE_ID: TableId = TableId::new(234);
        uploader.start_epoch(
            epoch1,
            HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
        );
        // Write one imm into the regular table and seal its epoch.
        let imm = gen_imm(epoch1).await;
        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);

        // Seal a vector index add on the vector index table for the same epoch.
        uploader.register_vector_writer(VECTOR_INDEX_TABLE_ID, epoch1);
        let vector_info_file = VectorFileInfo {
            object_id: 1.into(),
            vector_count: 1,
            file_size: 0,
            start_vector_id: 0,
            meta_offset: 20,
        };
        let vector_index_add = VectorIndexAdd::Flat(FlatIndexAdd {
            vector_store_info_delta: VectorStoreInfoDelta {
                next_vector_id: 1,
                added_vector_files: vec![vector_info_file.clone()],
            },
        });
        uploader.vector_writer_seal_epoch(
            VECTOR_INDEX_TABLE_ID,
            epoch1.next_epoch(),
            Some(vector_index_add.clone()),
        );

        // Start syncing epoch1 for both tables; the sync is registered but
        // still has an uploading task outstanding.
        let (sync_tx, sync_rx) = oneshot::channel();
        uploader.start_single_epoch_sync(
            epoch1,
            sync_tx,
            HashSet::from_iter([TEST_TABLE_ID, VECTOR_INDEX_TABLE_ID]),
        );
        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
        assert_eq!(1, uploader.data().syncing_data.len());
        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
        assert!(syncing_data.uploaded.is_empty());
        assert!(!syncing_data.remaining_uploading_tasks.is_empty());

        // The upload task finishes and yields the staging sst for the imm.
        let staging_sst = uploader.next_uploaded_sst().await;
        assert_eq!(&vec![epoch1], staging_sst.epochs());
        assert_eq!(
            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
            staging_sst.imm_ids()
        );
        assert_eq!(
            &dummy_success_upload_output().new_value_ssts,
            staging_sst.sstable_infos()
        );

        // The sync result carries the uploaded sst and the vector index add.
        match sync_rx.await {
            Ok(Ok(data)) => {
                let SyncedData {
                    uploaded_ssts,
                    table_watermarks,
                    vector_index_adds,
                } = data;
                assert_eq!(1, uploaded_ssts.len());
                let staging_sst = &uploaded_ssts[0];
                assert_eq!(&vec![epoch1], staging_sst.epochs());
                assert_eq!(
                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
                    staging_sst.imm_ids()
                );
                assert_eq!(
                    &dummy_success_upload_output().new_value_ssts,
                    staging_sst.sstable_infos()
                );
                assert!(table_watermarks.is_empty());
                assert_eq!(vector_index_adds.len(), 1);
                let (table_id, vector_index_adds) = vector_index_adds.into_iter().next().unwrap();
                assert_eq!(table_id, VECTOR_INDEX_TABLE_ID);
                assert_eq!(vector_index_adds.len(), 1);
                let synced_vector_index_add = vector_index_adds[0].clone();
                assert_eq!(vector_index_add, synced_vector_index_add);
            }
            _ => unreachable!(),
        };
        assert_eq!(epoch1, uploader.test_max_synced_epoch());

        // Advancing the pinned version to epoch1 commits the table epoch.
        let new_pinned_version = uploader
            .context
            .pinned_version
            .new_pin_version(test_hummock_version(epoch1))
            .unwrap();
        uploader.update_pinned_version(new_pinned_version);
        assert_eq!(
            epoch1,
            uploader
                .context
                .pinned_version
                .table_committed_epoch(TEST_TABLE_ID)
                .unwrap()
        );
    }
1975
1976    #[tokio::test]
1977    async fn test_uploader_destroy_instance_before_sync() {
1978        let mut uploader = test_uploader(dummy_success_upload_future);
1979        let epoch1 = INITIAL_EPOCH.next_epoch();
1980        uploader.start_epochs_for_test([epoch1]);
1981        let imm = gen_imm(epoch1).await;
1982        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
1983        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
1984        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
1985        uploader.may_destroy_instance(TEST_LOCAL_INSTANCE_ID);
1986
1987        let (sync_tx, sync_rx) = oneshot::channel();
1988        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
1989        assert_eq!(epoch1 as HummockEpoch, uploader.test_max_syncing_epoch());
1990        assert_eq!(1, uploader.data().syncing_data.len());
1991        let (_, syncing_data) = uploader.data().syncing_data.first_key_value().unwrap();
1992        assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_table_epochs[0].0);
1993        assert!(syncing_data.uploaded.is_empty());
1994        assert!(!syncing_data.remaining_uploading_tasks.is_empty());
1995
1996        let staging_sst = uploader.next_uploaded_sst().await;
1997        assert_eq!(&vec![epoch1], staging_sst.epochs());
1998        assert_eq!(
1999            &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
2000            staging_sst.imm_ids()
2001        );
2002        assert_eq!(
2003            &dummy_success_upload_output().new_value_ssts,
2004            staging_sst.sstable_infos()
2005        );
2006
2007        match sync_rx.await {
2008            Ok(Ok(data)) => {
2009                let SyncedData {
2010                    uploaded_ssts,
2011                    table_watermarks,
2012                    ..
2013                } = data;
2014                assert_eq!(1, uploaded_ssts.len());
2015                let staging_sst = &uploaded_ssts[0];
2016                assert_eq!(&vec![epoch1], staging_sst.epochs());
2017                assert_eq!(
2018                    &HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, vec![imm.batch_id()])]),
2019                    staging_sst.imm_ids()
2020                );
2021                assert_eq!(
2022                    &dummy_success_upload_output().new_value_ssts,
2023                    staging_sst.sstable_infos()
2024                );
2025                assert!(table_watermarks.is_empty());
2026            }
2027            _ => unreachable!(),
2028        };
2029        assert!(
2030            !uploader
2031                .data()
2032                .unsync_data
2033                .table_data
2034                .contains_key(&TEST_TABLE_ID)
2035        );
2036    }
2037
2038    #[tokio::test]
2039    async fn test_empty_uploader_sync() {
2040        let mut uploader = test_uploader(dummy_success_upload_future);
2041        let epoch1 = INITIAL_EPOCH.next_epoch();
2042
2043        let (sync_tx, sync_rx) = oneshot::channel();
2044        uploader.start_epochs_for_test([epoch1]);
2045        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
2046        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
2047        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
2048        assert_eq!(epoch1, uploader.test_max_syncing_epoch());
2049
2050        assert_uploader_pending(&mut uploader).await;
2051
2052        match sync_rx.await {
2053            Ok(Ok(data)) => {
2054                assert!(data.uploaded_ssts.is_empty());
2055            }
2056            _ => unreachable!(),
2057        };
2058        assert_eq!(epoch1, uploader.test_max_synced_epoch());
2059        let new_pinned_version = uploader
2060            .context
2061            .pinned_version
2062            .new_pin_version(test_hummock_version(epoch1))
2063            .unwrap();
2064        uploader.update_pinned_version(new_pinned_version);
2065        assert!(uploader.data().syncing_data.is_empty());
2066        assert_eq!(
2067            epoch1,
2068            uploader
2069                .context
2070                .pinned_version
2071                .table_committed_epoch(TEST_TABLE_ID)
2072                .unwrap()
2073        );
2074    }
2075
2076    #[tokio::test]
2077    async fn test_uploader_empty_epoch() {
2078        let mut uploader = test_uploader(dummy_success_upload_future);
2079        let epoch1 = INITIAL_EPOCH.next_epoch();
2080        let epoch2 = epoch1.next_epoch();
2081        uploader.start_epochs_for_test([epoch1, epoch2]);
2082        let imm = gen_imm(epoch2).await;
2083        // epoch1 is empty while epoch2 is not. Going to seal empty epoch1.
2084        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1);
2085        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1);
2086        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm);
2087
2088        let (sync_tx, sync_rx) = oneshot::channel();
2089        uploader.start_single_epoch_sync(epoch1, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
2090        assert_eq!(epoch1, uploader.test_max_syncing_epoch());
2091
2092        assert_uploader_pending(&mut uploader).await;
2093
2094        match sync_rx.await {
2095            Ok(Ok(data)) => {
2096                assert!(data.uploaded_ssts.is_empty());
2097            }
2098            _ => unreachable!(),
2099        };
2100        assert_eq!(epoch1, uploader.test_max_synced_epoch());
2101        let new_pinned_version = uploader
2102            .context
2103            .pinned_version
2104            .new_pin_version(test_hummock_version(epoch1))
2105            .unwrap();
2106        uploader.update_pinned_version(new_pinned_version);
2107        assert!(uploader.data().syncing_data.is_empty());
2108        assert_eq!(
2109            epoch1,
2110            uploader
2111                .context
2112                .pinned_version
2113                .table_committed_epoch(TEST_TABLE_ID)
2114                .unwrap()
2115        );
2116    }
2117
2118    #[tokio::test]
2119    async fn test_uploader_poll_empty() {
2120        let mut uploader = test_uploader(dummy_success_upload_future);
2121        let fut = uploader.next_uploaded_sst();
2122        let mut fut = pin!(fut);
2123        assert!(poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx).is_pending())).await);
2124    }
2125
2126    #[tokio::test]
2127    async fn test_uploader_empty_advance_mce() {
2128        let mut uploader = test_uploader(dummy_success_upload_future);
2129        let initial_pinned_version = uploader.context.pinned_version.clone();
2130        let epoch1 = INITIAL_EPOCH.next_epoch();
2131        let epoch2 = epoch1.next_epoch();
2132        let epoch3 = epoch2.next_epoch();
2133        let epoch4 = epoch3.next_epoch();
2134        let epoch5 = epoch4.next_epoch();
2135        let epoch6 = epoch5.next_epoch();
2136        let version1 = initial_pinned_version
2137            .new_pin_version(test_hummock_version(epoch1))
2138            .unwrap();
2139        let version2 = initial_pinned_version
2140            .new_pin_version(test_hummock_version(epoch2))
2141            .unwrap();
2142        let version3 = initial_pinned_version
2143            .new_pin_version(test_hummock_version(epoch3))
2144            .unwrap();
2145        let version4 = initial_pinned_version
2146            .new_pin_version(test_hummock_version(epoch4))
2147            .unwrap();
2148        let version5 = initial_pinned_version
2149            .new_pin_version(test_hummock_version(epoch5))
2150            .unwrap();
2151
2152        uploader.start_epochs_for_test([epoch6]);
2153        uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch6);
2154
2155        uploader.update_pinned_version(version1);
2156        assert_eq!(epoch1, uploader.test_max_synced_epoch());
2157        assert_eq!(epoch1, uploader.test_max_syncing_epoch());
2158
2159        let imm = gen_imm(epoch6).await;
2160        uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone());
2161        uploader.update_pinned_version(version2);
2162        assert_eq!(epoch2, uploader.test_max_synced_epoch());
2163        assert_eq!(epoch2, uploader.test_max_syncing_epoch());
2164
2165        uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch6);
2166        uploader.update_pinned_version(version3);
2167        assert_eq!(epoch3, uploader.test_max_synced_epoch());
2168        assert_eq!(epoch3, uploader.test_max_syncing_epoch());
2169
2170        let (sync_tx, sync_rx) = oneshot::channel();
2171        uploader.start_single_epoch_sync(epoch6, sync_tx, HashSet::from_iter([TEST_TABLE_ID]));
2172        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
2173        uploader.update_pinned_version(version4);
2174        assert_eq!(epoch4, uploader.test_max_synced_epoch());
2175        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
2176
2177        let sst = uploader.next_uploaded_sst().await;
2178        assert_eq!(&get_imm_ids([&imm]), sst.imm_ids());
2179
2180        match sync_rx.await {
2181            Ok(Ok(data)) => {
2182                assert!(data.table_watermarks.is_empty());
2183                assert_eq!(1, data.uploaded_ssts.len());
2184                assert_eq!(&get_imm_ids([&imm]), data.uploaded_ssts[0].imm_ids());
2185            }
2186            _ => unreachable!(),
2187        }
2188
2189        uploader.update_pinned_version(version5);
2190        assert_eq!(epoch6, uploader.test_max_synced_epoch());
2191        assert_eq!(epoch6, uploader.test_max_syncing_epoch());
2192    }
2193
2194    #[tokio::test]
2195    async fn test_uploader_finish_in_order() {
2196        let config = StorageOpts {
2197            shared_buffer_capacity_mb: 1024 * 1024,
2198            shared_buffer_flush_ratio: 0.0,
2199            ..Default::default()
2200        };
2201        let (buffer_tracker, mut uploader, new_task_notifier) =
2202            prepare_uploader_order_test(&config, false);
2203
2204        let epoch1 = INITIAL_EPOCH.next_epoch();
2205        let epoch2 = epoch1.next_epoch();
2206        let epoch3 = epoch2.next_epoch();
2207        let epoch4 = epoch3.next_epoch();
2208        uploader.start_epochs_for_test([epoch1, epoch2, epoch3, epoch4]);
2209        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2210        let memory_limiter = Some(memory_limiter.deref());
2211
2212        let instance_id1 = 1;
2213        let instance_id2 = 2;
2214
2215        uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2216        uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2217
2218        // imm2 contains data in newer epoch, but added first
2219        let imm2 = gen_imm_with_limiter(epoch2, memory_limiter).await;
2220        uploader.add_imm(instance_id2, imm2.clone());
2221        let imm1_1 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2222        uploader.add_imm(instance_id1, imm1_1.clone());
2223        let imm1_2 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2224        uploader.add_imm(instance_id1, imm1_2.clone());
2225
2226        // imm1 will be spilled first
2227        let epoch1_spill_payload12 =
2228            HashMap::from_iter([(instance_id1, vec![imm1_2.clone(), imm1_1.clone()])]);
2229        let epoch2_spill_payload = HashMap::from_iter([(instance_id2, vec![imm2.clone()])]);
2230        let (await_start1, finish_tx1) =
2231            new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload12));
2232        let (await_start2, finish_tx2) =
2233            new_task_notifier(get_payload_imm_ids(&epoch2_spill_payload));
2234        uploader.may_flush();
2235        await_start1.await;
2236        await_start2.await;
2237
2238        assert_uploader_pending(&mut uploader).await;
2239
2240        finish_tx2.send(()).unwrap();
2241        assert_uploader_pending(&mut uploader).await;
2242
2243        finish_tx1.send(()).unwrap();
2244        let sst = uploader.next_uploaded_sst().await;
2245        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload12), sst.imm_ids());
2246        assert_eq!(&vec![epoch1], sst.epochs());
2247
2248        let sst = uploader.next_uploaded_sst().await;
2249        assert_eq!(&get_payload_imm_ids(&epoch2_spill_payload), sst.imm_ids());
2250        assert_eq!(&vec![epoch2], sst.epochs());
2251
2252        let imm1_3 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2253        uploader.add_imm(instance_id1, imm1_3.clone());
2254        let epoch1_spill_payload3 = HashMap::from_iter([(instance_id1, vec![imm1_3.clone()])]);
2255        let (await_start1_3, finish_tx1_3) =
2256            new_task_notifier(get_payload_imm_ids(&epoch1_spill_payload3));
2257        uploader.may_flush();
2258        await_start1_3.await;
2259        let imm1_4 = gen_imm_with_limiter(epoch1, memory_limiter).await;
2260        uploader.add_imm(instance_id1, imm1_4.clone());
2261        let epoch1_sync_payload = HashMap::from_iter([(instance_id1, vec![imm1_4.clone()])]);
2262        let (await_start1_4, finish_tx1_4) =
2263            new_task_notifier(get_payload_imm_ids(&epoch1_sync_payload));
2264        uploader.local_seal_epoch_for_test(instance_id1, epoch1);
2265        let (sync_tx1, mut sync_rx1) = oneshot::channel();
2266        uploader.start_single_epoch_sync(epoch1, sync_tx1, HashSet::from_iter([TEST_TABLE_ID]));
2267        await_start1_4.await;
2268
2269        uploader.local_seal_epoch_for_test(instance_id1, epoch2);
2270        uploader.local_seal_epoch_for_test(instance_id2, epoch2);
2271
2272        // current uploader state:
2273        // unsealed: empty
2274        // sealed: epoch2: uploaded sst([imm2])
2275        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])
2276
2277        let imm3_1 = gen_imm_with_limiter(epoch3, memory_limiter).await;
2278        let epoch3_spill_payload1 = HashMap::from_iter([(instance_id1, vec![imm3_1.clone()])]);
2279        uploader.add_imm(instance_id1, imm3_1.clone());
2280        let (await_start3_1, finish_tx3_1) =
2281            new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload1));
2282        uploader.may_flush();
2283        await_start3_1.await;
2284        let imm3_2 = gen_imm_with_limiter(epoch3, memory_limiter).await;
2285        let epoch3_spill_payload2 = HashMap::from_iter([(instance_id2, vec![imm3_2.clone()])]);
2286        uploader.add_imm(instance_id2, imm3_2.clone());
2287        let (await_start3_2, finish_tx3_2) =
2288            new_task_notifier(get_payload_imm_ids(&epoch3_spill_payload2));
2289        uploader.may_flush();
2290        await_start3_2.await;
2291        let imm3_3 = gen_imm_with_limiter(epoch3, memory_limiter).await;
2292        uploader.add_imm(instance_id1, imm3_3.clone());
2293
2294        // current uploader state:
2295        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
2296        // sealed: uploaded sst([imm2])
2297        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])
2298
2299        uploader.local_seal_epoch_for_test(instance_id1, epoch3);
2300        let imm4 = gen_imm_with_limiter(epoch4, memory_limiter).await;
2301        uploader.add_imm(instance_id1, imm4.clone());
2302        assert_uploader_pending(&mut uploader).await;
2303
2304        // current uploader state:
2305        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
2306        //           epoch4: imm: imm4
2307        // sealed: uploaded sst([imm2])
2308        // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1])
2309
2310        finish_tx3_1.send(()).unwrap();
2311        assert_uploader_pending(&mut uploader).await;
2312        finish_tx1_4.send(()).unwrap();
2313        assert_uploader_pending(&mut uploader).await;
2314        finish_tx1_3.send(()).unwrap();
2315
2316        let sst = uploader.next_uploaded_sst().await;
2317        assert_eq!(&get_payload_imm_ids(&epoch1_spill_payload3), sst.imm_ids());
2318
2319        assert!(poll_fn(|cx| Poll::Ready(sync_rx1.poll_unpin(cx).is_pending())).await);
2320
2321        let sst = uploader.next_uploaded_sst().await;
2322        assert_eq!(&get_payload_imm_ids(&epoch1_sync_payload), sst.imm_ids());
2323
2324        match sync_rx1.await {
2325            Ok(Ok(data)) => {
2326                assert_eq!(3, data.uploaded_ssts.len());
2327                assert_eq!(
2328                    &get_payload_imm_ids(&epoch1_sync_payload),
2329                    data.uploaded_ssts[0].imm_ids()
2330                );
2331                assert_eq!(
2332                    &get_payload_imm_ids(&epoch1_spill_payload3),
2333                    data.uploaded_ssts[1].imm_ids()
2334                );
2335                assert_eq!(
2336                    &get_payload_imm_ids(&epoch1_spill_payload12),
2337                    data.uploaded_ssts[2].imm_ids()
2338                );
2339            }
2340            _ => {
2341                unreachable!()
2342            }
2343        }
2344
2345        // current uploader state:
2346        // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1]
2347        //           epoch4: imm: imm4
2348        // sealed: uploaded sst([imm2])
2349        // syncing: empty
2350        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2351
2352        let (sync_tx2, sync_rx2) = oneshot::channel();
2353        uploader.start_single_epoch_sync(epoch2, sync_tx2, HashSet::from_iter([TEST_TABLE_ID]));
2354        uploader.local_seal_epoch_for_test(instance_id2, epoch3);
2355        let sst = uploader.next_uploaded_sst().await;
2356        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids());
2357
2358        match sync_rx2.await {
2359            Ok(Ok(data)) => {
2360                assert_eq!(data.uploaded_ssts.len(), 1);
2361                assert_eq!(
2362                    &get_payload_imm_ids(&epoch2_spill_payload),
2363                    data.uploaded_ssts[0].imm_ids()
2364                );
2365            }
2366            _ => {
2367                unreachable!("should be sync finish");
2368            }
2369        }
2370        assert_eq!(epoch2, uploader.test_max_synced_epoch());
2371
2372        // current uploader state:
2373        // unsealed: epoch4: imm: imm4
2374        // sealed: imm: imm3_3, uploading: [imm3_2], uploaded: sst([imm3_1])
2375        // syncing: empty
2376        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2377        //         epoch2: sst([imm2])
2378
2379        uploader.local_seal_epoch_for_test(instance_id1, epoch4);
2380        uploader.local_seal_epoch_for_test(instance_id2, epoch4);
2381        let epoch4_sync_payload = HashMap::from_iter([(instance_id1, vec![imm4, imm3_3])]);
2382        let (await_start4_with_3_3, finish_tx4_with_3_3) =
2383            new_task_notifier(get_payload_imm_ids(&epoch4_sync_payload));
2384        let (sync_tx4, mut sync_rx4) = oneshot::channel();
2385        uploader.start_single_epoch_sync(epoch4, sync_tx4, HashSet::from_iter([TEST_TABLE_ID]));
2386        await_start4_with_3_3.await;
2387
2388        // current uploader state:
2389        // unsealed: empty
2390        // sealed: empty
2391        // syncing: epoch4: uploading: [imm4, imm3_3], [imm3_2], uploaded: sst([imm3_1])
2392        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2393        //         epoch2: sst([imm2])
2394
2395        assert_uploader_pending(&mut uploader).await;
2396
2397        finish_tx3_2.send(()).unwrap();
2398        let sst = uploader.next_uploaded_sst().await;
2399        assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload2), sst.imm_ids());
2400
2401        finish_tx4_with_3_3.send(()).unwrap();
2402        assert!(poll_fn(|cx| Poll::Ready(sync_rx4.poll_unpin(cx).is_pending())).await);
2403
2404        let sst = uploader.next_uploaded_sst().await;
2405        assert_eq!(&get_payload_imm_ids(&epoch4_sync_payload), sst.imm_ids());
2406
2407        match sync_rx4.await {
2408            Ok(Ok(data)) => {
2409                assert_eq!(3, data.uploaded_ssts.len());
2410                assert_eq!(
2411                    &get_payload_imm_ids(&epoch4_sync_payload),
2412                    data.uploaded_ssts[0].imm_ids()
2413                );
2414                assert_eq!(
2415                    &get_payload_imm_ids(&epoch3_spill_payload2),
2416                    data.uploaded_ssts[1].imm_ids()
2417                );
2418                assert_eq!(
2419                    &get_payload_imm_ids(&epoch3_spill_payload1),
2420                    data.uploaded_ssts[2].imm_ids(),
2421                )
2422            }
2423            _ => {
2424                unreachable!("should be sync finish");
2425            }
2426        }
2427        assert_eq!(epoch4, uploader.test_max_synced_epoch());
2428
2429        // current uploader state:
2430        // unsealed: empty
2431        // sealed: empty
2432        // syncing: empty
2433        // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1])
2434        //         epoch2: sst([imm2])
2435        //         epoch4: sst([imm4, imm3_3]), sst([imm3_2]), sst([imm3_1])
2436    }
2437
2438    #[tokio::test]
2439    async fn test_uploader_frequently_flush() {
2440        let config = StorageOpts {
2441            shared_buffer_capacity_mb: 10,
2442            shared_buffer_flush_ratio: 0.8,
2443            // This test will fail when we set it to 0
2444            shared_buffer_min_batch_flush_size_mb: 1,
2445            ..Default::default()
2446        };
2447        let (buffer_tracker, mut uploader, _new_task_notifier) =
2448            prepare_uploader_order_test(&config, true);
2449
2450        let epoch1 = INITIAL_EPOCH.next_epoch();
2451        let epoch2 = epoch1.next_epoch();
2452        uploader.start_epochs_for_test([epoch1, epoch2]);
2453        let instance_id1 = 1;
2454        let instance_id2 = 2;
2455        let flush_threshold = buffer_tracker.flush_threshold();
2456        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
2457
2458        uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1);
2459        uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2);
2460
2461        // imm2 contains data in newer epoch, but added first
2462        let mut total_memory = 0;
2463        while total_memory < flush_threshold {
2464            let imm = gen_imm_with_limiter(epoch2, Some(memory_limiter.as_ref())).await;
2465            total_memory += imm.size();
2466            if total_memory > flush_threshold {
2467                break;
2468            }
2469            uploader.add_imm(instance_id2, imm);
2470        }
2471        let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await;
2472        uploader.add_imm(instance_id1, imm);
2473        assert!(uploader.may_flush());
2474
2475        for _ in 0..10 {
2476            let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await;
2477            uploader.add_imm(instance_id1, imm);
2478            assert!(!uploader.may_flush());
2479        }
2480    }
2481}