risingwave_storage/
store.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::default::Default;
16use std::fmt::{Debug, Formatter};
17use std::future::Future;
18use std::marker::PhantomData;
19use std::sync::{Arc, LazyLock};
20
21use bytes::Bytes;
22use futures::{Stream, TryFutureExt, TryStreamExt};
23use futures_async_stream::try_stream;
24use prost::Message;
25use risingwave_common::array::Op;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{TableId, TableOption};
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_hummock_sdk::HummockReadEpoch;
31use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
32use risingwave_hummock_sdk::table_watermark::{
33    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
34};
35use risingwave_hummock_trace::{
36    TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions,
37    TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
38    TracedWriteOptions,
39};
40use risingwave_pb::hummock::PbVnodeWatermark;
41
42use crate::error::{StorageError, StorageResult};
43use crate::hummock::CachePolicy;
44use crate::monitor::{MonitoredStateStore, MonitoredStorageMetrics};
45
/// Shorthand bound: `Send + Sync + 'static`.
pub trait StaticSendSync = Send + Sync + 'static;

/// An owned item type produced by a [`StateStoreIter`], together with the borrowed form
/// (`ItemRef`) that the iterator actually hands out from `try_next`.
pub trait IterItem: Send + 'static {
    /// Borrowed view of the item, valid for the lifetime `'a`.
    type ItemRef<'a>: Send + Copy + 'a;
}

// An owned keyed row `(FullKey<Bytes>, Bytes)` is lent as `(FullKey<&[u8]>, &[u8])`.
impl IterItem for StateStoreKeyedRow {
    type ItemRef<'a> = StateStoreKeyedRowRef<'a>;
}

// An owned change-log item is lent with its key and values borrowed as `&[u8]`.
impl IterItem for StateStoreReadLogItem {
    type ItemRef<'a> = StateStoreReadLogItemRef<'a>;
}
59
/// An async iterator over state store data that lends items borrowed from itself.
///
/// `try_next` resolves to `Ok(Some(item))` for the next item, `Ok(None)` when there are no more
/// items, or an error. The returned item borrows `self`, so it must be released (or copied into
/// an owned value, e.g. with [`to_owned_item`]) before calling `try_next` again.
pub trait StateStoreIter<T: IterItem = StateStoreKeyedRow>: Send {
    fn try_next(&mut self) -> impl StorageFuture<'_, Option<T::ItemRef<'_>>>;
}
63
64pub fn to_owned_item((key, value): StateStoreKeyedRowRef<'_>) -> StorageResult<StateStoreKeyedRow> {
65    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
66}
67
/// Extension methods available on every [`StateStoreIter`] through a blanket impl.
pub trait StateStoreIterExt<T: IterItem = StateStoreKeyedRow>: StateStoreIter<T> + Sized {
    /// Stream type returned by [`Self::into_stream`].
    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>>: Stream<Item = StorageResult<O>>
        + Send;

    /// Converts the iterator into a [`Stream`], mapping every borrowed item to an owned `O`
    /// with `f`. The stream ends when the iterator returns `Ok(None)`. It also ends after
    /// yielding the first error, whether that error comes from the iterator or from `f`.
    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
        self,
        f: F,
    ) -> Self::ItemStream<O, F>;

    /// Wraps the iterator so that it is marked finished once it returns `Ok(None)` or an error.
    /// Calling `try_next` on the wrapper after that point panics.
    fn fused(self) -> FusedStateStoreIter<Self, T> {
        FusedStateStoreIter::new(self)
    }
}
81
/// Body of `StateStoreIterExt::into_stream`: drains `iter` and yields `f(item)` for each item.
///
/// Errors from the iterator or from `f` are propagated with `?`, which terminates the stream.
#[try_stream(ok = O, error = StorageError)]
async fn into_stream_inner<
    T: IterItem,
    I: StateStoreIter<T>,
    O: Send,
    F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send,
>(
    iter: I,
    f: F,
) {
    // `FusedStateStoreIter` asserts that `try_next` is never called again after `None` or an
    // error; the loop below stops at exactly those points.
    let mut iter = iter.fused();
    while let Some(item) = iter.try_next().await? {
        yield f(item)?;
    }
}
97
98pub struct FromStreamStateStoreIter<S> {
99    inner: S,
100    item_buffer: Option<StateStoreKeyedRow>,
101}
102
103impl<S> FromStreamStateStoreIter<S> {
104    pub fn new(inner: S) -> Self {
105        Self {
106            inner,
107            item_buffer: None,
108        }
109    }
110}
111
112impl<S: Stream<Item = StorageResult<StateStoreKeyedRow>> + Unpin + Send> StateStoreIter
113    for FromStreamStateStoreIter<S>
114{
115    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
116        self.item_buffer = self.inner.try_next().await?;
117        Ok(self
118            .item_buffer
119            .as_ref()
120            .map(|(key, value)| (key.to_ref(), value.as_ref())))
121    }
122}
123
/// Wrapper returned by [`StateStoreIterExt::fused`] that remembers when the inner iterator has
/// ended or failed.
pub struct FusedStateStoreIter<I, T> {
    /// The wrapped iterator.
    inner: I,
    /// Set once `inner` has returned `Ok(None)` or an error.
    finished: bool,
    /// Ties the item type `T` to the wrapper without storing a value of it.
    _phantom: PhantomData<T>,
}

impl<I, T> FusedStateStoreIter<I, T> {
    fn new(inner: I) -> Self {
        Self {
            finished: false,
            _phantom: PhantomData,
            inner,
        }
    }
}
139
140impl<T: IterItem, I: StateStoreIter<T>> FusedStateStoreIter<I, T> {
141    async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
142        assert!(!self.finished, "call try_next after finish");
143        let result = self.inner.try_next().await;
144        match &result {
145            Ok(Some(_)) => {}
146            Ok(None) | Err(_) => {
147                self.finished = true;
148            }
149        }
150        result
151    }
152}
153
// Blanket impl: every `StateStoreIter` gets `StateStoreIterExt`. `into_stream` is backed by
// `into_stream_inner`.
impl<T: IterItem, I: StateStoreIter<T>> StateStoreIterExt<T> for I {
    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>> =
        impl Stream<Item = StorageResult<O>> + Send;

    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
        self,
        f: F,
    ) -> Self::ItemStream<O, F> {
        into_stream_inner(self, f)
    }
}
165
/// Borrowed keyed row: the full key and the value, both referencing byte slices.
pub type StateStoreKeyedRowRef<'a> = (FullKey<&'a [u8]>, &'a [u8]);
/// Owned keyed row: the full key and the value.
pub type StateStoreKeyedRow = (FullKey<Bytes>, Bytes);
/// Keyed-row iterator with a `'static` bound, used for the iterators of [`StateStoreRead`].
pub trait StateStoreReadIter = StateStoreIter + 'static;
169
/// One change-log entry for a key: an insert, an update (carrying both the new and the old
/// value), or a delete.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum ChangeLogValue<T> {
    /// An insert carrying the inserted value.
    Insert(T),
    /// An update from `old_value` to `new_value`.
    Update { new_value: T, old_value: T },
    /// A delete carrying a value of the key (NOTE(review): presumably the deleted value).
    Delete(T),
}
176
177impl<T> ChangeLogValue<T> {
178    pub fn try_map<O>(self, f: impl Fn(T) -> StorageResult<O>) -> StorageResult<ChangeLogValue<O>> {
179        Ok(match self {
180            ChangeLogValue::Insert(value) => ChangeLogValue::Insert(f(value)?),
181            ChangeLogValue::Update {
182                new_value,
183                old_value,
184            } => ChangeLogValue::Update {
185                new_value: f(new_value)?,
186                old_value: f(old_value)?,
187            },
188            ChangeLogValue::Delete(value) => ChangeLogValue::Delete(f(value)?),
189        })
190    }
191
192    pub fn into_op_value_iter(self) -> impl Iterator<Item = (Op, T)> {
193        std::iter::from_coroutine(
194            #[coroutine]
195            move || match self {
196                Self::Insert(row) => {
197                    yield (Op::Insert, row);
198                }
199                Self::Delete(row) => {
200                    yield (Op::Delete, row);
201                }
202                Self::Update {
203                    old_value,
204                    new_value,
205                } => {
206                    yield (Op::UpdateDelete, old_value);
207                    yield (Op::UpdateInsert, new_value);
208                }
209            },
210        )
211    }
212}
213
214impl<T: AsRef<[u8]>> ChangeLogValue<T> {
215    pub fn to_ref(&self) -> ChangeLogValue<&[u8]> {
216        match self {
217            ChangeLogValue::Insert(val) => ChangeLogValue::Insert(val.as_ref()),
218            ChangeLogValue::Update {
219                new_value,
220                old_value,
221            } => ChangeLogValue::Update {
222                new_value: new_value.as_ref(),
223                old_value: old_value.as_ref(),
224            },
225            ChangeLogValue::Delete(val) => ChangeLogValue::Delete(val.as_ref()),
226        }
227    }
228}
229
/// Owned change-log item: the table key and its change.
pub type StateStoreReadLogItem = (TableKey<Bytes>, ChangeLogValue<Bytes>);
/// Borrowed form of [`StateStoreReadLogItem`].
pub type StateStoreReadLogItemRef<'a> = (TableKey<&'a [u8]>, ChangeLogValue<&'a [u8]>);

/// Options for [`StateStoreReadLog::next_epoch`].
#[derive(Clone)]
pub struct NextEpochOptions {
    pub table_id: TableId,
}

/// Options for [`StateStoreReadLog::iter_log`].
#[derive(Clone)]
pub struct ReadLogOptions {
    pub table_id: TableId,
}

/// Iterator over change-log items, as returned by [`StateStoreReadLog::iter_log`].
pub trait StateStoreReadChangeLogIter = StateStoreIter<StateStoreReadLogItem> + Send + 'static;
/// A `Send` future that may borrow for `'a` and resolves to `StorageResult<T>`. Most async
/// methods of the store traits in this module return this type.
pub trait StorageFuture<'a, T> = Future<Output = StorageResult<T>> + Send + 'a;
245
/// Read access to the change log of a state store.
pub trait StateStoreReadLog: StaticSendSync {
    /// Iterator returned by [`Self::iter_log`].
    type ChangeLogIter: StateStoreReadChangeLogIter;

    /// Resolves to an epoch derived from `epoch` for `options.table_id`. Judging by the name,
    /// this is the next epoch after `epoch` in the change log (NOTE(review): confirm against
    /// implementations).
    fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64>;

    /// Opens an iterator over change-log items within `epoch_range` and `key_range`.
    // NOTE(review): whether the bounds of `epoch_range` are inclusive is not visible here.
    fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> impl StorageFuture<'_, Self::ChangeLogIter>;
}
258
/// Snapshot read access to a state store.
pub trait StateStoreRead: StaticSendSync {
    /// Iterator returned by [`Self::iter`].
    type Iter: StateStoreReadIter;
    /// Iterator returned by [`Self::rev_iter`].
    type RevIter: StateStoreReadIter;

    /// Point gets a value from the state store.
    /// The result is based on this reader's snapshot; no epoch is passed per call.
    /// Both the full key and the value are returned.
    fn get_keyed_row(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Option<StateStoreKeyedRow>>;

    /// Point gets a value from the state store.
    /// The result is based on this reader's snapshot.
    /// Only the value is returned. The default implementation calls
    /// [`Self::get_keyed_row`] and drops the key.
    fn get(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Option<Bytes>> {
        self.get_keyed_row(key, read_options)
            .map_ok(|v| v.map(|(_, v)| v))
    }

    /// Opens and returns an iterator over `key_range`.
    /// If `read_options.prefix_hint` is set, it is used for checking the bloom filter, and it
    /// should be included in `key_range`. The returned iterator reads data from this reader's
    /// snapshot.
    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::Iter>;

    /// Reverse-order counterpart of [`Self::iter`].
    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::RevIter>;
}
301
/// Options for [`StateStore::try_wait_epoch`].
#[derive(Clone)]
pub struct TryWaitEpochOptions {
    pub table_id: TableId,
}

impl TryWaitEpochOptions {
    /// Builds options for `table_id`. Only compiled for tests or with the `test` feature.
    #[cfg(any(test, feature = "test"))]
    pub fn for_test(table_id: TableId) -> Self {
        Self { table_id }
    }
}

// Conversions to and from the `risingwave_hummock_trace` representation.
impl From<TracedTryWaitEpochOptions> for TryWaitEpochOptions {
    fn from(value: TracedTryWaitEpochOptions) -> Self {
        Self {
            table_id: value.table_id.into(),
        }
    }
}

impl From<TryWaitEpochOptions> for TracedTryWaitEpochOptions {
    fn from(value: TryWaitEpochOptions) -> Self {
        Self {
            table_id: value.table_id.into(),
        }
    }
}

/// Options for [`StateStore::new_read_snapshot`].
#[derive(Clone, Copy)]
pub struct NewReadSnapshotOptions {
    pub table_id: TableId,
}
334
/// A cloneable, shareable state store handle. It creates [`LocalStateStore`]s and snapshot
/// readers, and exposes change-log reads through [`StateStoreReadLog`].
pub trait StateStore: StateStoreReadLog + StaticSendSync + Clone {
    /// Local store type created by [`Self::new_local`].
    type Local: LocalStateStore;
    /// Snapshot reader type created by [`Self::new_read_snapshot`].
    type ReadSnapshot: StateStoreRead + Clone;

    /// If epoch is `Committed`, we will wait until the epoch is committed and its data is ready to
    /// read. If epoch is `Current`, we will only check if the data can be read with this epoch.
    fn try_wait_epoch(
        &self,
        epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> impl StorageFuture<'_, ()>;

    /// Creates a [`MonitoredStateStore`] from this state store, with given `storage_metrics`.
    fn monitored(self, storage_metrics: Arc<MonitoredStorageMetrics>) -> MonitoredStateStore<Self> {
        MonitoredStateStore::new(self, storage_metrics)
    }

    /// Creates a new local state store configured by `option`.
    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_;

    /// Creates a snapshot reader at `epoch` for the table given in `options`.
    fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> impl StorageFuture<'_, Self::ReadSnapshot>;
}
360
/// A state store that is dedicated to a streaming operator, which only reads the uncommitted
/// data written by itself. Each local state store is not `Clone`, and is owned by a streaming
/// state table.
pub trait LocalStateStore: StaticSendSync {
    /// Reader returned by [`Self::new_flushed_snapshot_reader`].
    type FlushedSnapshotReader: StateStoreRead;
    /// Iterator returned by [`Self::iter`]; it may borrow the store for `'a`.
    type Iter<'a>: StateStoreIter + 'a;
    /// Iterator returned by [`Self::rev_iter`]; it may borrow the store for `'a`.
    type RevIter<'a>: StateStoreIter + 'a;

    /// Point gets a value from the state store.
    /// The result is based on the latest written snapshot.
    fn get(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Option<Bytes>>;

    /// Opens and returns an iterator over `key_range`.
    /// If `read_options.prefix_hint` is set, it is used for checking the bloom filter, and it
    /// should be included in `key_range`. The returned iterator reads data based on the latest
    /// written snapshot.
    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::Iter<'_>>;

    /// Reverse-order counterpart of [`Self::iter`].
    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::RevIter<'_>>;

    /// Creates a [`StateStoreRead`] reader for this store. Judging by the name, it reads the
    /// data this store has flushed (NOTE(review): confirm against implementations).
    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader;

    /// Get last persisted watermark for a given vnode.
    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;

    /// Inserts a key-value entry. The write is tagged with the current write epoch (see
    /// [`Self::seal_current_epoch`]). `old_val` is the key's previous value, if any.
    // NOTE(review): presumably `old_val` is only checked when the store was created with
    // `OpConsistencyLevel::ConsistentOldValue`; confirm against implementations.
    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()>;

    /// Deletes the entry for `key` in the current write epoch. `old_val` is the value being
    /// deleted.
    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

    /// Flushes the writes made so far.
    // NOTE(review): the meaning of the returned `usize` (e.g. bytes or entries flushed) is not
    // visible from this trait.
    fn flush(&mut self) -> impl StorageFuture<'_, usize>;

    /// Presumably a conditional flush that may do nothing (NOTE(review): confirm the trigger
    /// condition against implementations).
    fn try_flush(&mut self) -> impl StorageFuture<'_, ()>;
    /// Current epoch of the store. This is presumably the write epoch set by `init` or
    /// `seal_current_epoch`.
    fn epoch(&self) -> u64;

    /// Presumably reports whether there are writes that have not been flushed yet
    /// (NOTE(review): confirm against implementations).
    fn is_dirty(&self) -> bool;

    /// Initializes the state store with given `epoch` pair.
    /// Typically we will use `epoch.curr` as the initialized epoch,
    /// Since state table will begin as empty.
    /// In some cases like replicated state table, state table may not be empty initially,
    /// as such we need to wait for `epoch.prev` checkpoint to complete,
    /// hence this interface is made async.
    fn init(&mut self, opts: InitOptions) -> impl StorageFuture<'_, ()>;

    /// Updates the monotonically increasing write epoch to `next_epoch`.
    /// All writes after this function is called will be tagged with `next_epoch`. In other
    /// words, the previous write epoch is sealed.
    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);

    /// Updates the vnode bitmap corresponding to the local state store.
    /// Returns the previous vnode bitmap.
    fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> impl StorageFuture<'_, Arc<Bitmap>>;
}
435
436/// If `prefetch` is true, prefetch will be enabled. Prefetching may increase the memory
437/// footprint of the CN process because the prefetched blocks cannot be evicted.
438/// Since the streaming-read of object-storage may hung in some case, we still use sync short read
439/// for both batch-query and streaming process. So this configure is unused.
440#[derive(Default, Clone, Copy)]
441pub struct PrefetchOptions {
442    pub prefetch: bool,
443    pub for_large_query: bool,
444}
445
446impl PrefetchOptions {
447    pub fn prefetch_for_large_range_scan() -> Self {
448        Self {
449            prefetch: true,
450            for_large_query: true,
451        }
452    }
453
454    pub fn prefetch_for_small_range_scan() -> Self {
455        Self {
456            prefetch: true,
457            for_large_query: false,
458        }
459    }
460
461    pub fn new(prefetch: bool, for_large_query: bool) -> Self {
462        Self {
463            prefetch,
464            for_large_query,
465        }
466    }
467}
468
469impl From<TracedPrefetchOptions> for PrefetchOptions {
470    fn from(value: TracedPrefetchOptions) -> Self {
471        Self {
472            prefetch: value.prefetch,
473            for_large_query: value.for_large_query,
474        }
475    }
476}
477
478impl From<PrefetchOptions> for TracedPrefetchOptions {
479    fn from(value: PrefetchOptions) -> Self {
480        Self {
481            prefetch: value.prefetch,
482            for_large_query: value.for_large_query,
483        }
484    }
485}
486
487#[derive(Default, Clone)]
488pub struct ReadOptions {
489    /// A hint for prefix key to check bloom filter.
490    /// If the `prefix_hint` is not None, it should be included in
491    /// `key` or `key_range` in the read API.
492    pub prefix_hint: Option<Bytes>,
493    pub prefetch_options: PrefetchOptions,
494    pub cache_policy: CachePolicy,
495
496    pub retention_seconds: Option<u32>,
497    pub table_id: TableId,
498    /// Read from historical hummock version of meta snapshot backup.
499    /// It should only be used by `StorageTable` for batch query.
500    pub read_version_from_backup: bool,
501    pub read_committed: bool,
502}
503
504impl From<TracedReadOptions> for ReadOptions {
505    fn from(value: TracedReadOptions) -> Self {
506        Self {
507            prefix_hint: value.prefix_hint.map(|b| b.into()),
508            prefetch_options: value.prefetch_options.into(),
509            cache_policy: value.cache_policy.into(),
510            retention_seconds: value.retention_seconds,
511            table_id: value.table_id.into(),
512            read_version_from_backup: value.read_version_from_backup,
513            read_committed: value.read_committed,
514        }
515    }
516}
517
518impl From<ReadOptions> for TracedReadOptions {
519    fn from(value: ReadOptions) -> Self {
520        Self {
521            prefix_hint: value.prefix_hint.map(|b| b.into()),
522            prefetch_options: value.prefetch_options.into(),
523            cache_policy: value.cache_policy.into(),
524            retention_seconds: value.retention_seconds,
525            table_id: value.table_id.into(),
526            read_version_from_backup: value.read_version_from_backup,
527            read_committed: value.read_committed,
528        }
529    }
530}
531
532pub fn gen_min_epoch(base_epoch: u64, retention_seconds: Option<&u32>) -> u64 {
533    let base_epoch = Epoch(base_epoch);
534    match retention_seconds {
535        Some(retention_seconds_u32) => {
536            base_epoch
537                .subtract_ms(*retention_seconds_u32 as u64 * 1000)
538                .0
539        }
540        None => 0,
541    }
542}
543
544#[derive(Default, Clone)]
545pub struct WriteOptions {
546    pub epoch: u64,
547    pub table_id: TableId,
548}
549
550impl From<TracedWriteOptions> for WriteOptions {
551    fn from(value: TracedWriteOptions) -> Self {
552        Self {
553            epoch: value.epoch,
554            table_id: value.table_id.into(),
555        }
556    }
557}
558
/// Predicate that decides whether two old values are considered equal.
pub trait CheckOldValueEquality = Fn(&Bytes, &Bytes) -> bool + Send + Sync;

/// [`CheckOldValueEquality`] implemented as plain byte equality. It is used when rebuilding a
/// consistency level from traced options.
pub static CHECK_BYTES_EQUAL: LazyLock<Arc<dyn CheckOldValueEquality>> =
    LazyLock::new(|| Arc::new(|first: &Bytes, second: &Bytes| first == second));

/// Consistency requirement for operations on a local state store. See
/// `NewLocalOptions::op_consistency_level` for what "consistent" means.
#[derive(Default, Clone)]
pub enum OpConsistencyLevel {
    /// Operations are not required to be consistent. This is the default.
    #[default]
    Inconsistent,
    /// Operations are consistent, and old values are compared with `check_old_value`.
    ConsistentOldValue {
        /// Equality check used for old values.
        check_old_value: Arc<dyn CheckOldValueEquality>,
        /// Whether the old value should be stored.
        is_log_store: bool,
    },
}
574
575impl Debug for OpConsistencyLevel {
576    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
577        match self {
578            OpConsistencyLevel::Inconsistent => f.write_str("OpConsistencyLevel::Inconsistent"),
579            OpConsistencyLevel::ConsistentOldValue { is_log_store, .. } => f
580                .debug_struct("OpConsistencyLevel::ConsistentOldValue")
581                .field("is_log_store", is_log_store)
582                .finish(),
583        }
584    }
585}
586
587impl PartialEq<Self> for OpConsistencyLevel {
588    fn eq(&self, other: &Self) -> bool {
589        matches!(
590            (self, other),
591            (
592                OpConsistencyLevel::Inconsistent,
593                OpConsistencyLevel::Inconsistent
594            ) | (
595                OpConsistencyLevel::ConsistentOldValue {
596                    is_log_store: true,
597                    ..
598                },
599                OpConsistencyLevel::ConsistentOldValue {
600                    is_log_store: true,
601                    ..
602                },
603            ) | (
604                OpConsistencyLevel::ConsistentOldValue {
605                    is_log_store: false,
606                    ..
607                },
608                OpConsistencyLevel::ConsistentOldValue {
609                    is_log_store: false,
610                    ..
611                },
612            )
613        )
614    }
615}
616
617impl Eq for OpConsistencyLevel {}
618
619impl OpConsistencyLevel {
620    pub fn update(&mut self, new_level: &OpConsistencyLevel) {
621        assert_ne!(self, new_level);
622        *self = new_level.clone()
623    }
624}
625
626#[derive(Clone)]
627pub struct NewLocalOptions {
628    pub table_id: TableId,
629    /// Whether the operation is consistent. The term `consistent` requires the following:
630    ///
631    /// 1. A key cannot be inserted or deleted for more than once, i.e. inserting to an existing
632    ///    key or deleting an non-existing key is not allowed.
633    ///
634    /// 2. The old value passed from
635    ///    `update` and `delete` should match the original stored value.
636    pub op_consistency_level: OpConsistencyLevel,
637    pub table_option: TableOption,
638
639    /// Indicate if this is replicated. If it is, we should not
640    /// upload its `ReadVersions`.
641    pub is_replicated: bool,
642
643    /// The vnode bitmap for the local state store instance
644    pub vnodes: Arc<Bitmap>,
645}
646
647impl From<TracedNewLocalOptions> for NewLocalOptions {
648    fn from(value: TracedNewLocalOptions) -> Self {
649        Self {
650            table_id: value.table_id.into(),
651            op_consistency_level: match value.op_consistency_level {
652                TracedOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
653                TracedOpConsistencyLevel::ConsistentOldValue => {
654                    OpConsistencyLevel::ConsistentOldValue {
655                        check_old_value: CHECK_BYTES_EQUAL.clone(),
656                        // TODO: for simplicity, set it to false
657                        is_log_store: false,
658                    }
659                }
660            },
661            table_option: value.table_option.into(),
662            is_replicated: value.is_replicated,
663            vnodes: Arc::new(value.vnodes.into()),
664        }
665    }
666}
667
668impl From<NewLocalOptions> for TracedNewLocalOptions {
669    fn from(value: NewLocalOptions) -> Self {
670        Self {
671            table_id: value.table_id.into(),
672            op_consistency_level: match value.op_consistency_level {
673                OpConsistencyLevel::Inconsistent => TracedOpConsistencyLevel::Inconsistent,
674                OpConsistencyLevel::ConsistentOldValue { .. } => {
675                    TracedOpConsistencyLevel::ConsistentOldValue
676                }
677            },
678            table_option: value.table_option.into(),
679            is_replicated: value.is_replicated,
680            vnodes: value.vnodes.as_ref().clone().into(),
681        }
682    }
683}
684
685impl NewLocalOptions {
686    pub fn new(
687        table_id: TableId,
688        op_consistency_level: OpConsistencyLevel,
689        table_option: TableOption,
690        vnodes: Arc<Bitmap>,
691    ) -> Self {
692        NewLocalOptions {
693            table_id,
694            op_consistency_level,
695            table_option,
696            is_replicated: false,
697            vnodes,
698        }
699    }
700
701    pub fn new_replicated(
702        table_id: TableId,
703        op_consistency_level: OpConsistencyLevel,
704        table_option: TableOption,
705        vnodes: Arc<Bitmap>,
706    ) -> Self {
707        NewLocalOptions {
708            table_id,
709            op_consistency_level,
710            table_option,
711            is_replicated: true,
712            vnodes,
713        }
714    }
715
716    pub fn for_test(table_id: TableId) -> Self {
717        Self {
718            table_id,
719            op_consistency_level: OpConsistencyLevel::Inconsistent,
720            table_option: TableOption {
721                retention_seconds: None,
722            },
723            is_replicated: false,
724            vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
725        }
726    }
727}
728
729#[derive(Clone)]
730pub struct InitOptions {
731    pub epoch: EpochPair,
732}
733
734impl InitOptions {
735    pub fn new(epoch: EpochPair) -> Self {
736        Self { epoch }
737    }
738}
739
740impl From<InitOptions> for TracedInitOptions {
741    fn from(value: InitOptions) -> Self {
742        TracedInitOptions {
743            epoch: value.epoch.into(),
744        }
745    }
746}
747
748impl From<TracedInitOptions> for InitOptions {
749    fn from(value: TracedInitOptions) -> Self {
750        InitOptions {
751            epoch: value.epoch.into(),
752        }
753    }
754}
755
756#[derive(Clone, Debug)]
757pub struct SealCurrentEpochOptions {
758    pub table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
759    pub switch_op_consistency_level: Option<OpConsistencyLevel>,
760}
761
762impl From<SealCurrentEpochOptions> for TracedSealCurrentEpochOptions {
763    fn from(value: SealCurrentEpochOptions) -> Self {
764        TracedSealCurrentEpochOptions {
765            table_watermarks: value.table_watermarks.map(
766                |(direction, watermarks, watermark_type)| {
767                    (
768                        direction == WatermarkDirection::Ascending,
769                        watermarks
770                            .into_iter()
771                            .map(|watermark| {
772                                let pb_watermark = PbVnodeWatermark::from(watermark);
773                                Message::encode_to_vec(&pb_watermark)
774                            })
775                            .collect(),
776                        match watermark_type {
777                            WatermarkSerdeType::NonPkPrefix => true,
778                            WatermarkSerdeType::PkPrefix => false,
779                        },
780                    )
781                },
782            ),
783            switch_op_consistency_level: value
784                .switch_op_consistency_level
785                .map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue { .. })),
786        }
787    }
788}
789
790impl From<TracedSealCurrentEpochOptions> for SealCurrentEpochOptions {
791    fn from(value: TracedSealCurrentEpochOptions) -> SealCurrentEpochOptions {
792        SealCurrentEpochOptions {
793            table_watermarks: value.table_watermarks.map(
794                |(is_ascending, watermarks, is_non_pk_prefix)| {
795                    (
796                        if is_ascending {
797                            WatermarkDirection::Ascending
798                        } else {
799                            WatermarkDirection::Descending
800                        },
801                        watermarks
802                            .into_iter()
803                            .map(|serialized_watermark| {
804                                Message::decode(serialized_watermark.as_slice())
805                                    .map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb))
806                                    .expect("should not failed")
807                            })
808                            .collect(),
809                        if is_non_pk_prefix {
810                            WatermarkSerdeType::NonPkPrefix
811                        } else {
812                            WatermarkSerdeType::PkPrefix
813                        },
814                    )
815                },
816            ),
817            switch_op_consistency_level: value.switch_op_consistency_level.map(|enable| {
818                if enable {
819                    OpConsistencyLevel::ConsistentOldValue {
820                        check_old_value: CHECK_BYTES_EQUAL.clone(),
821                        is_log_store: false,
822                    }
823                } else {
824                    OpConsistencyLevel::Inconsistent
825                }
826            }),
827        }
828    }
829}
830
831impl SealCurrentEpochOptions {
832    pub fn for_test() -> Self {
833        Self {
834            table_watermarks: None,
835            switch_op_consistency_level: None,
836        }
837    }
838}