risingwave_storage/
store.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::default::Default;
16use std::fmt::{Debug, Formatter};
17use std::future::Future;
18use std::marker::PhantomData;
19use std::sync::{Arc, LazyLock};
20
21use bytes::Bytes;
22use futures::{Stream, TryStreamExt};
23use futures_async_stream::try_stream;
24use prost::Message;
25use risingwave_common::array::Op;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{TableId, TableOption};
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_hummock_sdk::HummockReadEpoch;
31use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
32use risingwave_hummock_sdk::table_watermark::{
33    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
34};
35use risingwave_hummock_trace::{
36    TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions,
37    TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
38};
39use risingwave_pb::hummock::PbVnodeWatermark;
40
41use crate::error::{StorageError, StorageResult};
42use crate::hummock::CachePolicy;
43use crate::monitor::{MonitoredStateStore, MonitoredStorageMetrics};
44pub(crate) use crate::vector::{DistanceMeasurement, OnNearestItem, Vector};
45
/// Shorthand for the `Send + Sync + 'static` bound used by the state store traits in this file.
pub trait StaticSendSync = Send + Sync + 'static;
47
/// An owned item type that a [`StateStoreIter`] can iterate over.
pub trait IterItem: Send + 'static {
    /// The borrowed form of the item, as handed out by [`StateStoreIter::try_next`].
    type ItemRef<'a>: Send + Copy + 'a;
}
51
// Key-value rows are borrowed as a `StateStoreKeyedRowRef`.
impl IterItem for StateStoreKeyedRow {
    type ItemRef<'a> = StateStoreKeyedRowRef<'a>;
}

// Change log items are borrowed as a `StateStoreReadLogItemRef`.
impl IterItem for StateStoreReadLogItem {
    type ItemRef<'a> = StateStoreReadLogItemRef<'a>;
}
59
/// An asynchronous iterator over items of type `T` (key-value rows by default).
///
/// Items are handed out by reference: the value returned by `try_next` borrows from the
/// iterator for as long as it is alive.
pub trait StateStoreIter<T: IterItem = StateStoreKeyedRow>: Send {
    /// Fetches the next item.
    ///
    /// Resolves to `Ok(Some(item))` when an item is available and to `Ok(None)` when there is
    /// nothing left; the helpers in this file stop iterating on `Ok(None)` or on an error.
    fn try_next(&mut self) -> impl StorageFuture<'_, Option<T::ItemRef<'_>>>;
}
63
64pub fn to_owned_item((key, value): StateStoreKeyedRowRef<'_>) -> StorageResult<StateStoreKeyedRow> {
65    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
66}
67
/// Extension methods available on every [`StateStoreIter`].
pub trait StateStoreIterExt<T: IterItem = StateStoreKeyedRow>: StateStoreIter<T> + Sized {
    /// The stream type produced by [`Self::into_stream`] for a given output type `O` and
    /// mapping function `F`.
    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>>: Stream<Item = StorageResult<O>>
        + Send;

    /// Consumes the iterator and turns it into a stream, converting every borrowed item into
    /// an `O` with `f`.
    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
        self,
        f: F,
    ) -> Self::ItemStream<O, F>;

    /// Wraps the iterator in a [`FusedStateStoreIter`], which tracks whether the iteration
    /// has already finished.
    fn fused(self) -> FusedStateStoreIter<Self, T> {
        FusedStateStoreIter::new(self)
    }
}
81
82#[try_stream(ok = O, error = StorageError)]
83async fn into_stream_inner<
84    T: IterItem,
85    I: StateStoreIter<T>,
86    O: Send,
87    F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send,
88>(
89    iter: I,
90    f: F,
91) {
92    let mut iter = iter.fused();
93    while let Some(item) = iter.try_next().await? {
94        yield f(item)?;
95    }
96}
97
98pub struct FromStreamStateStoreIter<S> {
99    inner: S,
100    item_buffer: Option<StateStoreKeyedRow>,
101}
102
103impl<S> FromStreamStateStoreIter<S> {
104    pub fn new(inner: S) -> Self {
105        Self {
106            inner,
107            item_buffer: None,
108        }
109    }
110}
111
112impl<S: Stream<Item = StorageResult<StateStoreKeyedRow>> + Unpin + Send> StateStoreIter
113    for FromStreamStateStoreIter<S>
114{
115    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
116        self.item_buffer = self.inner.try_next().await?;
117        Ok(self
118            .item_buffer
119            .as_ref()
120            .map(|(key, value)| (key.to_ref(), value.as_ref())))
121    }
122}
123
/// A wrapper around a [`StateStoreIter`] that remembers whether the inner iterator has
/// already finished.
pub struct FusedStateStoreIter<I, T> {
    /// The wrapped iterator.
    inner: I,
    /// Set once the inner iterator has returned `Ok(None)` or an error.
    finished: bool,
    /// Ties the wrapper to the item type `T` without storing one.
    _phantom: PhantomData<T>,
}

impl<I, T> FusedStateStoreIter<I, T> {
    /// Wraps `inner` in the not-yet-finished state.
    fn new(inner: I) -> Self {
        let finished = false;
        Self {
            inner,
            finished,
            _phantom: PhantomData,
        }
    }
}
139
140impl<T: IterItem, I: StateStoreIter<T>> FusedStateStoreIter<I, T> {
141    async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
142        assert!(!self.finished, "call try_next after finish");
143        let result = self.inner.try_next().await;
144        match &result {
145            Ok(Some(_)) => {}
146            Ok(None) | Err(_) => {
147                self.finished = true;
148            }
149        }
150        result
151    }
152}
153
// Blanket implementation: every `StateStoreIter` gets the extension methods.
impl<T: IterItem, I: StateStoreIter<T>> StateStoreIterExt<T> for I {
    // The concrete stream type is the opaque stream returned by `into_stream_inner`.
    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>> =
        impl Stream<Item = StorageResult<O>> + Send;

    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
        self,
        f: F,
    ) -> Self::ItemStream<O, F> {
        into_stream_inner(self, f)
    }
}
165
/// A borrowed key-value row: a full key and a value, both as byte slices.
pub type StateStoreKeyedRowRef<'a> = (FullKey<&'a [u8]>, &'a [u8]);
/// An owned key-value row: a full key and a value, both backed by `Bytes`.
pub type StateStoreKeyedRow = (FullKey<Bytes>, Bytes);
/// A `'static` [`StateStoreIter`] over key-value rows.
pub trait StateStoreReadIter = StateStoreIter + 'static;
169
/// The change recorded for a key in a change log, generic over the value representation `T`.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum ChangeLogValue<T> {
    /// A value was inserted.
    Insert(T),
    /// A value was replaced: `old_value` became `new_value`.
    Update { new_value: T, old_value: T },
    /// A value was deleted.
    Delete(T),
}
176
177impl<T> ChangeLogValue<T> {
178    pub fn try_map<O>(self, f: impl Fn(T) -> StorageResult<O>) -> StorageResult<ChangeLogValue<O>> {
179        Ok(match self {
180            ChangeLogValue::Insert(value) => ChangeLogValue::Insert(f(value)?),
181            ChangeLogValue::Update {
182                new_value,
183                old_value,
184            } => ChangeLogValue::Update {
185                new_value: f(new_value)?,
186                old_value: f(old_value)?,
187            },
188            ChangeLogValue::Delete(value) => ChangeLogValue::Delete(f(value)?),
189        })
190    }
191
192    pub fn into_op_value_iter(self) -> impl Iterator<Item = (Op, T)> {
193        std::iter::from_coroutine(
194            #[coroutine]
195            move || match self {
196                Self::Insert(row) => {
197                    yield (Op::Insert, row);
198                }
199                Self::Delete(row) => {
200                    yield (Op::Delete, row);
201                }
202                Self::Update {
203                    old_value,
204                    new_value,
205                } => {
206                    yield (Op::UpdateDelete, old_value);
207                    yield (Op::UpdateInsert, new_value);
208                }
209            },
210        )
211    }
212}
213
214impl<T: AsRef<[u8]>> ChangeLogValue<T> {
215    pub fn to_ref(&self) -> ChangeLogValue<&[u8]> {
216        match self {
217            ChangeLogValue::Insert(val) => ChangeLogValue::Insert(val.as_ref()),
218            ChangeLogValue::Update {
219                new_value,
220                old_value,
221            } => ChangeLogValue::Update {
222                new_value: new_value.as_ref(),
223                old_value: old_value.as_ref(),
224            },
225            ChangeLogValue::Delete(val) => ChangeLogValue::Delete(val.as_ref()),
226        }
227    }
228}
229
/// An owned change log entry: a table key and the change recorded for it.
pub type StateStoreReadLogItem = (TableKey<Bytes>, ChangeLogValue<Bytes>);
/// The borrowed form of [`StateStoreReadLogItem`].
pub type StateStoreReadLogItemRef<'a> = (TableKey<&'a [u8]>, ChangeLogValue<&'a [u8]>);
232
/// Options for [`StateStoreReadLog::next_epoch`].
#[derive(Clone)]
pub struct NextEpochOptions {
    /// The table the request refers to.
    pub table_id: TableId,
}
237
/// Options for [`StateStoreReadLog::iter_log`].
#[derive(Clone)]
pub struct ReadLogOptions {
    /// The table whose change log is read.
    pub table_id: TableId,
}
242
/// A `Send + 'static` [`StateStoreIter`] over change log items.
pub trait StateStoreReadChangeLogIter = StateStoreIter<StateStoreReadLogItem> + Send + 'static;
/// A `Send` future with lifetime `'a` that resolves to `StorageResult<T>`.
pub trait StorageFuture<'a, T> = Future<Output = StorageResult<T>> + Send + 'a;
245
/// Read access to the change log of a table.
pub trait StateStoreReadLog: StaticSendSync {
    /// The iterator type returned by [`Self::iter_log`].
    type ChangeLogIter: StateStoreReadChangeLogIter;

    /// Resolves to an epoch for the table named in `options`, relative to `epoch`.
    ///
    /// NOTE(review): judging by the name this is the epoch that follows `epoch` in the
    /// table's change log — confirm against the implementations.
    fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64>;

    /// Opens an iterator over the change log entries of the table named in `options`,
    /// restricted to `key_range` and `epoch_range`.
    ///
    /// NOTE(review): whether the bounds of `epoch_range` are inclusive is not visible here.
    fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> impl StorageFuture<'_, Self::ChangeLogIter>;
}
258
/// A one-shot `Send + 'static` callback that receives a borrowed full key and value and turns
/// them into an `O`, or fails with a storage error.
pub trait KeyValueFn<O> =
    for<'kv> FnOnce(FullKey<&'kv [u8]>, &'kv [u8]) -> StorageResult<O> + Send + 'static;
261
/// Point lookup on a state store.
pub trait StateStoreGet: StaticSendSync {
    /// Looks up `key` and passes the borrowed key-value pair to `on_key_value_fn`.
    ///
    /// The future resolves to an `Option` of the callback's output.
    /// NOTE(review): presumably `None` means the key was not found and the callback was not
    /// invoked — confirm against the implementations.
    fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> impl StorageFuture<'_, Option<O>>;
}
270
/// Range reads on a state store, in addition to the point lookup of [`StateStoreGet`].
pub trait StateStoreRead: StateStoreGet + StaticSendSync {
    /// The iterator type returned by [`Self::iter`].
    type Iter: StateStoreReadIter;
    /// The iterator type returned by [`Self::rev_iter`].
    type RevIter: StateStoreReadIter;

    /// Opens and returns an iterator for the given `prefix_hint` and `key_range`.
    /// Internally, `prefix_hint` will be used for checking the `bloom_filter` and
    /// `key_range` is used for the iteration. (If the `prefix_hint` is not `None`, it should
    /// be included in `key_range`.) The returned iterator will iterate data based on a
    /// snapshot corresponding to the given `epoch`.
    ///
    /// NOTE(review): `epoch` is not a parameter of this method; presumably it is the epoch
    /// the reader was created with — confirm.
    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::Iter>;

    /// Counterpart of [`Self::iter`] that returns a [`Self::RevIter`].
    ///
    /// NOTE(review): presumably iterates `key_range` in reverse key order — confirm.
    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::RevIter>;
}
292
/// Options for [`StateStore::try_wait_epoch`].
#[derive(Clone)]
pub struct TryWaitEpochOptions {
    /// The table the wait refers to.
    pub table_id: TableId,
}

impl TryWaitEpochOptions {
    /// Builds options for the given table; only compiled for tests or with the `test` feature.
    #[cfg(any(test, feature = "test"))]
    pub fn for_test(table_id: TableId) -> Self {
        Self { table_id }
    }
}
304
305impl From<TracedTryWaitEpochOptions> for TryWaitEpochOptions {
306    fn from(value: TracedTryWaitEpochOptions) -> Self {
307        Self {
308            table_id: value.table_id.into(),
309        }
310    }
311}
312
313impl From<TryWaitEpochOptions> for TracedTryWaitEpochOptions {
314    fn from(value: TryWaitEpochOptions) -> Self {
315        Self {
316            table_id: value.table_id.into(),
317        }
318    }
319}
320
/// Options for [`StateStore::new_read_snapshot`].
#[derive(Clone, Copy)]
pub struct NewReadSnapshotOptions {
    /// The table the snapshot is created for.
    pub table_id: TableId,
}
325
/// Options for [`StateStore::new_vector_writer`].
#[derive(Clone)]
pub struct NewVectorWriterOptions {
    /// The table the vector writer is created for.
    pub table_id: TableId,
}
330
/// The top-level state store interface: a cloneable handle from which local stores, read
/// snapshots and vector writers are created.
pub trait StateStore: StateStoreReadLog + StaticSendSync + Clone {
    /// The per-table local store type created by [`Self::new_local`].
    type Local: LocalStateStore;
    /// The snapshot reader type created by [`Self::new_read_snapshot`].
    type ReadSnapshot: StateStoreRead + StateStoreReadVector + Clone;
    /// The vector writer type created by [`Self::new_vector_writer`].
    type VectorWriter: StateStoreWriteVector;

    /// If epoch is `Committed`, we will wait until the epoch is committed and its data is ready to
    /// read. If epoch is `Current`, we will only check if the data can be read with this epoch.
    fn try_wait_epoch(
        &self,
        epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> impl StorageFuture<'_, ()>;

    /// Creates a [`MonitoredStateStore`] from this state store, with the given
    /// `storage_metrics`.
    fn monitored(self, storage_metrics: Arc<MonitoredStorageMetrics>) -> MonitoredStateStore<Self> {
        MonitoredStateStore::new(self, storage_metrics)
    }

    /// Creates a new [`Self::Local`] store configured by `option`.
    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_;

    /// Creates a [`Self::ReadSnapshot`] for reading the table named in `options` at `epoch`.
    fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> impl StorageFuture<'_, Self::ReadSnapshot>;

    /// Creates a new [`Self::VectorWriter`] for the table named in `options`.
    fn new_vector_writer(
        &self,
        options: NewVectorWriterOptions,
    ) -> impl Future<Output = Self::VectorWriter> + Send + '_;
}
362
/// A state store that is dedicated to a streaming operator, which only reads the uncommitted
/// data written by itself. Each local state store is not `Clone`, and is owned by a streaming
/// state table.
pub trait LocalStateStore: StateStoreGet + StateStoreWriteEpochControl + StaticSendSync {
    /// The reader type returned by [`Self::new_flushed_snapshot_reader`].
    type FlushedSnapshotReader: StateStoreRead;
    /// The iterator type returned by [`Self::iter`]; it may borrow from the store.
    type Iter<'a>: StateStoreIter + 'a;
    /// The iterator type returned by [`Self::rev_iter`]; it may borrow from the store.
    type RevIter<'a>: StateStoreIter + 'a;

    /// Opens and returns an iterator for the given `prefix_hint` and `key_range`.
    /// Internally, `prefix_hint` will be used for checking the `bloom_filter` and
    /// `key_range` is used for the iteration. (If the `prefix_hint` is not `None`, it should
    /// be included in `key_range`.) The returned iterator will iterate data based on the
    /// latest written snapshot.
    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::Iter<'_>>;

    /// Counterpart of [`Self::iter`] that returns a [`Self::RevIter`].
    ///
    /// NOTE(review): presumably iterates `key_range` in reverse key order — confirm.
    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::RevIter<'_>>;

    /// Creates a [`Self::FlushedSnapshotReader`].
    ///
    /// NOTE(review): judging by the name, the reader sees only data that has already been
    /// flushed — confirm against the implementations.
    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader;

    /// Get last persisted watermark for a given vnode.
    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;

    /// Inserts a key-value entry associated with a given `epoch` into the state store.
    ///
    /// NOTE(review): `epoch` is not a parameter here; presumably it is the current write
    /// epoch of this store — confirm. `old_val` carries the previous value of the key, if any.
    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()>;

    /// Deletes a key-value entry from the state store. Only the key-value entry with epoch smaller
    /// than the given `epoch` will be deleted.
    ///
    /// NOTE(review): `epoch` is not a parameter here; presumably it is the current write
    /// epoch of this store — confirm.
    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

    /// Updates the vnode bitmap corresponding to the local state store.
    /// Returns the previous vnode bitmap.
    fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> impl StorageFuture<'_, Arc<Bitmap>>;
}
409
/// Epoch lifecycle and flushing operations shared by the writable state store handles.
pub trait StateStoreWriteEpochControl: StaticSendSync {
    /// Flushes the store; the future resolves to a `usize`.
    ///
    /// NOTE(review): presumably the size of the flushed data — confirm against the
    /// implementations.
    fn flush(&mut self) -> impl StorageFuture<'_, usize>;

    /// NOTE(review): presumably a flush that may decide not to write anything; the exact
    /// policy is not visible here — confirm against the implementations.
    fn try_flush(&mut self) -> impl StorageFuture<'_, ()>;

    /// Initializes the state store with given `epoch` pair.
    /// Typically we will use `epoch.curr` as the initialized epoch,
    /// since the state table will begin as empty.
    /// In some cases like replicated state table, state table may not be empty initially,
    /// as such we need to wait for `epoch.prev` checkpoint to complete,
    /// hence this interface is made async.
    fn init(&mut self, opts: InitOptions) -> impl StorageFuture<'_, ()>;

    /// Updates the monotonically increasing write epoch to `next_epoch`.
    /// All writes after this function is called will be tagged with `next_epoch`. In other
    /// words, the previous write epoch is sealed.
    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
}
428
/// A writer for vector data, with the epoch control of [`StateStoreWriteEpochControl`].
pub trait StateStoreWriteVector: StateStoreWriteEpochControl + StaticSendSync {
    /// Inserts the vector `vec` together with its associated `info` bytes.
    fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()>;
}
432
/// Options for [`StateStoreReadVector::nearest`].
pub struct VectorNearestOptions {
    /// NOTE(review): presumably the maximum number of nearest items to return — confirm.
    pub top_n: usize,
    /// The distance measurement used for the search.
    pub measure: DistanceMeasurement,
}
437
/// An [`OnNearestItem`] callback that is additionally `Send + Sync + 'static`.
pub trait OnNearestItemFn<O> = OnNearestItem<O> + Send + Sync + 'static;
439
/// Nearest-neighbour search over stored vectors.
pub trait StateStoreReadVector: StaticSendSync {
    /// Searches for the items nearest to `vec` according to `options` and resolves to the
    /// outputs produced by `on_nearest_item_fn`.
    ///
    /// NOTE(review): presumably one output per matched item, at most `options.top_n` of
    /// them — confirm against the implementations.
    fn nearest<O: Send + 'static>(
        &self,
        vec: Vector,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<O>,
    ) -> impl StorageFuture<'_, Vec<O>>;
}
448
/// Prefetch settings of a read.
///
/// If `prefetch` is true, prefetch will be enabled. Prefetching may increase the memory
/// footprint of the CN process because the prefetched blocks cannot be evicted.
/// Since the streaming read of object storage may hang in some cases, we still use sync short
/// reads for both batch queries and streaming processes, so this configuration is unused.
#[derive(Default, Clone, Copy, Debug, PartialEq, Eq)]
pub struct PrefetchOptions {
    /// Whether prefetch is enabled.
    pub prefetch: bool,
    /// Whether the read belongs to a large query.
    pub for_large_query: bool,
}

impl PrefetchOptions {
    /// Prefetch enabled, marked as a large query.
    pub fn prefetch_for_large_range_scan() -> Self {
        Self::new(true, true)
    }

    /// Prefetch enabled, not marked as a large query.
    pub fn prefetch_for_small_range_scan() -> Self {
        Self::new(true, false)
    }

    /// Builds the options from the two flags.
    pub fn new(prefetch: bool, for_large_query: bool) -> Self {
        Self {
            prefetch,
            for_large_query,
        }
    }
}
481
482impl From<TracedPrefetchOptions> for PrefetchOptions {
483    fn from(value: TracedPrefetchOptions) -> Self {
484        Self {
485            prefetch: value.prefetch,
486            for_large_query: value.for_large_query,
487        }
488    }
489}
490
491impl From<PrefetchOptions> for TracedPrefetchOptions {
492    fn from(value: PrefetchOptions) -> Self {
493        Self {
494            prefetch: value.prefetch,
495            for_large_query: value.for_large_query,
496        }
497    }
498}
499
/// Options that accompany a read (`on_key_value`, `iter`, `rev_iter`).
#[derive(Default, Clone)]
pub struct ReadOptions {
    /// A hint for prefix key to check bloom filter.
    /// If the `prefix_hint` is not None, it should be included in
    /// `key` or `key_range` in the read API.
    pub prefix_hint: Option<Bytes>,
    /// Prefetch settings for the read.
    pub prefetch_options: PrefetchOptions,
    /// The cache policy for the read.
    pub cache_policy: CachePolicy,

    /// NOTE(review): presumably the retention period of the table in seconds, with `None`
    /// meaning no retention limit (compare `gen_min_epoch`) — confirm.
    pub retention_seconds: Option<u32>,
}
511
512impl From<TracedReadOptions> for ReadOptions {
513    fn from(value: TracedReadOptions) -> Self {
514        Self {
515            prefix_hint: value.prefix_hint.map(|b| b.into()),
516            prefetch_options: value.prefetch_options.into(),
517            cache_policy: value.cache_policy.into(),
518            retention_seconds: value.retention_seconds,
519        }
520    }
521}
522
523impl ReadOptions {
524    pub fn into_traced_read_options(
525        self,
526        table_id: TableId,
527        epoch: Option<HummockReadEpoch>,
528    ) -> TracedReadOptions {
529        let value = self;
530        let (read_version_from_backup, read_committed) = match epoch {
531            None | Some(HummockReadEpoch::NoWait(_)) => (false, false),
532            Some(HummockReadEpoch::Backup(_)) => (true, false),
533            Some(HummockReadEpoch::Committed(_))
534            | Some(HummockReadEpoch::BatchQueryCommitted(_, _))
535            | Some(HummockReadEpoch::TimeTravel(_)) => (false, true),
536        };
537        TracedReadOptions {
538            prefix_hint: value.prefix_hint.map(|b| b.into()),
539            prefetch_options: value.prefetch_options.into(),
540            cache_policy: value.cache_policy.into(),
541            retention_seconds: value.retention_seconds,
542            table_id: table_id.into(),
543            read_version_from_backup,
544            read_committed,
545        }
546    }
547}
548
549pub fn gen_min_epoch(base_epoch: u64, retention_seconds: Option<&u32>) -> u64 {
550    let base_epoch = Epoch(base_epoch);
551    match retention_seconds {
552        Some(retention_seconds_u32) => {
553            base_epoch
554                .subtract_ms(*retention_seconds_u32 as u64 * 1000)
555                .0
556        }
557        None => 0,
558    }
559}
560
/// A `Send + Sync` predicate over two `Bytes` values that returns a `bool`; used as the
/// `check_old_value` callback of [`OpConsistencyLevel::ConsistentOldValue`].
pub trait CheckOldValueEquality = Fn(&Bytes, &Bytes) -> bool + Send + Sync;
562
/// A lazily initialized, shared [`CheckOldValueEquality`] that compares the two values with
/// `==`.
pub static CHECK_BYTES_EQUAL: LazyLock<Arc<dyn CheckOldValueEquality>> =
    LazyLock::new(|| Arc::new(|first: &Bytes, second: &Bytes| first == second));
565
/// The consistency level of write operations on a local state store.
/// See [`NewLocalOptions::op_consistency_level`] for what "consistent" requires.
#[derive(Default, Clone)]
pub enum OpConsistencyLevel {
    /// The default level.
    #[default]
    Inconsistent,
    /// The consistent level; carries the callback used to compare old values.
    ConsistentOldValue {
        /// The equality check applied to old values.
        check_old_value: Arc<dyn CheckOldValueEquality>,
        /// whether should store the old value
        is_log_store: bool,
    },
}
576
577impl Debug for OpConsistencyLevel {
578    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
579        match self {
580            OpConsistencyLevel::Inconsistent => f.write_str("OpConsistencyLevel::Inconsistent"),
581            OpConsistencyLevel::ConsistentOldValue { is_log_store, .. } => f
582                .debug_struct("OpConsistencyLevel::ConsistentOldValue")
583                .field("is_log_store", is_log_store)
584                .finish(),
585        }
586    }
587}
588
589impl PartialEq<Self> for OpConsistencyLevel {
590    fn eq(&self, other: &Self) -> bool {
591        matches!(
592            (self, other),
593            (
594                OpConsistencyLevel::Inconsistent,
595                OpConsistencyLevel::Inconsistent
596            ) | (
597                OpConsistencyLevel::ConsistentOldValue {
598                    is_log_store: true,
599                    ..
600                },
601                OpConsistencyLevel::ConsistentOldValue {
602                    is_log_store: true,
603                    ..
604                },
605            ) | (
606                OpConsistencyLevel::ConsistentOldValue {
607                    is_log_store: false,
608                    ..
609                },
610                OpConsistencyLevel::ConsistentOldValue {
611                    is_log_store: false,
612                    ..
613                },
614            )
615        )
616    }
617}
618
619impl Eq for OpConsistencyLevel {}
620
621impl OpConsistencyLevel {
622    pub fn update(&mut self, new_level: &OpConsistencyLevel) {
623        assert_ne!(self, new_level);
624        *self = new_level.clone()
625    }
626}
627
/// Options for [`StateStore::new_local`].
#[derive(Clone)]
pub struct NewLocalOptions {
    /// The table the local state store is created for.
    pub table_id: TableId,
    /// Whether the operation is consistent. The term `consistent` requires the following:
    ///
    /// 1. A key cannot be inserted or deleted more than once, i.e. inserting to an existing
    ///    key or deleting a non-existing key is not allowed.
    ///
    /// 2. The old value passed from
    ///    `update` and `delete` should match the original stored value.
    pub op_consistency_level: OpConsistencyLevel,
    /// The options of the table.
    pub table_option: TableOption,

    /// Indicate if this is replicated. If it is, we should not
    /// upload its `ReadVersions`.
    pub is_replicated: bool,

    /// The vnode bitmap for the local state store instance
    pub vnodes: Arc<Bitmap>,

    /// NOTE(review): presumably controls whether data is uploaded when the store is
    /// flushed — confirm against the implementations.
    pub upload_on_flush: bool,
}
650
651impl From<TracedNewLocalOptions> for NewLocalOptions {
652    fn from(value: TracedNewLocalOptions) -> Self {
653        Self {
654            table_id: value.table_id.into(),
655            op_consistency_level: match value.op_consistency_level {
656                TracedOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
657                TracedOpConsistencyLevel::ConsistentOldValue => {
658                    OpConsistencyLevel::ConsistentOldValue {
659                        check_old_value: CHECK_BYTES_EQUAL.clone(),
660                        // TODO: for simplicity, set it to false
661                        is_log_store: false,
662                    }
663                }
664            },
665            table_option: value.table_option.into(),
666            is_replicated: value.is_replicated,
667            vnodes: Arc::new(value.vnodes.into()),
668            upload_on_flush: value.upload_on_flush,
669        }
670    }
671}
672
673impl From<NewLocalOptions> for TracedNewLocalOptions {
674    fn from(value: NewLocalOptions) -> Self {
675        Self {
676            table_id: value.table_id.into(),
677            op_consistency_level: match value.op_consistency_level {
678                OpConsistencyLevel::Inconsistent => TracedOpConsistencyLevel::Inconsistent,
679                OpConsistencyLevel::ConsistentOldValue { .. } => {
680                    TracedOpConsistencyLevel::ConsistentOldValue
681                }
682            },
683            table_option: value.table_option.into(),
684            is_replicated: value.is_replicated,
685            vnodes: value.vnodes.as_ref().clone().into(),
686            upload_on_flush: value.upload_on_flush,
687        }
688    }
689}
690
691impl NewLocalOptions {
692    pub fn new(
693        table_id: TableId,
694        op_consistency_level: OpConsistencyLevel,
695        table_option: TableOption,
696        vnodes: Arc<Bitmap>,
697        upload_on_flush: bool,
698    ) -> Self {
699        NewLocalOptions {
700            table_id,
701            op_consistency_level,
702            table_option,
703            is_replicated: false,
704            vnodes,
705            upload_on_flush,
706        }
707    }
708
709    pub fn new_replicated(
710        table_id: TableId,
711        op_consistency_level: OpConsistencyLevel,
712        table_option: TableOption,
713        vnodes: Arc<Bitmap>,
714    ) -> Self {
715        NewLocalOptions {
716            table_id,
717            op_consistency_level,
718            table_option,
719            is_replicated: true,
720            vnodes,
721            upload_on_flush: false,
722        }
723    }
724
725    pub fn for_test(table_id: TableId) -> Self {
726        Self {
727            table_id,
728            op_consistency_level: OpConsistencyLevel::Inconsistent,
729            table_option: TableOption {
730                retention_seconds: None,
731            },
732            is_replicated: false,
733            vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
734            upload_on_flush: true,
735        }
736    }
737}
738
/// Options for [`StateStoreWriteEpochControl::init`].
#[derive(Clone)]
pub struct InitOptions {
    /// The epoch pair the state store is initialized with.
    pub epoch: EpochPair,
}

impl InitOptions {
    /// Builds the options from an epoch pair.
    pub fn new(epoch: EpochPair) -> Self {
        Self { epoch }
    }
}
749
750impl From<InitOptions> for TracedInitOptions {
751    fn from(value: InitOptions) -> Self {
752        TracedInitOptions {
753            epoch: value.epoch.into(),
754        }
755    }
756}
757
758impl From<TracedInitOptions> for InitOptions {
759    fn from(value: TracedInitOptions) -> Self {
760        InitOptions {
761            epoch: value.epoch.into(),
762        }
763    }
764}
765
/// Options for [`StateStoreWriteEpochControl::seal_current_epoch`].
#[derive(Clone, Debug)]
pub struct SealCurrentEpochOptions {
    /// Optional table watermarks: their direction, the per-vnode watermarks and the serde
    /// type of the watermark.
    pub table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
    /// When set, the consistency level to switch to.
    pub switch_op_consistency_level: Option<OpConsistencyLevel>,
}
771
772impl From<SealCurrentEpochOptions> for TracedSealCurrentEpochOptions {
773    fn from(value: SealCurrentEpochOptions) -> Self {
774        TracedSealCurrentEpochOptions {
775            table_watermarks: value.table_watermarks.map(
776                |(direction, watermarks, watermark_type)| {
777                    (
778                        direction == WatermarkDirection::Ascending,
779                        watermarks
780                            .into_iter()
781                            .map(|watermark| {
782                                let pb_watermark = PbVnodeWatermark::from(watermark);
783                                Message::encode_to_vec(&pb_watermark)
784                            })
785                            .collect(),
786                        match watermark_type {
787                            WatermarkSerdeType::NonPkPrefix => true,
788                            WatermarkSerdeType::PkPrefix => false,
789                        },
790                    )
791                },
792            ),
793            switch_op_consistency_level: value
794                .switch_op_consistency_level
795                .map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue { .. })),
796        }
797    }
798}
799
800impl From<TracedSealCurrentEpochOptions> for SealCurrentEpochOptions {
801    fn from(value: TracedSealCurrentEpochOptions) -> SealCurrentEpochOptions {
802        SealCurrentEpochOptions {
803            table_watermarks: value.table_watermarks.map(
804                |(is_ascending, watermarks, is_non_pk_prefix)| {
805                    (
806                        if is_ascending {
807                            WatermarkDirection::Ascending
808                        } else {
809                            WatermarkDirection::Descending
810                        },
811                        watermarks
812                            .into_iter()
813                            .map(|serialized_watermark| {
814                                Message::decode(serialized_watermark.as_slice())
815                                    .map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb))
816                                    .expect("should not failed")
817                            })
818                            .collect(),
819                        if is_non_pk_prefix {
820                            WatermarkSerdeType::NonPkPrefix
821                        } else {
822                            WatermarkSerdeType::PkPrefix
823                        },
824                    )
825                },
826            ),
827            switch_op_consistency_level: value.switch_op_consistency_level.map(|enable| {
828                if enable {
829                    OpConsistencyLevel::ConsistentOldValue {
830                        check_old_value: CHECK_BYTES_EQUAL.clone(),
831                        is_log_store: false,
832                    }
833                } else {
834                    OpConsistencyLevel::Inconsistent
835                }
836            }),
837        }
838    }
839}
840
impl SealCurrentEpochOptions {
    /// Builds options for tests: no table watermarks and no consistency level switch.
    pub fn for_test() -> Self {
        Self {
            table_watermarks: None,
            switch_op_consistency_level: None,
        }
    }
}
848}