// risingwave_storage/store.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::default::Default;
16use std::fmt::{Debug, Formatter};
17use std::future::Future;
18use std::marker::PhantomData;
19use std::sync::{Arc, LazyLock};
20
21use bytes::Bytes;
22use futures::{Stream, TryStreamExt};
23use futures_async_stream::try_stream;
24use prost::Message;
25use risingwave_common::array::{Op, VectorRef};
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{TableId, TableOption};
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::{Epoch, EpochPair, MAX_EPOCH};
30use risingwave_common::vector::distance::DistanceMeasurement;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
33use risingwave_hummock_sdk::table_watermark::{
34    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
35};
36use risingwave_hummock_trace::{
37    TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions,
38    TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
39};
40use risingwave_pb::hummock::{PbVnodeWatermark, PbWatermarkSerdeType};
41
42use crate::error::{StorageError, StorageResult};
43use crate::hummock::CachePolicy;
44use crate::monitor::{MonitoredStateStore, MonitoredStorageMetrics};
45pub(crate) use crate::vector::{OnNearestItem, Vector};
46
47mod auto_rebuild;
48pub use auto_rebuild::{AutoRebuildStateStoreReadIter, timeout_auto_rebuild};
49
50pub trait StaticSendSync = Send + Sync + 'static;
51
52pub trait IterItem: Send + 'static {
53    type ItemRef<'a>: Send + Copy + 'a;
54}
55
56impl IterItem for StateStoreKeyedRow {
57    type ItemRef<'a> = StateStoreKeyedRowRef<'a>;
58}
59
60impl IterItem for StateStoreReadLogItem {
61    type ItemRef<'a> = StateStoreReadLogItemRef<'a>;
62}
63
64pub trait StateStoreIter<T: IterItem = StateStoreKeyedRow>: Send {
65    fn try_next(&mut self) -> impl StorageFuture<'_, Option<T::ItemRef<'_>>>;
66}
67
68pub fn to_owned_item((key, value): StateStoreKeyedRowRef<'_>) -> StorageResult<StateStoreKeyedRow> {
69    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
70}
71
72pub trait StateStoreIterExt<T: IterItem = StateStoreKeyedRow>: StateStoreIter<T> + Sized {
73    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>>: Stream<Item = StorageResult<O>>
74        + Send;
75
76    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
77        self,
78        f: F,
79    ) -> Self::ItemStream<O, F>;
80
81    fn fused(self) -> FusedStateStoreIter<Self, T> {
82        FusedStateStoreIter::new(self)
83    }
84}
85
86#[try_stream(ok = O, error = StorageError)]
87async fn into_stream_inner<
88    T: IterItem,
89    I: StateStoreIter<T>,
90    O: Send,
91    F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send,
92>(
93    iter: I,
94    f: F,
95) {
96    let mut iter = iter.fused();
97    while let Some(item) = iter.try_next().await? {
98        yield f(item)?;
99    }
100}
101
102pub struct FromStreamStateStoreIter<S> {
103    inner: S,
104    item_buffer: Option<StateStoreKeyedRow>,
105}
106
107impl<S> FromStreamStateStoreIter<S> {
108    pub fn new(inner: S) -> Self {
109        Self {
110            inner,
111            item_buffer: None,
112        }
113    }
114}
115
116impl<S: Stream<Item = StorageResult<StateStoreKeyedRow>> + Unpin + Send> StateStoreIter
117    for FromStreamStateStoreIter<S>
118{
119    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
120        self.item_buffer = self.inner.try_next().await?;
121        Ok(self
122            .item_buffer
123            .as_ref()
124            .map(|(key, value)| (key.to_ref(), value.as_ref())))
125    }
126}
127
128pub struct FusedStateStoreIter<I, T> {
129    inner: I,
130    finished: bool,
131    _phantom: PhantomData<T>,
132}
133
134impl<I, T> FusedStateStoreIter<I, T> {
135    fn new(inner: I) -> Self {
136        Self {
137            inner,
138            finished: false,
139            _phantom: PhantomData,
140        }
141    }
142}
143
144impl<T: IterItem, I: StateStoreIter<T>> FusedStateStoreIter<I, T> {
145    async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
146        assert!(!self.finished, "call try_next after finish");
147        let result = self.inner.try_next().await;
148        match &result {
149            Ok(Some(_)) => {}
150            Ok(None) | Err(_) => {
151                self.finished = true;
152            }
153        }
154        result
155    }
156}
157
158impl<T: IterItem, I: StateStoreIter<T>> StateStoreIterExt<T> for I {
159    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>> =
160        impl Stream<Item = StorageResult<O>> + Send;
161
162    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
163        self,
164        f: F,
165    ) -> Self::ItemStream<O, F> {
166        into_stream_inner(self, f)
167    }
168}
169
170pub type StateStoreKeyedRowRef<'a> = (FullKey<&'a [u8]>, &'a [u8]);
171pub type StateStoreKeyedRow = (FullKey<Bytes>, Bytes);
172pub trait StateStoreReadIter = StateStoreIter + 'static;
173
/// The value part of a single change-log entry.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum ChangeLogValue<T> {
    /// A newly inserted value.
    Insert(T),
    /// `old_value` was replaced by `new_value`.
    Update { new_value: T, old_value: T },
    /// A removed value.
    Delete(T),
}
180
181impl<T> ChangeLogValue<T> {
182    pub fn try_map<O>(self, f: impl Fn(T) -> StorageResult<O>) -> StorageResult<ChangeLogValue<O>> {
183        Ok(match self {
184            ChangeLogValue::Insert(value) => ChangeLogValue::Insert(f(value)?),
185            ChangeLogValue::Update {
186                new_value,
187                old_value,
188            } => ChangeLogValue::Update {
189                new_value: f(new_value)?,
190                old_value: f(old_value)?,
191            },
192            ChangeLogValue::Delete(value) => ChangeLogValue::Delete(f(value)?),
193        })
194    }
195
196    pub fn into_op_value_iter(self) -> impl Iterator<Item = (Op, T)> {
197        std::iter::from_coroutine(
198            #[coroutine]
199            move || match self {
200                Self::Insert(row) => {
201                    yield (Op::Insert, row);
202                }
203                Self::Delete(row) => {
204                    yield (Op::Delete, row);
205                }
206                Self::Update {
207                    old_value,
208                    new_value,
209                } => {
210                    yield (Op::UpdateDelete, old_value);
211                    yield (Op::UpdateInsert, new_value);
212                }
213            },
214        )
215    }
216}
217
218impl<T: AsRef<[u8]>> ChangeLogValue<T> {
219    pub fn to_ref(&self) -> ChangeLogValue<&[u8]> {
220        match self {
221            ChangeLogValue::Insert(val) => ChangeLogValue::Insert(val.as_ref()),
222            ChangeLogValue::Update {
223                new_value,
224                old_value,
225            } => ChangeLogValue::Update {
226                new_value: new_value.as_ref(),
227                old_value: old_value.as_ref(),
228            },
229            ChangeLogValue::Delete(val) => ChangeLogValue::Delete(val.as_ref()),
230        }
231    }
232}
233
234pub type StateStoreReadLogItem = (TableKey<Bytes>, ChangeLogValue<Bytes>);
235pub type StateStoreReadLogItemRef<'a> = (TableKey<&'a [u8]>, ChangeLogValue<&'a [u8]>);
236
237#[derive(Clone)]
238pub struct NextEpochOptions {
239    pub table_id: TableId,
240}
241
242#[derive(Clone)]
243pub struct ReadLogOptions {
244    pub table_id: TableId,
245}
246
247pub trait StateStoreReadChangeLogIter = StateStoreIter<StateStoreReadLogItem> + Send + 'static;
248pub trait StorageFuture<'a, T> = Future<Output = StorageResult<T>> + Send + 'a;
249
250pub trait StateStoreReadLog: StaticSendSync {
251    type ChangeLogIter: StateStoreReadChangeLogIter;
252
253    fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64>;
254
255    fn iter_log(
256        &self,
257        epoch_range: (u64, u64),
258        key_range: TableKeyRange,
259        options: ReadLogOptions,
260    ) -> impl StorageFuture<'_, Self::ChangeLogIter>;
261}
262
263pub trait KeyValueFn<'a, O> =
264    for<'kv> FnOnce(FullKey<&'kv [u8]>, &'kv [u8]) -> StorageResult<O> + Send + 'a;
265
266pub trait StateStoreGet: StaticSendSync {
267    fn on_key_value<'a, O: Send + 'a>(
268        &'a self,
269        key: TableKey<Bytes>,
270        read_options: ReadOptions,
271        on_key_value_fn: impl KeyValueFn<'a, O>,
272    ) -> impl StorageFuture<'a, Option<O>>;
273}
274
275pub trait StateStoreRead: StateStoreGet + StaticSendSync {
276    type Iter: StateStoreReadIter;
277    type RevIter: StateStoreReadIter;
278
279    /// Opens and returns an iterator for given `prefix_hint` and `full_key_range`
280    /// Internally, `prefix_hint` will be used to for checking `bloom_filter` and
281    /// `full_key_range` used for iter. (if the `prefix_hint` not None, it should be be included
282    /// in `key_range`) The returned iterator will iterate data based on a snapshot
283    /// corresponding to the given `epoch`.
284    fn iter(
285        &self,
286        key_range: TableKeyRange,
287        read_options: ReadOptions,
288    ) -> impl StorageFuture<'_, Self::Iter>;
289
290    fn rev_iter(
291        &self,
292        key_range: TableKeyRange,
293        read_options: ReadOptions,
294    ) -> impl StorageFuture<'_, Self::RevIter>;
295}
296
297#[derive(Clone)]
298pub struct TryWaitEpochOptions {
299    pub table_id: TableId,
300}
301
302impl TryWaitEpochOptions {
303    #[cfg(any(test, feature = "test"))]
304    pub fn for_test(table_id: TableId) -> Self {
305        Self { table_id }
306    }
307}
308
309impl From<TracedTryWaitEpochOptions> for TryWaitEpochOptions {
310    fn from(value: TracedTryWaitEpochOptions) -> Self {
311        Self {
312            table_id: value.table_id.into(),
313        }
314    }
315}
316
317impl From<TryWaitEpochOptions> for TracedTryWaitEpochOptions {
318    fn from(value: TryWaitEpochOptions) -> Self {
319        Self {
320            table_id: value.table_id.into(),
321        }
322    }
323}
324
325#[derive(Clone, Copy)]
326pub struct NewReadSnapshotOptions {
327    pub table_id: TableId,
328    pub table_option: TableOption,
329}
330
331#[derive(Clone)]
332pub struct NewVectorWriterOptions {
333    pub table_id: TableId,
334}
335
336pub trait StateStore: StateStoreReadLog + StaticSendSync + Clone {
337    type Local: LocalStateStore;
338    type ReadSnapshot: StateStoreRead + StateStoreReadVector + Clone;
339    type VectorWriter: StateStoreWriteVector;
340
341    /// If epoch is `Committed`, we will wait until the epoch is committed and its data is ready to
342    /// read. If epoch is `Current`, we will only check if the data can be read with this epoch.
343    fn try_wait_epoch(
344        &self,
345        epoch: HummockReadEpoch,
346        options: TryWaitEpochOptions,
347    ) -> impl StorageFuture<'_, ()>;
348
349    /// Creates a [`MonitoredStateStore`] from this state store, with given `stats`.
350    fn monitored(self, storage_metrics: Arc<MonitoredStorageMetrics>) -> MonitoredStateStore<Self> {
351        MonitoredStateStore::new(self, storage_metrics)
352    }
353
354    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_;
355
356    fn new_read_snapshot(
357        &self,
358        epoch: HummockReadEpoch,
359        options: NewReadSnapshotOptions,
360    ) -> impl StorageFuture<'_, Self::ReadSnapshot>;
361
362    fn new_vector_writer(
363        &self,
364        options: NewVectorWriterOptions,
365    ) -> impl Future<Output = Self::VectorWriter> + Send + '_;
366}
367
368/// A state store that is dedicated for streaming operator, which only reads the uncommitted data
369/// written by itself. Each local state store is not `Clone`, and is owned by a streaming state
370/// table.
371pub trait LocalStateStore: StateStoreGet + StateStoreWriteEpochControl + StaticSendSync {
372    type FlushedSnapshotReader: StateStoreRead;
373    type Iter<'a>: StateStoreIter + 'a;
374    type RevIter<'a>: StateStoreIter + 'a;
375
376    /// Opens and returns an iterator for given `prefix_hint` and `full_key_range`
377    /// Internally, `prefix_hint` will be used to for checking `bloom_filter` and
378    /// `full_key_range` used for iter. (if the `prefix_hint` not None, it should be be included
379    /// in `key_range`) The returned iterator will iterate data based on the latest written
380    /// snapshot.
381    fn iter(
382        &self,
383        key_range: TableKeyRange,
384        read_options: ReadOptions,
385    ) -> impl StorageFuture<'_, Self::Iter<'_>>;
386
387    fn rev_iter(
388        &self,
389        key_range: TableKeyRange,
390        read_options: ReadOptions,
391    ) -> impl StorageFuture<'_, Self::RevIter<'_>>;
392
393    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader;
394
395    /// Get last persisted watermark for a given vnode.
396    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
397
398    /// Inserts a key-value entry associated with a given `epoch` into the state store.
399    fn insert(
400        &mut self,
401        key: TableKey<Bytes>,
402        new_val: Bytes,
403        old_val: Option<Bytes>,
404    ) -> StorageResult<()>;
405
406    /// Deletes a key-value entry from the state store. Only the key-value entry with epoch smaller
407    /// than the given `epoch` will be deleted.
408    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;
409
410    // Updates the vnode bitmap corresponding to the local state store
411    // Returns the previous vnode bitmap
412    fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> impl StorageFuture<'_, Arc<Bitmap>>;
413}
414
415pub trait StateStoreWriteEpochControl: StaticSendSync {
416    fn flush(&mut self) -> impl StorageFuture<'_, usize>;
417
418    fn try_flush(&mut self) -> impl StorageFuture<'_, ()>;
419
420    /// Initializes the state store with given `epoch` pair.
421    /// Typically we will use `epoch.curr` as the initialized epoch,
422    /// Since state table will begin as empty.
423    /// In some cases like replicated state table, state table may not be empty initially,
424    /// as such we need to wait for `epoch.prev` checkpoint to complete,
425    /// hence this interface is made async.
426    fn init(&mut self, opts: InitOptions) -> impl StorageFuture<'_, ()>;
427
428    /// Updates the monotonically increasing write epoch to `new_epoch`.
429    /// All writes after this function is called will be tagged with `new_epoch`. In other words,
430    /// the previous write epoch is sealed.
431    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
432}
433
434pub trait StateStoreWriteVector: StateStoreWriteEpochControl + StaticSendSync {
435    fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()>;
436}
437
438pub struct VectorNearestOptions {
439    pub top_n: usize,
440    pub measure: DistanceMeasurement,
441    pub hnsw_ef_search: usize,
442}
443
444pub trait OnNearestItemFn<'a, O> = OnNearestItem<O> + Send + Sync + 'a;
445
446pub trait StateStoreReadVector: StaticSendSync {
447    fn nearest<'a, O: Send + 'a>(
448        &'a self,
449        vec: VectorRef<'a>,
450        options: VectorNearestOptions,
451        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
452    ) -> impl StorageFuture<'a, Vec<O>>;
453}
454
/// Prefetch settings for a read.
///
/// When `prefetch` is `true`, prefetching is enabled. Prefetching may increase the memory
/// footprint of the CN process, because prefetched blocks cannot be evicted. Streaming reads
/// from object storage may hang in some cases, so synchronous short reads are still used for
/// both batch queries and streaming. As a result, this setting is currently unused.
#[derive(Default, Clone, Copy)]
pub struct PrefetchOptions {
    pub prefetch: bool,
    pub for_large_query: bool,
}

impl PrefetchOptions {
    /// Prefetch enabled, with `for_large_query` set.
    pub fn prefetch_for_large_range_scan() -> Self {
        Self::new(true, true)
    }

    /// Prefetch enabled, with `for_large_query` cleared.
    pub fn prefetch_for_small_range_scan() -> Self {
        Self::new(true, false)
    }

    /// Builds options from both flags.
    pub fn new(prefetch: bool, for_large_query: bool) -> Self {
        PrefetchOptions {
            for_large_query,
            prefetch,
        }
    }
}
487
488impl From<TracedPrefetchOptions> for PrefetchOptions {
489    fn from(value: TracedPrefetchOptions) -> Self {
490        Self {
491            prefetch: value.prefetch,
492            for_large_query: value.for_large_query,
493        }
494    }
495}
496
497impl From<PrefetchOptions> for TracedPrefetchOptions {
498    fn from(value: PrefetchOptions) -> Self {
499        Self {
500            prefetch: value.prefetch,
501            for_large_query: value.for_large_query,
502        }
503    }
504}
505
506#[derive(Default, Clone)]
507pub struct ReadOptions {
508    /// A hint for prefix key to check bloom filter.
509    /// If the `prefix_hint` is not None, it should be included in
510    /// `key` or `key_range` in the read API.
511    pub prefix_hint: Option<Bytes>,
512    pub prefetch_options: PrefetchOptions,
513    pub cache_policy: CachePolicy,
514}
515
516impl From<TracedReadOptions> for ReadOptions {
517    fn from(value: TracedReadOptions) -> Self {
518        Self {
519            prefix_hint: value.prefix_hint.map(|b| b.into()),
520            prefetch_options: value.prefetch_options.into(),
521            cache_policy: value.cache_policy.into(),
522        }
523    }
524}
525
526impl ReadOptions {
527    pub fn into_traced_read_options(
528        self,
529        table_id: TableId,
530        epoch: Option<HummockReadEpoch>,
531        table_option: TableOption,
532    ) -> TracedReadOptions {
533        let value = self;
534        let (read_version_from_backup, read_committed) = match epoch {
535            None | Some(HummockReadEpoch::NoWait(_)) => (false, false),
536            Some(HummockReadEpoch::Backup(_)) => (true, false),
537            Some(HummockReadEpoch::Committed(_))
538            | Some(HummockReadEpoch::BatchQueryCommitted(_, _))
539            | Some(HummockReadEpoch::TimeTravel(_)) => (false, true),
540        };
541        TracedReadOptions {
542            prefix_hint: value.prefix_hint.map(|b| b.into()),
543            prefetch_options: value.prefetch_options.into(),
544            cache_policy: value.cache_policy.into(),
545            retention_seconds: table_option.retention_seconds,
546            table_id: table_id.into(),
547            read_version_from_backup,
548            read_committed,
549        }
550    }
551}
552
553pub fn gen_min_epoch(base_epoch: u64, retention_seconds: Option<u32>) -> u64 {
554    match retention_seconds {
555        Some(retention_seconds_u32) => {
556            if base_epoch == MAX_EPOCH {
557                panic!("generate min epoch for MAX_EPOCH");
558            }
559            Epoch(base_epoch)
560                .subtract_ms(retention_seconds_u32 as u64 * 1000)
561                .0
562        }
563        None => 0,
564    }
565}
566
567pub trait CheckOldValueEquality = Fn(&Bytes, &Bytes) -> bool + Send + Sync;
568
569pub static CHECK_BYTES_EQUAL: LazyLock<Arc<dyn CheckOldValueEquality>> =
570    LazyLock::new(|| Arc::new(|first: &Bytes, second: &Bytes| first == second));
571
572#[derive(Default, Clone)]
573pub enum OpConsistencyLevel {
574    #[default]
575    Inconsistent,
576    ConsistentOldValue {
577        check_old_value: Arc<dyn CheckOldValueEquality>,
578        /// whether should store the old value
579        is_log_store: bool,
580    },
581}
582
583impl Debug for OpConsistencyLevel {
584    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
585        match self {
586            OpConsistencyLevel::Inconsistent => f.write_str("OpConsistencyLevel::Inconsistent"),
587            OpConsistencyLevel::ConsistentOldValue { is_log_store, .. } => f
588                .debug_struct("OpConsistencyLevel::ConsistentOldValue")
589                .field("is_log_store", is_log_store)
590                .finish(),
591        }
592    }
593}
594
595impl PartialEq<Self> for OpConsistencyLevel {
596    fn eq(&self, other: &Self) -> bool {
597        matches!(
598            (self, other),
599            (
600                OpConsistencyLevel::Inconsistent,
601                OpConsistencyLevel::Inconsistent
602            ) | (
603                OpConsistencyLevel::ConsistentOldValue {
604                    is_log_store: true,
605                    ..
606                },
607                OpConsistencyLevel::ConsistentOldValue {
608                    is_log_store: true,
609                    ..
610                },
611            ) | (
612                OpConsistencyLevel::ConsistentOldValue {
613                    is_log_store: false,
614                    ..
615                },
616                OpConsistencyLevel::ConsistentOldValue {
617                    is_log_store: false,
618                    ..
619                },
620            )
621        )
622    }
623}
624
625impl Eq for OpConsistencyLevel {}
626
627impl OpConsistencyLevel {
628    pub fn update(&mut self, new_level: &OpConsistencyLevel) {
629        assert_ne!(self, new_level);
630        *self = new_level.clone()
631    }
632}
633
634#[derive(Clone)]
635pub struct NewLocalOptions {
636    pub table_id: TableId,
637    /// Whether the operation is consistent. The term `consistent` requires the following:
638    ///
639    /// 1. A key cannot be inserted or deleted for more than once, i.e. inserting to an existing
640    ///    key or deleting an non-existing key is not allowed.
641    ///
642    /// 2. The old value passed from
643    ///    `update` and `delete` should match the original stored value.
644    pub op_consistency_level: OpConsistencyLevel,
645    pub table_option: TableOption,
646
647    /// Indicate if this is replicated. If it is, we should not
648    /// upload its `ReadVersions`.
649    pub is_replicated: bool,
650
651    /// The vnode bitmap for the local state store instance
652    pub vnodes: Arc<Bitmap>,
653
654    pub upload_on_flush: bool,
655}
656
657impl From<TracedNewLocalOptions> for NewLocalOptions {
658    fn from(value: TracedNewLocalOptions) -> Self {
659        Self {
660            table_id: value.table_id.into(),
661            op_consistency_level: match value.op_consistency_level {
662                TracedOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
663                TracedOpConsistencyLevel::ConsistentOldValue => {
664                    OpConsistencyLevel::ConsistentOldValue {
665                        check_old_value: CHECK_BYTES_EQUAL.clone(),
666                        // TODO: for simplicity, set it to false
667                        is_log_store: false,
668                    }
669                }
670            },
671            table_option: value.table_option.into(),
672            is_replicated: value.is_replicated,
673            vnodes: Arc::new(value.vnodes.into()),
674            upload_on_flush: value.upload_on_flush,
675        }
676    }
677}
678
679impl From<NewLocalOptions> for TracedNewLocalOptions {
680    fn from(value: NewLocalOptions) -> Self {
681        Self {
682            table_id: value.table_id.into(),
683            op_consistency_level: match value.op_consistency_level {
684                OpConsistencyLevel::Inconsistent => TracedOpConsistencyLevel::Inconsistent,
685                OpConsistencyLevel::ConsistentOldValue { .. } => {
686                    TracedOpConsistencyLevel::ConsistentOldValue
687                }
688            },
689            table_option: value.table_option.into(),
690            is_replicated: value.is_replicated,
691            vnodes: value.vnodes.as_ref().clone().into(),
692            upload_on_flush: value.upload_on_flush,
693        }
694    }
695}
696
697impl NewLocalOptions {
698    pub fn new(
699        table_id: TableId,
700        op_consistency_level: OpConsistencyLevel,
701        table_option: TableOption,
702        vnodes: Arc<Bitmap>,
703        upload_on_flush: bool,
704    ) -> Self {
705        NewLocalOptions {
706            table_id,
707            op_consistency_level,
708            table_option,
709            is_replicated: false,
710            vnodes,
711            upload_on_flush,
712        }
713    }
714
715    pub fn new_replicated(
716        table_id: TableId,
717        op_consistency_level: OpConsistencyLevel,
718        table_option: TableOption,
719        vnodes: Arc<Bitmap>,
720    ) -> Self {
721        NewLocalOptions {
722            table_id,
723            op_consistency_level,
724            table_option,
725            is_replicated: true,
726            vnodes,
727            upload_on_flush: false,
728        }
729    }
730
731    pub fn for_test(table_id: TableId) -> Self {
732        Self {
733            table_id,
734            op_consistency_level: OpConsistencyLevel::Inconsistent,
735            table_option: TableOption {
736                retention_seconds: None,
737            },
738            is_replicated: false,
739            vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
740            upload_on_flush: true,
741        }
742    }
743}
744
745#[derive(Clone)]
746pub struct InitOptions {
747    pub epoch: EpochPair,
748}
749
750impl InitOptions {
751    pub fn new(epoch: EpochPair) -> Self {
752        Self { epoch }
753    }
754}
755
756impl From<InitOptions> for TracedInitOptions {
757    fn from(value: InitOptions) -> Self {
758        TracedInitOptions {
759            epoch: value.epoch.into(),
760        }
761    }
762}
763
764impl From<TracedInitOptions> for InitOptions {
765    fn from(value: TracedInitOptions) -> Self {
766        InitOptions {
767            epoch: value.epoch.into(),
768        }
769    }
770}
771
772#[derive(Clone, Debug)]
773pub struct SealCurrentEpochOptions {
774    pub table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
775    pub switch_op_consistency_level: Option<OpConsistencyLevel>,
776}
777
778impl From<SealCurrentEpochOptions> for TracedSealCurrentEpochOptions {
779    fn from(value: SealCurrentEpochOptions) -> Self {
780        TracedSealCurrentEpochOptions {
781            table_watermarks: value.table_watermarks.map(
782                |(direction, watermarks, watermark_type)| {
783                    (
784                        direction == WatermarkDirection::Ascending,
785                        watermarks
786                            .into_iter()
787                            .map(|watermark| {
788                                let pb_watermark = PbVnodeWatermark::from(watermark);
789                                Message::encode_to_vec(&pb_watermark)
790                            })
791                            .collect(),
792                        PbWatermarkSerdeType::from(watermark_type) as i32,
793                    )
794                },
795            ),
796            switch_op_consistency_level: value
797                .switch_op_consistency_level
798                .map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue { .. })),
799        }
800    }
801}
802
803impl From<TracedSealCurrentEpochOptions> for SealCurrentEpochOptions {
804    fn from(value: TracedSealCurrentEpochOptions) -> SealCurrentEpochOptions {
805        SealCurrentEpochOptions {
806            table_watermarks: value.table_watermarks.map(
807                |(is_ascending, watermarks, watermark_serde_type)| {
808                    (
809                        if is_ascending {
810                            WatermarkDirection::Ascending
811                        } else {
812                            WatermarkDirection::Descending
813                        },
814                        watermarks
815                            .into_iter()
816                            .map(|serialized_watermark| {
817                                Message::decode(serialized_watermark.as_slice())
818                                    .map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb))
819                                    .expect("should not failed")
820                            })
821                            .collect(),
822                        match PbWatermarkSerdeType::try_from(watermark_serde_type).unwrap() {
823                            PbWatermarkSerdeType::TypeUnspecified => unreachable!(),
824                            PbWatermarkSerdeType::PkPrefix => WatermarkSerdeType::PkPrefix,
825                            PbWatermarkSerdeType::NonPkPrefix => WatermarkSerdeType::NonPkPrefix,
826                            PbWatermarkSerdeType::Value => WatermarkSerdeType::Value,
827                        },
828                    )
829                },
830            ),
831            switch_op_consistency_level: value.switch_op_consistency_level.map(|enable| {
832                if enable {
833                    OpConsistencyLevel::ConsistentOldValue {
834                        check_old_value: CHECK_BYTES_EQUAL.clone(),
835                        is_log_store: false,
836                    }
837                } else {
838                    OpConsistencyLevel::Inconsistent
839                }
840            }),
841        }
842    }
843}
844
845impl SealCurrentEpochOptions {
846    pub fn for_test() -> Self {
847        Self {
848            table_watermarks: None,
849            switch_op_consistency_level: None,
850        }
851    }
852}