risingwave_storage/
store.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::default::Default;
16use std::fmt::{Debug, Formatter};
17use std::future::Future;
18use std::marker::PhantomData;
19use std::sync::{Arc, LazyLock};
20
21use bytes::Bytes;
22use futures::{Stream, TryStreamExt};
23use futures_async_stream::try_stream;
24use prost::Message;
25use risingwave_common::array::{Op, VectorRef};
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{TableId, TableOption};
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::{Epoch, EpochPair, MAX_EPOCH};
30use risingwave_common::vector::distance::DistanceMeasurement;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
33use risingwave_hummock_sdk::table_watermark::{
34    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
35};
36use risingwave_hummock_trace::{
37    TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions,
38    TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
39};
40use risingwave_pb::hummock::{PbVnodeWatermark, PbWatermarkSerdeType};
41
42use crate::error::{StorageError, StorageResult};
43use crate::hummock::CachePolicy;
44use crate::monitor::{MonitoredStateStore, MonitoredStorageMetrics};
45pub(crate) use crate::vector::{OnNearestItem, Vector};
46
/// Trait alias bundling the `Send + Sync + 'static` bounds required by the state store traits
/// in this file.
pub trait StaticSendSync = Send + Sync + 'static;
48
/// An owned item type that can be produced by a [`StateStoreIter`].
pub trait IterItem: Send + 'static {
    /// The borrowed form of the item, valid for the lifetime `'a`.
    type ItemRef<'a>: Send + Copy + 'a;
}
52
/// Key-value rows are handed out as [`StateStoreKeyedRowRef`].
impl IterItem for StateStoreKeyedRow {
    type ItemRef<'a> = StateStoreKeyedRowRef<'a>;
}

/// Change-log items are handed out as [`StateStoreReadLogItemRef`].
impl IterItem for StateStoreReadLogItem {
    type ItemRef<'a> = StateStoreReadLogItemRef<'a>;
}
60
/// Asynchronous iterator over items of type `T` (by default [`StateStoreKeyedRow`]).
pub trait StateStoreIter<T: IterItem = StateStoreKeyedRow>: Send {
    /// Advances the iterator and resolves to a borrowed view of the next item.
    ///
    /// The returned reference borrows from `self`, so it must be released before the next call.
    /// `Ok(None)` is treated as the end of iteration by [`FusedStateStoreIter`].
    fn try_next(&mut self) -> impl StorageFuture<'_, Option<T::ItemRef<'_>>>;
}
64
65pub fn to_owned_item((key, value): StateStoreKeyedRowRef<'_>) -> StorageResult<StateStoreKeyedRow> {
66    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
67}
68
/// Extension methods for [`StateStoreIter`]: conversion into a `Stream` and fusing.
pub trait StateStoreIterExt<T: IterItem = StateStoreKeyedRow>: StateStoreIter<T> + Sized {
    /// The stream type returned by [`Self::into_stream`], yielding `StorageResult<O>` items.
    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>>: Stream<Item = StorageResult<O>>
        + Send;

    /// Consumes the iterator and returns a stream that applies `f` to every borrowed item.
    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
        self,
        f: F,
    ) -> Self::ItemStream<O, F>;

    /// Wraps the iterator in a [`FusedStateStoreIter`], which asserts that `try_next` is not
    /// called again after the iterator has finished.
    fn fused(self) -> FusedStateStoreIter<Self, T> {
        FusedStateStoreIter::new(self)
    }
}
82
83#[try_stream(ok = O, error = StorageError)]
84async fn into_stream_inner<
85    T: IterItem,
86    I: StateStoreIter<T>,
87    O: Send,
88    F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send,
89>(
90    iter: I,
91    f: F,
92) {
93    let mut iter = iter.fused();
94    while let Some(item) = iter.try_next().await? {
95        yield f(item)?;
96    }
97}
98
99pub struct FromStreamStateStoreIter<S> {
100    inner: S,
101    item_buffer: Option<StateStoreKeyedRow>,
102}
103
104impl<S> FromStreamStateStoreIter<S> {
105    pub fn new(inner: S) -> Self {
106        Self {
107            inner,
108            item_buffer: None,
109        }
110    }
111}
112
113impl<S: Stream<Item = StorageResult<StateStoreKeyedRow>> + Unpin + Send> StateStoreIter
114    for FromStreamStateStoreIter<S>
115{
116    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
117        self.item_buffer = self.inner.try_next().await?;
118        Ok(self
119            .item_buffer
120            .as_ref()
121            .map(|(key, value)| (key.to_ref(), value.as_ref())))
122    }
123}
124
/// Wrapper around an iterator that records whether iteration has finished.
pub struct FusedStateStoreIter<I, T> {
    /// The wrapped iterator.
    inner: I,
    /// Set once the wrapped iterator has returned `Ok(None)` or an error.
    finished: bool,
    /// Ties the wrapper to the item type `T` without storing a value of it.
    _phantom: PhantomData<T>,
}

impl<I, T> FusedStateStoreIter<I, T> {
    /// Wraps `inner` in the not-yet-finished state.
    fn new(inner: I) -> Self {
        let finished = false;
        Self {
            inner,
            finished,
            _phantom: PhantomData,
        }
    }
}
140
141impl<T: IterItem, I: StateStoreIter<T>> FusedStateStoreIter<I, T> {
142    async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
143        assert!(!self.finished, "call try_next after finish");
144        let result = self.inner.try_next().await;
145        match &result {
146            Ok(Some(_)) => {}
147            Ok(None) | Err(_) => {
148                self.finished = true;
149            }
150        }
151        result
152    }
153}
154
/// Blanket implementation: every [`StateStoreIter`] gets the [`StateStoreIterExt`] methods.
impl<T: IterItem, I: StateStoreIter<T>> StateStoreIterExt<T> for I {
    // The concrete stream type is the opaque type produced by `into_stream_inner`.
    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>> =
        impl Stream<Item = StorageResult<O>> + Send;

    /// Delegates to `into_stream_inner`, which drives the iterator and applies `f` to each item.
    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
        self,
        f: F,
    ) -> Self::ItemStream<O, F> {
        into_stream_inner(self, f)
    }
}
166
/// Borrowed key-value pair: a full key over a byte slice plus the value bytes.
pub type StateStoreKeyedRowRef<'a> = (FullKey<&'a [u8]>, &'a [u8]);
/// Owned key-value pair: a full key and the value, both backed by `Bytes`.
pub type StateStoreKeyedRow = (FullKey<Bytes>, Bytes);
/// A `'static` [`StateStoreIter`] over [`StateStoreKeyedRow`] items.
pub trait StateStoreReadIter = StateStoreIter + 'static;
170
/// A single change recorded for a key, generic over the representation `T` of the value.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum ChangeLogValue<T> {
    /// A value was inserted.
    Insert(T),
    /// A value was updated; carries both the new and the old value.
    Update { new_value: T, old_value: T },
    /// A value was deleted; carries the deleted value.
    Delete(T),
}
177
178impl<T> ChangeLogValue<T> {
179    pub fn try_map<O>(self, f: impl Fn(T) -> StorageResult<O>) -> StorageResult<ChangeLogValue<O>> {
180        Ok(match self {
181            ChangeLogValue::Insert(value) => ChangeLogValue::Insert(f(value)?),
182            ChangeLogValue::Update {
183                new_value,
184                old_value,
185            } => ChangeLogValue::Update {
186                new_value: f(new_value)?,
187                old_value: f(old_value)?,
188            },
189            ChangeLogValue::Delete(value) => ChangeLogValue::Delete(f(value)?),
190        })
191    }
192
193    pub fn into_op_value_iter(self) -> impl Iterator<Item = (Op, T)> {
194        std::iter::from_coroutine(
195            #[coroutine]
196            move || match self {
197                Self::Insert(row) => {
198                    yield (Op::Insert, row);
199                }
200                Self::Delete(row) => {
201                    yield (Op::Delete, row);
202                }
203                Self::Update {
204                    old_value,
205                    new_value,
206                } => {
207                    yield (Op::UpdateDelete, old_value);
208                    yield (Op::UpdateInsert, new_value);
209                }
210            },
211        )
212    }
213}
214
215impl<T: AsRef<[u8]>> ChangeLogValue<T> {
216    pub fn to_ref(&self) -> ChangeLogValue<&[u8]> {
217        match self {
218            ChangeLogValue::Insert(val) => ChangeLogValue::Insert(val.as_ref()),
219            ChangeLogValue::Update {
220                new_value,
221                old_value,
222            } => ChangeLogValue::Update {
223                new_value: new_value.as_ref(),
224                old_value: old_value.as_ref(),
225            },
226            ChangeLogValue::Delete(val) => ChangeLogValue::Delete(val.as_ref()),
227        }
228    }
229}
230
/// Owned change-log entry: a table key and the change recorded for it.
pub type StateStoreReadLogItem = (TableKey<Bytes>, ChangeLogValue<Bytes>);
/// Borrowed form of [`StateStoreReadLogItem`].
pub type StateStoreReadLogItemRef<'a> = (TableKey<&'a [u8]>, ChangeLogValue<&'a [u8]>);
233
/// Options passed to [`StateStoreReadLog::next_epoch`].
#[derive(Clone)]
pub struct NextEpochOptions {
    /// Id of the table the request is issued for.
    pub table_id: TableId,
}
238
/// Options passed to [`StateStoreReadLog::iter_log`].
#[derive(Clone)]
pub struct ReadLogOptions {
    /// Id of the table whose change log is read.
    pub table_id: TableId,
}
243
/// A `'static`, `Send` [`StateStoreIter`] over [`StateStoreReadLogItem`] entries.
pub trait StateStoreReadChangeLogIter = StateStoreIter<StateStoreReadLogItem> + Send + 'static;
/// A `Send` future valid for `'a` that resolves to `StorageResult<T>`.
pub trait StorageFuture<'a, T> = Future<Output = StorageResult<T>> + Send + 'a;
246
/// Read access to the change log of a state store.
pub trait StateStoreReadLog: StaticSendSync {
    /// Iterator type returned by [`Self::iter_log`].
    type ChangeLogIter: StateStoreReadChangeLogIter;

    /// Resolves to an epoch derived from `epoch` for the table named in `options`.
    // NOTE(review): presumably this is the epoch following `epoch` in the table's change log —
    // confirm against the implementations.
    fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64>;

    /// Opens an iterator over the change-log entries of the table named in `options`,
    /// restricted to `epoch_range` and `key_range`.
    // NOTE(review): whether the bounds of `epoch_range` are inclusive is not visible here —
    // confirm against the implementations.
    fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> impl StorageFuture<'_, Self::ChangeLogIter>;
}
259
/// A one-shot, `Send` callback that receives a borrowed full key and value and produces a
/// `StorageResult<O>`.
pub trait KeyValueFn<'a, O> =
    for<'kv> FnOnce(FullKey<&'kv [u8]>, &'kv [u8]) -> StorageResult<O> + Send + 'a;
262
/// Point-lookup interface of a state store.
pub trait StateStoreGet: StaticSendSync {
    /// Looks up `key` and, when an entry is available, passes the borrowed full key and value
    /// to `on_key_value_fn`, resolving to its output.
    // NOTE(review): presumably resolves to `None` when the key is absent and the callback is
    // not invoked — confirm against the implementations.
    fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> impl StorageFuture<'a, Option<O>>;
}
271
/// Range-scan interface of a state store, on top of the point lookups of [`StateStoreGet`].
pub trait StateStoreRead: StateStoreGet + StaticSendSync {
    /// Iterator type returned by [`Self::iter`].
    type Iter: StateStoreReadIter;
    /// Iterator type returned by [`Self::rev_iter`].
    type RevIter: StateStoreReadIter;

    /// Opens and returns an iterator over the given `key_range`.
    ///
    /// Internally, `read_options.prefix_hint` is used for checking the bloom filter, while
    /// `key_range` is used for the iteration itself. If the `prefix_hint` is not `None`, it
    /// should be included in `key_range`. The returned iterator iterates data based on a
    /// snapshot.
    // NOTE(review): the previous wording tied the snapshot to "the given `epoch`", but no
    // epoch is passed here; presumably it is fixed when the read snapshot is created — confirm.
    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::Iter>;

    /// Counterpart of [`Self::iter`] returning a [`Self::RevIter`].
    // NOTE(review): presumably iterates `key_range` in reverse key order — confirm.
    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::RevIter>;
}
293
294#[derive(Clone)]
295pub struct TryWaitEpochOptions {
296    pub table_id: TableId,
297}
298
299impl TryWaitEpochOptions {
300    #[cfg(any(test, feature = "test"))]
301    pub fn for_test(table_id: TableId) -> Self {
302        Self { table_id }
303    }
304}
305
306impl From<TracedTryWaitEpochOptions> for TryWaitEpochOptions {
307    fn from(value: TracedTryWaitEpochOptions) -> Self {
308        Self {
309            table_id: value.table_id.into(),
310        }
311    }
312}
313
314impl From<TryWaitEpochOptions> for TracedTryWaitEpochOptions {
315    fn from(value: TryWaitEpochOptions) -> Self {
316        Self {
317            table_id: value.table_id.into(),
318        }
319    }
320}
321
/// Options passed to [`StateStore::new_read_snapshot`].
#[derive(Clone, Copy)]
pub struct NewReadSnapshotOptions {
    /// Id of the table the snapshot reads from.
    pub table_id: TableId,
    /// Table-level options of that table.
    pub table_option: TableOption,
}
327
/// Options passed to [`StateStore::new_vector_writer`].
#[derive(Clone)]
pub struct NewVectorWriterOptions {
    /// Id of the table the vector writer writes to.
    pub table_id: TableId,
}
332
/// Top-level handle of a state store: a cloneable factory for local stores, read snapshots and
/// vector writers, which also exposes change-log reads via [`StateStoreReadLog`].
pub trait StateStore: StateStoreReadLog + StaticSendSync + Clone {
    /// Per-table local store type created by [`Self::new_local`].
    type Local: LocalStateStore;
    /// Snapshot reader type created by [`Self::new_read_snapshot`].
    type ReadSnapshot: StateStoreRead + StateStoreReadVector + Clone;
    /// Vector writer type created by [`Self::new_vector_writer`].
    type VectorWriter: StateStoreWriteVector;

    /// If epoch is `Committed`, we will wait until the epoch is committed and its data is ready to
    /// read. If epoch is `Current`, we will only check if the data can be read with this epoch.
    fn try_wait_epoch(
        &self,
        epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> impl StorageFuture<'_, ()>;

    /// Creates a [`MonitoredStateStore`] from this state store, with given `storage_metrics`.
    fn monitored(self, storage_metrics: Arc<MonitoredStorageMetrics>) -> MonitoredStateStore<Self> {
        MonitoredStateStore::new(self, storage_metrics)
    }

    /// Creates a new [`Self::Local`] store configured by `option`.
    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_;

    /// Creates a [`Self::ReadSnapshot`] for reading at `epoch` with the given `options`.
    fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> impl StorageFuture<'_, Self::ReadSnapshot>;

    /// Creates a new [`Self::VectorWriter`] configured by `options`.
    fn new_vector_writer(
        &self,
        options: NewVectorWriterOptions,
    ) -> impl Future<Output = Self::VectorWriter> + Send + '_;
}
364
/// A state store that is dedicated for streaming operator, which only reads the uncommitted data
/// written by itself. Each local state store is not `Clone`, and is owned by a streaming state
/// table.
pub trait LocalStateStore: StateStoreGet + StateStoreWriteEpochControl + StaticSendSync {
    /// Reader type returned by [`Self::new_flushed_snapshot_reader`].
    type FlushedSnapshotReader: StateStoreRead;
    /// Iterator type returned by [`Self::iter`]; it may borrow from the local store.
    type Iter<'a>: StateStoreIter + 'a;
    /// Iterator type returned by [`Self::rev_iter`]; it may borrow from the local store.
    type RevIter<'a>: StateStoreIter + 'a;

    /// Opens and returns an iterator over the given `key_range`.
    ///
    /// Internally, `read_options.prefix_hint` is used for checking the bloom filter, while
    /// `key_range` is used for the iteration itself. If the `prefix_hint` is not `None`, it
    /// should be included in `key_range`. The returned iterator iterates data based on the
    /// latest written snapshot.
    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::Iter<'_>>;

    /// Counterpart of [`Self::iter`] returning a [`Self::RevIter`].
    // NOTE(review): presumably iterates `key_range` in reverse key order — confirm.
    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::RevIter<'_>>;

    /// Creates a [`Self::FlushedSnapshotReader`] for this local store.
    // NOTE(review): judging by the name, the reader only sees data that has already been
    // flushed — confirm against the implementations.
    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader;

    /// Get last persisted watermark for a given vnode.
    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;

    /// Inserts a key-value entry into the state store. `old_val` optionally carries the value
    /// previously stored under `key`.
    // NOTE(review): the previous wording said the entry is "associated with a given `epoch`",
    // but no epoch is passed here; presumably the current write epoch is used — confirm.
    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()>;

    /// Deletes a key-value entry from the state store. `old_val` is the value expected to be
    /// stored under `key`.
    // NOTE(review): the previous wording said only entries "with epoch smaller than the given
    // `epoch`" are deleted, but no epoch is passed here — confirm the intended semantics.
    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

    /// Updates the vnode bitmap corresponding to the local state store.
    /// Returns the previous vnode bitmap.
    fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> impl StorageFuture<'_, Arc<Bitmap>>;
}
411
/// Epoch lifecycle operations shared by the writers of a state store.
pub trait StateStoreWriteEpochControl: StaticSendSync {
    /// Flushes buffered writes, resolving to a `usize`.
    // NOTE(review): presumably the returned `usize` is the size of the flushed data — confirm.
    fn flush(&mut self) -> impl StorageFuture<'_, usize>;

    /// Attempts a flush without reporting a size.
    // NOTE(review): presumably this only flushes when the implementation decides it is
    // necessary — confirm.
    fn try_flush(&mut self) -> impl StorageFuture<'_, ()>;

    /// Initializes the state store with given `epoch` pair.
    /// Typically we will use `epoch.curr` as the initialized epoch,
    /// Since state table will begin as empty.
    /// In some cases like replicated state table, state table may not be empty initially,
    /// as such we need to wait for `epoch.prev` checkpoint to complete,
    /// hence this interface is made async.
    fn init(&mut self, opts: InitOptions) -> impl StorageFuture<'_, ()>;

    /// Updates the monotonically increasing write epoch to `next_epoch`.
    /// All writes after this function is called will be tagged with `next_epoch`. In other words,
    /// the previous write epoch is sealed.
    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
}
430
/// Writer interface for vector data, with the epoch controls of
/// [`StateStoreWriteEpochControl`].
pub trait StateStoreWriteVector: StateStoreWriteEpochControl + StaticSendSync {
    /// Inserts the vector `vec` together with its associated `info` bytes.
    fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()>;
}
434
/// Options passed to [`StateStoreReadVector::nearest`].
pub struct VectorNearestOptions {
    /// Number of nearest items requested.
    pub top_n: usize,
    /// Distance measurement used to compare vectors.
    pub measure: DistanceMeasurement,
    // NOTE(review): presumably the `ef_search` parameter of an HNSW index search — confirm.
    pub hnsw_ef_search: usize,
}
440
/// An [`OnNearestItem`] callback that is additionally `Send + Sync` and valid for `'a`.
pub trait OnNearestItemFn<'a, O> = OnNearestItem<O> + Send + Sync + 'a;
442
/// Nearest-neighbour read interface for vector data.
pub trait StateStoreReadVector: StaticSendSync {
    /// Searches for the items nearest to `vec` according to `options` and resolves to the
    /// outputs produced by `on_nearest_item_fn`.
    // NOTE(review): presumably at most `options.top_n` outputs are returned, one per matched
    // item — confirm against the implementations.
    fn nearest<'a, O: Send + 'a>(
        &'a self,
        vec: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> impl StorageFuture<'a, Vec<O>>;
}
451
/// If `prefetch` is true, prefetch will be enabled. Prefetching may increase the memory
/// footprint of the CN process because the prefetched blocks cannot be evicted.
/// Since the streaming read of object storage may hang in some cases, we still use sync short
/// reads for both batch queries and streaming processes, so this configuration is unused.
#[derive(Clone, Copy, Default)]
pub struct PrefetchOptions {
    /// Whether prefetching is enabled.
    pub prefetch: bool,
    /// Whether the read belongs to a large query.
    pub for_large_query: bool,
}

impl PrefetchOptions {
    /// Prefetch enabled, flagged as a large query.
    pub fn prefetch_for_large_range_scan() -> Self {
        Self::new(true, true)
    }

    /// Prefetch enabled, not flagged as a large query.
    pub fn prefetch_for_small_range_scan() -> Self {
        Self::new(true, false)
    }

    /// Builds the options from the two flags as given.
    pub fn new(prefetch: bool, for_large_query: bool) -> Self {
        PrefetchOptions {
            prefetch,
            for_large_query,
        }
    }
}
484
485impl From<TracedPrefetchOptions> for PrefetchOptions {
486    fn from(value: TracedPrefetchOptions) -> Self {
487        Self {
488            prefetch: value.prefetch,
489            for_large_query: value.for_large_query,
490        }
491    }
492}
493
494impl From<PrefetchOptions> for TracedPrefetchOptions {
495    fn from(value: PrefetchOptions) -> Self {
496        Self {
497            prefetch: value.prefetch,
498            for_large_query: value.for_large_query,
499        }
500    }
501}
502
503#[derive(Default, Clone)]
504pub struct ReadOptions {
505    /// A hint for prefix key to check bloom filter.
506    /// If the `prefix_hint` is not None, it should be included in
507    /// `key` or `key_range` in the read API.
508    pub prefix_hint: Option<Bytes>,
509    pub prefetch_options: PrefetchOptions,
510    pub cache_policy: CachePolicy,
511}
512
513impl From<TracedReadOptions> for ReadOptions {
514    fn from(value: TracedReadOptions) -> Self {
515        Self {
516            prefix_hint: value.prefix_hint.map(|b| b.into()),
517            prefetch_options: value.prefetch_options.into(),
518            cache_policy: value.cache_policy.into(),
519        }
520    }
521}
522
523impl ReadOptions {
524    pub fn into_traced_read_options(
525        self,
526        table_id: TableId,
527        epoch: Option<HummockReadEpoch>,
528        table_option: TableOption,
529    ) -> TracedReadOptions {
530        let value = self;
531        let (read_version_from_backup, read_committed) = match epoch {
532            None | Some(HummockReadEpoch::NoWait(_)) => (false, false),
533            Some(HummockReadEpoch::Backup(_)) => (true, false),
534            Some(HummockReadEpoch::Committed(_))
535            | Some(HummockReadEpoch::BatchQueryCommitted(_, _))
536            | Some(HummockReadEpoch::TimeTravel(_)) => (false, true),
537        };
538        TracedReadOptions {
539            prefix_hint: value.prefix_hint.map(|b| b.into()),
540            prefetch_options: value.prefetch_options.into(),
541            cache_policy: value.cache_policy.into(),
542            retention_seconds: table_option.retention_seconds,
543            table_id: table_id.into(),
544            read_version_from_backup,
545            read_committed,
546        }
547    }
548}
549
550pub fn gen_min_epoch(base_epoch: u64, retention_seconds: Option<u32>) -> u64 {
551    match retention_seconds {
552        Some(retention_seconds_u32) => {
553            if base_epoch == MAX_EPOCH {
554                panic!("generate min epoch for MAX_EPOCH");
555            }
556            Epoch(base_epoch)
557                .subtract_ms(retention_seconds_u32 as u64 * 1000)
558                .0
559        }
560        None => 0,
561    }
562}
563
/// A thread-safe predicate over two `Bytes` values, used as the `check_old_value` comparison
/// of [`OpConsistencyLevel::ConsistentOldValue`].
pub trait CheckOldValueEquality = Fn(&Bytes, &Bytes) -> bool + Send + Sync;
565
/// Lazily initialized, shared [`CheckOldValueEquality`] that compares the two values with `==`.
pub static CHECK_BYTES_EQUAL: LazyLock<Arc<dyn CheckOldValueEquality>> =
    LazyLock::new(|| Arc::new(|first: &Bytes, second: &Bytes| first == second));
568
/// Consistency level of the write operations on a local state store.
#[derive(Default, Clone)]
pub enum OpConsistencyLevel {
    /// No consistency requirement on operations (the default).
    #[default]
    Inconsistent,
    /// Operations are consistent and carry an old value that can be checked.
    ConsistentOldValue {
        /// Predicate used to compare two old values.
        check_old_value: Arc<dyn CheckOldValueEquality>,
        /// whether should store the old value
        is_log_store: bool,
    },
}
579
580impl Debug for OpConsistencyLevel {
581    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
582        match self {
583            OpConsistencyLevel::Inconsistent => f.write_str("OpConsistencyLevel::Inconsistent"),
584            OpConsistencyLevel::ConsistentOldValue { is_log_store, .. } => f
585                .debug_struct("OpConsistencyLevel::ConsistentOldValue")
586                .field("is_log_store", is_log_store)
587                .finish(),
588        }
589    }
590}
591
592impl PartialEq<Self> for OpConsistencyLevel {
593    fn eq(&self, other: &Self) -> bool {
594        matches!(
595            (self, other),
596            (
597                OpConsistencyLevel::Inconsistent,
598                OpConsistencyLevel::Inconsistent
599            ) | (
600                OpConsistencyLevel::ConsistentOldValue {
601                    is_log_store: true,
602                    ..
603                },
604                OpConsistencyLevel::ConsistentOldValue {
605                    is_log_store: true,
606                    ..
607                },
608            ) | (
609                OpConsistencyLevel::ConsistentOldValue {
610                    is_log_store: false,
611                    ..
612                },
613                OpConsistencyLevel::ConsistentOldValue {
614                    is_log_store: false,
615                    ..
616                },
617            )
618        )
619    }
620}
621
622impl Eq for OpConsistencyLevel {}
623
624impl OpConsistencyLevel {
625    pub fn update(&mut self, new_level: &OpConsistencyLevel) {
626        assert_ne!(self, new_level);
627        *self = new_level.clone()
628    }
629}
630
/// Options passed to [`StateStore::new_local`] when creating a local state store.
#[derive(Clone)]
pub struct NewLocalOptions {
    /// Id of the table the local state store belongs to.
    pub table_id: TableId,
    /// Whether the operation is consistent. The term `consistent` requires the following:
    ///
    /// 1. A key cannot be inserted or deleted more than once, i.e. inserting to an existing
    ///    key or deleting a non-existing key is not allowed.
    ///
    /// 2. The old value passed from
    ///    `update` and `delete` should match the original stored value.
    pub op_consistency_level: OpConsistencyLevel,
    /// Table-level options of the table.
    pub table_option: TableOption,

    /// Indicate if this is replicated. If it is, we should not
    /// upload its `ReadVersions`.
    pub is_replicated: bool,

    /// The vnode bitmap for the local state store instance
    pub vnodes: Arc<Bitmap>,

    // NOTE(review): presumably controls whether flushed data is uploaded right away — confirm
    // against the implementations.
    pub upload_on_flush: bool,
}
653
654impl From<TracedNewLocalOptions> for NewLocalOptions {
655    fn from(value: TracedNewLocalOptions) -> Self {
656        Self {
657            table_id: value.table_id.into(),
658            op_consistency_level: match value.op_consistency_level {
659                TracedOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
660                TracedOpConsistencyLevel::ConsistentOldValue => {
661                    OpConsistencyLevel::ConsistentOldValue {
662                        check_old_value: CHECK_BYTES_EQUAL.clone(),
663                        // TODO: for simplicity, set it to false
664                        is_log_store: false,
665                    }
666                }
667            },
668            table_option: value.table_option.into(),
669            is_replicated: value.is_replicated,
670            vnodes: Arc::new(value.vnodes.into()),
671            upload_on_flush: value.upload_on_flush,
672        }
673    }
674}
675
676impl From<NewLocalOptions> for TracedNewLocalOptions {
677    fn from(value: NewLocalOptions) -> Self {
678        Self {
679            table_id: value.table_id.into(),
680            op_consistency_level: match value.op_consistency_level {
681                OpConsistencyLevel::Inconsistent => TracedOpConsistencyLevel::Inconsistent,
682                OpConsistencyLevel::ConsistentOldValue { .. } => {
683                    TracedOpConsistencyLevel::ConsistentOldValue
684                }
685            },
686            table_option: value.table_option.into(),
687            is_replicated: value.is_replicated,
688            vnodes: value.vnodes.as_ref().clone().into(),
689            upload_on_flush: value.upload_on_flush,
690        }
691    }
692}
693
694impl NewLocalOptions {
695    pub fn new(
696        table_id: TableId,
697        op_consistency_level: OpConsistencyLevel,
698        table_option: TableOption,
699        vnodes: Arc<Bitmap>,
700        upload_on_flush: bool,
701    ) -> Self {
702        NewLocalOptions {
703            table_id,
704            op_consistency_level,
705            table_option,
706            is_replicated: false,
707            vnodes,
708            upload_on_flush,
709        }
710    }
711
712    pub fn new_replicated(
713        table_id: TableId,
714        op_consistency_level: OpConsistencyLevel,
715        table_option: TableOption,
716        vnodes: Arc<Bitmap>,
717    ) -> Self {
718        NewLocalOptions {
719            table_id,
720            op_consistency_level,
721            table_option,
722            is_replicated: true,
723            vnodes,
724            upload_on_flush: false,
725        }
726    }
727
728    pub fn for_test(table_id: TableId) -> Self {
729        Self {
730            table_id,
731            op_consistency_level: OpConsistencyLevel::Inconsistent,
732            table_option: TableOption {
733                retention_seconds: None,
734            },
735            is_replicated: false,
736            vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
737            upload_on_flush: true,
738        }
739    }
740}
741
742#[derive(Clone)]
743pub struct InitOptions {
744    pub epoch: EpochPair,
745}
746
747impl InitOptions {
748    pub fn new(epoch: EpochPair) -> Self {
749        Self { epoch }
750    }
751}
752
753impl From<InitOptions> for TracedInitOptions {
754    fn from(value: InitOptions) -> Self {
755        TracedInitOptions {
756            epoch: value.epoch.into(),
757        }
758    }
759}
760
761impl From<TracedInitOptions> for InitOptions {
762    fn from(value: TracedInitOptions) -> Self {
763        InitOptions {
764            epoch: value.epoch.into(),
765        }
766    }
767}
768
/// Options passed to [`StateStoreWriteEpochControl::seal_current_epoch`].
#[derive(Clone, Debug)]
pub struct SealCurrentEpochOptions {
    /// Optional table watermarks: the direction, the per-vnode watermarks, and the serde type
    /// of the watermark.
    pub table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
    /// When `Some`, the op consistency level to switch to.
    pub switch_op_consistency_level: Option<OpConsistencyLevel>,
}
774
775impl From<SealCurrentEpochOptions> for TracedSealCurrentEpochOptions {
776    fn from(value: SealCurrentEpochOptions) -> Self {
777        TracedSealCurrentEpochOptions {
778            table_watermarks: value.table_watermarks.map(
779                |(direction, watermarks, watermark_type)| {
780                    (
781                        direction == WatermarkDirection::Ascending,
782                        watermarks
783                            .into_iter()
784                            .map(|watermark| {
785                                let pb_watermark = PbVnodeWatermark::from(watermark);
786                                Message::encode_to_vec(&pb_watermark)
787                            })
788                            .collect(),
789                        PbWatermarkSerdeType::from(watermark_type) as i32,
790                    )
791                },
792            ),
793            switch_op_consistency_level: value
794                .switch_op_consistency_level
795                .map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue { .. })),
796        }
797    }
798}
799
800impl From<TracedSealCurrentEpochOptions> for SealCurrentEpochOptions {
801    fn from(value: TracedSealCurrentEpochOptions) -> SealCurrentEpochOptions {
802        SealCurrentEpochOptions {
803            table_watermarks: value.table_watermarks.map(
804                |(is_ascending, watermarks, watermark_serde_type)| {
805                    (
806                        if is_ascending {
807                            WatermarkDirection::Ascending
808                        } else {
809                            WatermarkDirection::Descending
810                        },
811                        watermarks
812                            .into_iter()
813                            .map(|serialized_watermark| {
814                                Message::decode(serialized_watermark.as_slice())
815                                    .map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb))
816                                    .expect("should not failed")
817                            })
818                            .collect(),
819                        match PbWatermarkSerdeType::try_from(watermark_serde_type).unwrap() {
820                            PbWatermarkSerdeType::TypeUnspecified => unreachable!(),
821                            PbWatermarkSerdeType::PkPrefix => WatermarkSerdeType::PkPrefix,
822                            PbWatermarkSerdeType::NonPkPrefix => WatermarkSerdeType::NonPkPrefix,
823                            PbWatermarkSerdeType::Value => WatermarkSerdeType::Value,
824                        },
825                    )
826                },
827            ),
828            switch_op_consistency_level: value.switch_op_consistency_level.map(|enable| {
829                if enable {
830                    OpConsistencyLevel::ConsistentOldValue {
831                        check_old_value: CHECK_BYTES_EQUAL.clone(),
832                        is_log_store: false,
833                    }
834                } else {
835                    OpConsistencyLevel::Inconsistent
836                }
837            }),
838        }
839    }
840}
841
842impl SealCurrentEpochOptions {
843    pub fn for_test() -> Self {
844        Self {
845            table_watermarks: None,
846            switch_op_consistency_level: None,
847        }
848    }
849}