risingwave_storage/
store.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::default::Default;
16use std::fmt::{Debug, Formatter};
17use std::future::Future;
18use std::marker::PhantomData;
19use std::sync::{Arc, LazyLock};
20
21use bytes::Bytes;
22use futures::{Stream, TryStreamExt};
23use futures_async_stream::try_stream;
24use prost::Message;
25use risingwave_common::array::Op;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{TableId, TableOption};
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common::vector::distance::DistanceMeasurement;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
33use risingwave_hummock_sdk::table_watermark::{
34    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
35};
36use risingwave_hummock_trace::{
37    TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions,
38    TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
39};
40use risingwave_pb::hummock::PbVnodeWatermark;
41
42use crate::error::{StorageError, StorageResult};
43use crate::hummock::CachePolicy;
44use crate::monitor::{MonitoredStateStore, MonitoredStorageMetrics};
45pub(crate) use crate::vector::{OnNearestItem, Vector};
46
/// Shorthand for the `Send + Sync + 'static` bound shared by most storage traits.
pub trait StaticSendSync = Send + Sync + 'static;

/// An owned item type that a [`StateStoreIter`] can produce, together with the borrowed
/// view (`ItemRef`) that `try_next` actually hands out.
pub trait IterItem: Send + 'static {
    type ItemRef<'a>: Send + Copy + 'a;
}

impl IterItem for StateStoreKeyedRow {
    type ItemRef<'a> = StateStoreKeyedRowRef<'a>;
}

impl IterItem for StateStoreReadLogItem {
    type ItemRef<'a> = StateStoreReadLogItemRef<'a>;
}

/// An async iterator over state store items. Each yielded item borrows from the iterator,
/// so it can only be used until the iterator is advanced again.
pub trait StateStoreIter<T: IterItem = StateStoreKeyedRow>: Send {
    /// Advances the iterator; `Ok(None)` signals that it is exhausted.
    fn try_next(&mut self) -> impl StorageFuture<'_, Option<T::ItemRef<'_>>>;
}
64
65pub fn to_owned_item((key, value): StateStoreKeyedRowRef<'_>) -> StorageResult<StateStoreKeyedRow> {
66    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
67}
68
/// Extension methods for [`StateStoreIter`].
pub trait StateStoreIterExt<T: IterItem = StateStoreKeyedRow>: StateStoreIter<T> + Sized {
    /// The stream type produced by [`Self::into_stream`].
    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>>: Stream<Item = StorageResult<O>>
        + Send;

    /// Turns the iterator into a [`Stream`], converting each borrowed item with `f`.
    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
        self,
        f: F,
    ) -> Self::ItemStream<O, F>;

    /// Wraps the iterator in a [`FusedStateStoreIter`], which tracks whether the inner
    /// iterator has already finished.
    fn fused(self) -> FusedStateStoreIter<Self, T> {
        FusedStateStoreIter::new(self)
    }
}
82
83#[try_stream(ok = O, error = StorageError)]
84async fn into_stream_inner<
85    T: IterItem,
86    I: StateStoreIter<T>,
87    O: Send,
88    F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send,
89>(
90    iter: I,
91    f: F,
92) {
93    let mut iter = iter.fused();
94    while let Some(item) = iter.try_next().await? {
95        yield f(item)?;
96    }
97}
98
99pub struct FromStreamStateStoreIter<S> {
100    inner: S,
101    item_buffer: Option<StateStoreKeyedRow>,
102}
103
104impl<S> FromStreamStateStoreIter<S> {
105    pub fn new(inner: S) -> Self {
106        Self {
107            inner,
108            item_buffer: None,
109        }
110    }
111}
112
113impl<S: Stream<Item = StorageResult<StateStoreKeyedRow>> + Unpin + Send> StateStoreIter
114    for FromStreamStateStoreIter<S>
115{
116    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
117        self.item_buffer = self.inner.try_next().await?;
118        Ok(self
119            .item_buffer
120            .as_ref()
121            .map(|(key, value)| (key.to_ref(), value.as_ref())))
122    }
123}
124
/// Wrapper around a [`StateStoreIter`] that records whether the inner iterator has already
/// returned `Ok(None)` or an error.
pub struct FusedStateStoreIter<I, T> {
    inner: I,
    finished: bool,
    _phantom: PhantomData<T>,
}

impl<I, T> FusedStateStoreIter<I, T> {
    /// Wraps `inner`, starting in the not-finished state.
    fn new(inner: I) -> Self {
        Self {
            _phantom: PhantomData,
            finished: false,
            inner,
        }
    }
}
140
141impl<T: IterItem, I: StateStoreIter<T>> FusedStateStoreIter<I, T> {
142    async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
143        assert!(!self.finished, "call try_next after finish");
144        let result = self.inner.try_next().await;
145        match &result {
146            Ok(Some(_)) => {}
147            Ok(None) | Err(_) => {
148                self.finished = true;
149            }
150        }
151        result
152    }
153}
154
155impl<T: IterItem, I: StateStoreIter<T>> StateStoreIterExt<T> for I {
156    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>> =
157        impl Stream<Item = StorageResult<O>> + Send;
158
159    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
160        self,
161        f: F,
162    ) -> Self::ItemStream<O, F> {
163        into_stream_inner(self, f)
164    }
165}
166
/// Borrowed form of [`StateStoreKeyedRow`]: a full key and its value.
pub type StateStoreKeyedRowRef<'a> = (FullKey<&'a [u8]>, &'a [u8]);
/// An owned `(full key, value)` pair read from the state store.
pub type StateStoreKeyedRow = (FullKey<Bytes>, Bytes);
/// A `'static` iterator over keyed rows, as returned by [`StateStoreRead`].
pub trait StateStoreReadIter = StateStoreIter + 'static;
170
/// The change recorded in a change log for a single key.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum ChangeLogValue<T> {
    /// The key was inserted with this value.
    Insert(T),
    /// The key's value changed from `old_value` to `new_value`.
    Update { new_value: T, old_value: T },
    /// The key was deleted; holds the value that was removed.
    Delete(T),
}
177
178impl<T> ChangeLogValue<T> {
179    pub fn try_map<O>(self, f: impl Fn(T) -> StorageResult<O>) -> StorageResult<ChangeLogValue<O>> {
180        Ok(match self {
181            ChangeLogValue::Insert(value) => ChangeLogValue::Insert(f(value)?),
182            ChangeLogValue::Update {
183                new_value,
184                old_value,
185            } => ChangeLogValue::Update {
186                new_value: f(new_value)?,
187                old_value: f(old_value)?,
188            },
189            ChangeLogValue::Delete(value) => ChangeLogValue::Delete(f(value)?),
190        })
191    }
192
193    pub fn into_op_value_iter(self) -> impl Iterator<Item = (Op, T)> {
194        std::iter::from_coroutine(
195            #[coroutine]
196            move || match self {
197                Self::Insert(row) => {
198                    yield (Op::Insert, row);
199                }
200                Self::Delete(row) => {
201                    yield (Op::Delete, row);
202                }
203                Self::Update {
204                    old_value,
205                    new_value,
206                } => {
207                    yield (Op::UpdateDelete, old_value);
208                    yield (Op::UpdateInsert, new_value);
209                }
210            },
211        )
212    }
213}
214
215impl<T: AsRef<[u8]>> ChangeLogValue<T> {
216    pub fn to_ref(&self) -> ChangeLogValue<&[u8]> {
217        match self {
218            ChangeLogValue::Insert(val) => ChangeLogValue::Insert(val.as_ref()),
219            ChangeLogValue::Update {
220                new_value,
221                old_value,
222            } => ChangeLogValue::Update {
223                new_value: new_value.as_ref(),
224                old_value: old_value.as_ref(),
225            },
226            ChangeLogValue::Delete(val) => ChangeLogValue::Delete(val.as_ref()),
227        }
228    }
229}
230
/// An owned change-log item: a table key and the change recorded for it.
pub type StateStoreReadLogItem = (TableKey<Bytes>, ChangeLogValue<Bytes>);
/// Borrowed form of [`StateStoreReadLogItem`].
pub type StateStoreReadLogItemRef<'a> = (TableKey<&'a [u8]>, ChangeLogValue<&'a [u8]>);

/// Options for [`StateStoreReadLog::next_epoch`].
#[derive(Clone)]
pub struct NextEpochOptions {
    pub table_id: TableId,
}

/// Options for [`StateStoreReadLog::iter_log`].
#[derive(Clone)]
pub struct ReadLogOptions {
    pub table_id: TableId,
}

/// A `'static`, `Send` iterator over change-log items, as returned by
/// [`StateStoreReadLog::iter_log`].
pub trait StateStoreReadChangeLogIter = StateStoreIter<StateStoreReadLogItem> + Send + 'static;
/// A `Send` future that resolves to a [`StorageResult`] and may borrow for `'a`.
pub trait StorageFuture<'a, T> = Future<Output = StorageResult<T>> + Send + 'a;
246
/// Read access to the change log of a table.
pub trait StateStoreReadLog: StaticSendSync {
    type ChangeLogIter: StateStoreReadChangeLogIter;

    /// Resolves to an epoch related to `epoch` for the table in `options`.
    // NOTE(review): presumably the next epoch after `epoch` in the change log; the behavior
    // when no later epoch exists yet is not visible here — confirm.
    fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64>;

    /// Opens an iterator over the change-log entries in `key_range` for `epoch_range`.
    // NOTE(review): confirm whether both ends of `epoch_range` are inclusive.
    fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> impl StorageFuture<'_, Self::ChangeLogIter>;
}
259
/// A one-shot callback that receives a borrowed full key and value and produces an `O`.
pub trait KeyValueFn<O> =
    for<'kv> FnOnce(FullKey<&'kv [u8]>, &'kv [u8]) -> StorageResult<O> + Send + 'static;

/// Point lookups on a state store.
pub trait StateStoreGet: StaticSendSync {
    /// Looks up `key`. When an entry is found, resolves to the result of calling
    /// `on_key_value_fn` on its full key and value.
    // NOTE(review): presumably resolves to `Ok(None)` when `key` is absent — confirm.
    fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> impl StorageFuture<'_, Option<O>>;
}
271
272pub trait StateStoreRead: StateStoreGet + StaticSendSync {
273    type Iter: StateStoreReadIter;
274    type RevIter: StateStoreReadIter;
275
276    /// Opens and returns an iterator for given `prefix_hint` and `full_key_range`
277    /// Internally, `prefix_hint` will be used to for checking `bloom_filter` and
278    /// `full_key_range` used for iter. (if the `prefix_hint` not None, it should be be included
279    /// in `key_range`) The returned iterator will iterate data based on a snapshot
280    /// corresponding to the given `epoch`.
281    fn iter(
282        &self,
283        key_range: TableKeyRange,
284        read_options: ReadOptions,
285    ) -> impl StorageFuture<'_, Self::Iter>;
286
287    fn rev_iter(
288        &self,
289        key_range: TableKeyRange,
290        read_options: ReadOptions,
291    ) -> impl StorageFuture<'_, Self::RevIter>;
292}
293
294#[derive(Clone)]
295pub struct TryWaitEpochOptions {
296    pub table_id: TableId,
297}
298
299impl TryWaitEpochOptions {
300    #[cfg(any(test, feature = "test"))]
301    pub fn for_test(table_id: TableId) -> Self {
302        Self { table_id }
303    }
304}
305
306impl From<TracedTryWaitEpochOptions> for TryWaitEpochOptions {
307    fn from(value: TracedTryWaitEpochOptions) -> Self {
308        Self {
309            table_id: value.table_id.into(),
310        }
311    }
312}
313
314impl From<TryWaitEpochOptions> for TracedTryWaitEpochOptions {
315    fn from(value: TryWaitEpochOptions) -> Self {
316        Self {
317            table_id: value.table_id.into(),
318        }
319    }
320}
321
/// Options for [`StateStore::new_read_snapshot`].
#[derive(Clone, Copy)]
pub struct NewReadSnapshotOptions {
    pub table_id: TableId,
}

/// Options for [`StateStore::new_vector_writer`].
#[derive(Clone)]
pub struct NewVectorWriterOptions {
    pub table_id: TableId,
}
331
332pub trait StateStore: StateStoreReadLog + StaticSendSync + Clone {
333    type Local: LocalStateStore;
334    type ReadSnapshot: StateStoreRead + StateStoreReadVector + Clone;
335    type VectorWriter: StateStoreWriteVector;
336
337    /// If epoch is `Committed`, we will wait until the epoch is committed and its data is ready to
338    /// read. If epoch is `Current`, we will only check if the data can be read with this epoch.
339    fn try_wait_epoch(
340        &self,
341        epoch: HummockReadEpoch,
342        options: TryWaitEpochOptions,
343    ) -> impl StorageFuture<'_, ()>;
344
345    /// Creates a [`MonitoredStateStore`] from this state store, with given `stats`.
346    fn monitored(self, storage_metrics: Arc<MonitoredStorageMetrics>) -> MonitoredStateStore<Self> {
347        MonitoredStateStore::new(self, storage_metrics)
348    }
349
350    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_;
351
352    fn new_read_snapshot(
353        &self,
354        epoch: HummockReadEpoch,
355        options: NewReadSnapshotOptions,
356    ) -> impl StorageFuture<'_, Self::ReadSnapshot>;
357
358    fn new_vector_writer(
359        &self,
360        options: NewVectorWriterOptions,
361    ) -> impl Future<Output = Self::VectorWriter> + Send + '_;
362}
363
364/// A state store that is dedicated for streaming operator, which only reads the uncommitted data
365/// written by itself. Each local state store is not `Clone`, and is owned by a streaming state
366/// table.
367pub trait LocalStateStore: StateStoreGet + StateStoreWriteEpochControl + StaticSendSync {
368    type FlushedSnapshotReader: StateStoreRead;
369    type Iter<'a>: StateStoreIter + 'a;
370    type RevIter<'a>: StateStoreIter + 'a;
371
372    /// Opens and returns an iterator for given `prefix_hint` and `full_key_range`
373    /// Internally, `prefix_hint` will be used to for checking `bloom_filter` and
374    /// `full_key_range` used for iter. (if the `prefix_hint` not None, it should be be included
375    /// in `key_range`) The returned iterator will iterate data based on the latest written
376    /// snapshot.
377    fn iter(
378        &self,
379        key_range: TableKeyRange,
380        read_options: ReadOptions,
381    ) -> impl StorageFuture<'_, Self::Iter<'_>>;
382
383    fn rev_iter(
384        &self,
385        key_range: TableKeyRange,
386        read_options: ReadOptions,
387    ) -> impl StorageFuture<'_, Self::RevIter<'_>>;
388
389    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader;
390
391    /// Get last persisted watermark for a given vnode.
392    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
393
394    /// Inserts a key-value entry associated with a given `epoch` into the state store.
395    fn insert(
396        &mut self,
397        key: TableKey<Bytes>,
398        new_val: Bytes,
399        old_val: Option<Bytes>,
400    ) -> StorageResult<()>;
401
402    /// Deletes a key-value entry from the state store. Only the key-value entry with epoch smaller
403    /// than the given `epoch` will be deleted.
404    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;
405
406    // Updates the vnode bitmap corresponding to the local state store
407    // Returns the previous vnode bitmap
408    fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> impl StorageFuture<'_, Arc<Bitmap>>;
409}
410
/// Flush and write-epoch control for writable stores.
pub trait StateStoreWriteEpochControl: StaticSendSync {
    /// Flushes buffered writes.
    // NOTE(review): the meaning of the returned `usize` (presumably the flushed size) is not
    // visible here — confirm.
    fn flush(&mut self) -> impl StorageFuture<'_, usize>;

    /// Attempts a flush.
    // NOTE(review): presumably flushes only when some implementation-defined condition holds
    // — confirm.
    fn try_flush(&mut self) -> impl StorageFuture<'_, ()>;

    /// Initializes the state store with the given `epoch` pair.
    ///
    /// Usually `epoch.curr` is used as the initial write epoch, because the state table starts
    /// out empty. In some cases, such as a replicated state table, the table may not be empty
    /// initially, so the `epoch.prev` checkpoint must be awaited first. That is why this method
    /// is async.
    fn init(&mut self, opts: InitOptions) -> impl StorageFuture<'_, ()>;

    /// Advances the monotonically increasing write epoch to `next_epoch`.
    ///
    /// All writes made after this call are tagged with `next_epoch`; in other words, the
    /// previous write epoch is sealed.
    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
}
429
/// A writer of vector data.
pub trait StateStoreWriteVector: StateStoreWriteEpochControl + StaticSendSync {
    /// Inserts `vec` together with its associated `info` payload.
    fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()>;
}

/// Options for [`StateStoreReadVector::nearest`].
pub struct VectorNearestOptions {
    /// Number of nearest results requested (presumably an upper bound — confirm).
    pub top_n: usize,
    /// Distance measure used to rank candidates.
    pub measure: DistanceMeasurement,
    /// HNSW `ef` search parameter.
    pub hnsw_ef_search: usize,
}

/// Callback applied to each nearest-neighbour hit.
pub trait OnNearestItemFn<O> = OnNearestItem<O> + Send + Sync + 'static;

/// Nearest-neighbour search over vector data.
pub trait StateStoreReadVector: StaticSendSync {
    /// Searches for the vectors nearest to `vec` according to `options`, converting each hit
    /// with `on_nearest_item_fn`.
    fn nearest<O: Send + 'static>(
        &self,
        vec: Vector,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<O>,
    ) -> impl StorageFuture<'_, Vec<O>>;
}
450
/// Block-prefetching options for range scans.
///
/// If `prefetch` is true, prefetching is enabled. Prefetching may increase the memory
/// footprint of the CN process, because prefetched blocks cannot be evicted. Streaming reads
/// from object storage may hang in some cases, so synchronous short reads are still used for
/// both batch queries and streaming. As a result, this option is currently unused.
#[derive(Default, Clone, Copy)]
pub struct PrefetchOptions {
    pub prefetch: bool,
    pub for_large_query: bool,
}

impl PrefetchOptions {
    /// Prefetching enabled, for a large range scan.
    pub fn prefetch_for_large_range_scan() -> Self {
        Self::new(true, true)
    }

    /// Prefetching enabled, for a small range scan.
    pub fn prefetch_for_small_range_scan() -> Self {
        Self::new(true, false)
    }

    /// Builds options from explicit flags.
    pub fn new(prefetch: bool, for_large_query: bool) -> Self {
        Self {
            prefetch,
            for_large_query,
        }
    }
}
483
484impl From<TracedPrefetchOptions> for PrefetchOptions {
485    fn from(value: TracedPrefetchOptions) -> Self {
486        Self {
487            prefetch: value.prefetch,
488            for_large_query: value.for_large_query,
489        }
490    }
491}
492
493impl From<PrefetchOptions> for TracedPrefetchOptions {
494    fn from(value: PrefetchOptions) -> Self {
495        Self {
496            prefetch: value.prefetch,
497            for_large_query: value.for_large_query,
498        }
499    }
500}
501
502#[derive(Default, Clone)]
503pub struct ReadOptions {
504    /// A hint for prefix key to check bloom filter.
505    /// If the `prefix_hint` is not None, it should be included in
506    /// `key` or `key_range` in the read API.
507    pub prefix_hint: Option<Bytes>,
508    pub prefetch_options: PrefetchOptions,
509    pub cache_policy: CachePolicy,
510
511    pub retention_seconds: Option<u32>,
512}
513
514impl From<TracedReadOptions> for ReadOptions {
515    fn from(value: TracedReadOptions) -> Self {
516        Self {
517            prefix_hint: value.prefix_hint.map(|b| b.into()),
518            prefetch_options: value.prefetch_options.into(),
519            cache_policy: value.cache_policy.into(),
520            retention_seconds: value.retention_seconds,
521        }
522    }
523}
524
525impl ReadOptions {
526    pub fn into_traced_read_options(
527        self,
528        table_id: TableId,
529        epoch: Option<HummockReadEpoch>,
530    ) -> TracedReadOptions {
531        let value = self;
532        let (read_version_from_backup, read_committed) = match epoch {
533            None | Some(HummockReadEpoch::NoWait(_)) => (false, false),
534            Some(HummockReadEpoch::Backup(_)) => (true, false),
535            Some(HummockReadEpoch::Committed(_))
536            | Some(HummockReadEpoch::BatchQueryCommitted(_, _))
537            | Some(HummockReadEpoch::TimeTravel(_)) => (false, true),
538        };
539        TracedReadOptions {
540            prefix_hint: value.prefix_hint.map(|b| b.into()),
541            prefetch_options: value.prefetch_options.into(),
542            cache_policy: value.cache_policy.into(),
543            retention_seconds: value.retention_seconds,
544            table_id: table_id.into(),
545            read_version_from_backup,
546            read_committed,
547        }
548    }
549}
550
551pub fn gen_min_epoch(base_epoch: u64, retention_seconds: Option<&u32>) -> u64 {
552    let base_epoch = Epoch(base_epoch);
553    match retention_seconds {
554        Some(retention_seconds_u32) => {
555            base_epoch
556                .subtract_ms(*retention_seconds_u32 as u64 * 1000)
557                .0
558        }
559        None => 0,
560    }
561}
562
563pub trait CheckOldValueEquality = Fn(&Bytes, &Bytes) -> bool + Send + Sync;
564
565pub static CHECK_BYTES_EQUAL: LazyLock<Arc<dyn CheckOldValueEquality>> =
566    LazyLock::new(|| Arc::new(|first: &Bytes, second: &Bytes| first == second));
567
/// How strictly the operations applied to a local state store are checked.
#[derive(Default, Clone)]
pub enum OpConsistencyLevel {
    /// No consistency requirement. This is the default.
    #[default]
    Inconsistent,
    /// Operations are consistent and carry old values.
    ConsistentOldValue {
        /// Equality predicate for old values (presumably used to compare a provided old value
        /// against the stored one — confirm).
        check_old_value: Arc<dyn CheckOldValueEquality>,
        /// Whether the old value should be stored.
        is_log_store: bool,
    },
}
578
579impl Debug for OpConsistencyLevel {
580    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
581        match self {
582            OpConsistencyLevel::Inconsistent => f.write_str("OpConsistencyLevel::Inconsistent"),
583            OpConsistencyLevel::ConsistentOldValue { is_log_store, .. } => f
584                .debug_struct("OpConsistencyLevel::ConsistentOldValue")
585                .field("is_log_store", is_log_store)
586                .finish(),
587        }
588    }
589}
590
591impl PartialEq<Self> for OpConsistencyLevel {
592    fn eq(&self, other: &Self) -> bool {
593        matches!(
594            (self, other),
595            (
596                OpConsistencyLevel::Inconsistent,
597                OpConsistencyLevel::Inconsistent
598            ) | (
599                OpConsistencyLevel::ConsistentOldValue {
600                    is_log_store: true,
601                    ..
602                },
603                OpConsistencyLevel::ConsistentOldValue {
604                    is_log_store: true,
605                    ..
606                },
607            ) | (
608                OpConsistencyLevel::ConsistentOldValue {
609                    is_log_store: false,
610                    ..
611                },
612                OpConsistencyLevel::ConsistentOldValue {
613                    is_log_store: false,
614                    ..
615                },
616            )
617        )
618    }
619}
620
621impl Eq for OpConsistencyLevel {}
622
623impl OpConsistencyLevel {
624    pub fn update(&mut self, new_level: &OpConsistencyLevel) {
625        assert_ne!(self, new_level);
626        *self = new_level.clone()
627    }
628}
629
/// Options for [`StateStore::new_local`].
#[derive(Clone)]
pub struct NewLocalOptions {
    pub table_id: TableId,
    /// Whether the operations are consistent. "Consistent" requires the following:
    ///
    /// 1. A key cannot be inserted or deleted more than once, i.e. inserting an existing key
    ///    or deleting a non-existing key is not allowed.
    ///
    /// 2. The old value passed to `update` and `delete` must match the value that is actually
    ///    stored.
    pub op_consistency_level: OpConsistencyLevel,
    pub table_option: TableOption,

    /// Whether this store is replicated. If it is, its `ReadVersions` should not be uploaded.
    pub is_replicated: bool,

    /// The vnode bitmap for the local state store instance.
    pub vnodes: Arc<Bitmap>,

    // NOTE(review): presumably controls whether flushed data is uploaded — confirm.
    pub upload_on_flush: bool,
}
652
653impl From<TracedNewLocalOptions> for NewLocalOptions {
654    fn from(value: TracedNewLocalOptions) -> Self {
655        Self {
656            table_id: value.table_id.into(),
657            op_consistency_level: match value.op_consistency_level {
658                TracedOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
659                TracedOpConsistencyLevel::ConsistentOldValue => {
660                    OpConsistencyLevel::ConsistentOldValue {
661                        check_old_value: CHECK_BYTES_EQUAL.clone(),
662                        // TODO: for simplicity, set it to false
663                        is_log_store: false,
664                    }
665                }
666            },
667            table_option: value.table_option.into(),
668            is_replicated: value.is_replicated,
669            vnodes: Arc::new(value.vnodes.into()),
670            upload_on_flush: value.upload_on_flush,
671        }
672    }
673}
674
675impl From<NewLocalOptions> for TracedNewLocalOptions {
676    fn from(value: NewLocalOptions) -> Self {
677        Self {
678            table_id: value.table_id.into(),
679            op_consistency_level: match value.op_consistency_level {
680                OpConsistencyLevel::Inconsistent => TracedOpConsistencyLevel::Inconsistent,
681                OpConsistencyLevel::ConsistentOldValue { .. } => {
682                    TracedOpConsistencyLevel::ConsistentOldValue
683                }
684            },
685            table_option: value.table_option.into(),
686            is_replicated: value.is_replicated,
687            vnodes: value.vnodes.as_ref().clone().into(),
688            upload_on_flush: value.upload_on_flush,
689        }
690    }
691}
692
693impl NewLocalOptions {
694    pub fn new(
695        table_id: TableId,
696        op_consistency_level: OpConsistencyLevel,
697        table_option: TableOption,
698        vnodes: Arc<Bitmap>,
699        upload_on_flush: bool,
700    ) -> Self {
701        NewLocalOptions {
702            table_id,
703            op_consistency_level,
704            table_option,
705            is_replicated: false,
706            vnodes,
707            upload_on_flush,
708        }
709    }
710
711    pub fn new_replicated(
712        table_id: TableId,
713        op_consistency_level: OpConsistencyLevel,
714        table_option: TableOption,
715        vnodes: Arc<Bitmap>,
716    ) -> Self {
717        NewLocalOptions {
718            table_id,
719            op_consistency_level,
720            table_option,
721            is_replicated: true,
722            vnodes,
723            upload_on_flush: false,
724        }
725    }
726
727    pub fn for_test(table_id: TableId) -> Self {
728        Self {
729            table_id,
730            op_consistency_level: OpConsistencyLevel::Inconsistent,
731            table_option: TableOption {
732                retention_seconds: None,
733            },
734            is_replicated: false,
735            vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
736            upload_on_flush: true,
737        }
738    }
739}
740
741#[derive(Clone)]
742pub struct InitOptions {
743    pub epoch: EpochPair,
744}
745
746impl InitOptions {
747    pub fn new(epoch: EpochPair) -> Self {
748        Self { epoch }
749    }
750}
751
752impl From<InitOptions> for TracedInitOptions {
753    fn from(value: InitOptions) -> Self {
754        TracedInitOptions {
755            epoch: value.epoch.into(),
756        }
757    }
758}
759
760impl From<TracedInitOptions> for InitOptions {
761    fn from(value: TracedInitOptions) -> Self {
762        InitOptions {
763            epoch: value.epoch.into(),
764        }
765    }
766}
767
/// Options for [`StateStoreWriteEpochControl::seal_current_epoch`].
#[derive(Clone, Debug)]
pub struct SealCurrentEpochOptions {
    /// Optional table watermarks attached to the sealed epoch: their direction, the
    /// per-vnode watermarks, and their serde type.
    pub table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
    /// If set, the op consistency level to switch to.
    pub switch_op_consistency_level: Option<OpConsistencyLevel>,
}
773
774impl From<SealCurrentEpochOptions> for TracedSealCurrentEpochOptions {
775    fn from(value: SealCurrentEpochOptions) -> Self {
776        TracedSealCurrentEpochOptions {
777            table_watermarks: value.table_watermarks.map(
778                |(direction, watermarks, watermark_type)| {
779                    (
780                        direction == WatermarkDirection::Ascending,
781                        watermarks
782                            .into_iter()
783                            .map(|watermark| {
784                                let pb_watermark = PbVnodeWatermark::from(watermark);
785                                Message::encode_to_vec(&pb_watermark)
786                            })
787                            .collect(),
788                        match watermark_type {
789                            WatermarkSerdeType::NonPkPrefix => true,
790                            WatermarkSerdeType::PkPrefix => false,
791                        },
792                    )
793                },
794            ),
795            switch_op_consistency_level: value
796                .switch_op_consistency_level
797                .map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue { .. })),
798        }
799    }
800}
801
802impl From<TracedSealCurrentEpochOptions> for SealCurrentEpochOptions {
803    fn from(value: TracedSealCurrentEpochOptions) -> SealCurrentEpochOptions {
804        SealCurrentEpochOptions {
805            table_watermarks: value.table_watermarks.map(
806                |(is_ascending, watermarks, is_non_pk_prefix)| {
807                    (
808                        if is_ascending {
809                            WatermarkDirection::Ascending
810                        } else {
811                            WatermarkDirection::Descending
812                        },
813                        watermarks
814                            .into_iter()
815                            .map(|serialized_watermark| {
816                                Message::decode(serialized_watermark.as_slice())
817                                    .map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb))
818                                    .expect("should not failed")
819                            })
820                            .collect(),
821                        if is_non_pk_prefix {
822                            WatermarkSerdeType::NonPkPrefix
823                        } else {
824                            WatermarkSerdeType::PkPrefix
825                        },
826                    )
827                },
828            ),
829            switch_op_consistency_level: value.switch_op_consistency_level.map(|enable| {
830                if enable {
831                    OpConsistencyLevel::ConsistentOldValue {
832                        check_old_value: CHECK_BYTES_EQUAL.clone(),
833                        is_log_store: false,
834                    }
835                } else {
836                    OpConsistencyLevel::Inconsistent
837                }
838            }),
839        }
840    }
841}
842
843impl SealCurrentEpochOptions {
844    pub fn for_test() -> Self {
845        Self {
846            table_watermarks: None,
847            switch_op_consistency_level: None,
848        }
849    }
850}