risingwave_storage/
store.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::default::Default;
16use std::fmt::{Debug, Formatter};
17use std::future::Future;
18use std::marker::PhantomData;
19use std::sync::{Arc, LazyLock};
20
21use bytes::Bytes;
22use futures::{Stream, TryStreamExt};
23use futures_async_stream::try_stream;
24use prost::Message;
25use risingwave_common::array::{Op, VectorRef};
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{TableId, TableOption};
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::id::FragmentId;
30use risingwave_common::util::epoch::{Epoch, EpochPair, MAX_EPOCH};
31use risingwave_common::vector::distance::DistanceMeasurement;
32use risingwave_hummock_sdk::HummockReadEpoch;
33use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
34use risingwave_hummock_sdk::table_watermark::{
35    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
36};
37use risingwave_hummock_trace::{
38    TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions,
39    TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
40};
41use risingwave_pb::hummock::{PbVnodeWatermark, PbWatermarkSerdeType};
42
43use crate::error::{StorageError, StorageResult};
44use crate::hummock::CachePolicy;
45use crate::monitor::{MonitoredStateStore, MonitoredStorageMetrics};
46pub(crate) use crate::vector::{OnNearestItem, Vector};
47
48mod auto_rebuild;
49pub use auto_rebuild::{AutoRebuildStateStoreReadIter, timeout_auto_rebuild};
50
51pub trait StaticSendSync = Send + Sync + 'static;
52
53pub trait IterItem: Send + 'static {
54    type ItemRef<'a>: Send + Copy + 'a;
55}
56
57impl IterItem for StateStoreKeyedRow {
58    type ItemRef<'a> = StateStoreKeyedRowRef<'a>;
59}
60
61impl IterItem for StateStoreReadLogItem {
62    type ItemRef<'a> = StateStoreReadLogItemRef<'a>;
63}
64
65pub trait StateStoreIter<T: IterItem = StateStoreKeyedRow>: Send {
66    fn try_next(&mut self) -> impl StorageFuture<'_, Option<T::ItemRef<'_>>>;
67}
68
69pub fn to_owned_item((key, value): StateStoreKeyedRowRef<'_>) -> StorageResult<StateStoreKeyedRow> {
70    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
71}
72
73pub trait StateStoreIterExt<T: IterItem = StateStoreKeyedRow>: StateStoreIter<T> + Sized {
74    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>>: Stream<Item = StorageResult<O>>
75        + Send;
76
77    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
78        self,
79        f: F,
80    ) -> Self::ItemStream<O, F>;
81
82    fn fused(self) -> FusedStateStoreIter<Self, T> {
83        FusedStateStoreIter::new(self)
84    }
85}
86
87#[try_stream(ok = O, error = StorageError)]
88async fn into_stream_inner<
89    T: IterItem,
90    I: StateStoreIter<T>,
91    O: Send,
92    F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send,
93>(
94    iter: I,
95    f: F,
96) {
97    let mut iter = iter.fused();
98    while let Some(item) = iter.try_next().await? {
99        yield f(item)?;
100    }
101}
102
103pub struct FromStreamStateStoreIter<S> {
104    inner: S,
105    item_buffer: Option<StateStoreKeyedRow>,
106}
107
108impl<S> FromStreamStateStoreIter<S> {
109    pub fn new(inner: S) -> Self {
110        Self {
111            inner,
112            item_buffer: None,
113        }
114    }
115}
116
117impl<S: Stream<Item = StorageResult<StateStoreKeyedRow>> + Unpin + Send> StateStoreIter
118    for FromStreamStateStoreIter<S>
119{
120    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
121        self.item_buffer = self.inner.try_next().await?;
122        Ok(self
123            .item_buffer
124            .as_ref()
125            .map(|(key, value)| (key.to_ref(), value.as_ref())))
126    }
127}
128
129pub struct FusedStateStoreIter<I, T> {
130    inner: I,
131    finished: bool,
132    _phantom: PhantomData<T>,
133}
134
135impl<I, T> FusedStateStoreIter<I, T> {
136    fn new(inner: I) -> Self {
137        Self {
138            inner,
139            finished: false,
140            _phantom: PhantomData,
141        }
142    }
143}
144
145impl<T: IterItem, I: StateStoreIter<T>> FusedStateStoreIter<I, T> {
146    async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
147        assert!(!self.finished, "call try_next after finish");
148        let result = self.inner.try_next().await;
149        match &result {
150            Ok(Some(_)) => {}
151            Ok(None) | Err(_) => {
152                self.finished = true;
153            }
154        }
155        result
156    }
157}
158
159impl<T: IterItem, I: StateStoreIter<T>> StateStoreIterExt<T> for I {
160    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>> =
161        impl Stream<Item = StorageResult<O>> + Send;
162
163    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
164        self,
165        f: F,
166    ) -> Self::ItemStream<O, F> {
167        into_stream_inner(self, f)
168    }
169}
170
171pub type StateStoreKeyedRowRef<'a> = (FullKey<&'a [u8]>, &'a [u8]);
172pub type StateStoreKeyedRow = (FullKey<Bytes>, Bytes);
173pub trait StateStoreReadIter = StateStoreIter + 'static;
174
175#[derive(Clone, Copy, Eq, PartialEq, Debug)]
176pub enum ChangeLogValue<T> {
177    Insert(T),
178    Update { new_value: T, old_value: T },
179    Delete(T),
180}
181
182impl<T> ChangeLogValue<T> {
183    pub fn try_map<O>(self, f: impl Fn(T) -> StorageResult<O>) -> StorageResult<ChangeLogValue<O>> {
184        Ok(match self {
185            ChangeLogValue::Insert(value) => ChangeLogValue::Insert(f(value)?),
186            ChangeLogValue::Update {
187                new_value,
188                old_value,
189            } => ChangeLogValue::Update {
190                new_value: f(new_value)?,
191                old_value: f(old_value)?,
192            },
193            ChangeLogValue::Delete(value) => ChangeLogValue::Delete(f(value)?),
194        })
195    }
196
197    pub fn into_op_value_iter(self) -> impl Iterator<Item = (Op, T)> {
198        std::iter::from_coroutine(
199            #[coroutine]
200            move || match self {
201                Self::Insert(row) => {
202                    yield (Op::Insert, row);
203                }
204                Self::Delete(row) => {
205                    yield (Op::Delete, row);
206                }
207                Self::Update {
208                    old_value,
209                    new_value,
210                } => {
211                    yield (Op::UpdateDelete, old_value);
212                    yield (Op::UpdateInsert, new_value);
213                }
214            },
215        )
216    }
217}
218
219impl<T: AsRef<[u8]>> ChangeLogValue<T> {
220    pub fn to_ref(&self) -> ChangeLogValue<&[u8]> {
221        match self {
222            ChangeLogValue::Insert(val) => ChangeLogValue::Insert(val.as_ref()),
223            ChangeLogValue::Update {
224                new_value,
225                old_value,
226            } => ChangeLogValue::Update {
227                new_value: new_value.as_ref(),
228                old_value: old_value.as_ref(),
229            },
230            ChangeLogValue::Delete(val) => ChangeLogValue::Delete(val.as_ref()),
231        }
232    }
233}
234
235pub type StateStoreReadLogItem = (TableKey<Bytes>, ChangeLogValue<Bytes>);
236pub type StateStoreReadLogItemRef<'a> = (TableKey<&'a [u8]>, ChangeLogValue<&'a [u8]>);
237
238#[derive(Clone)]
239pub struct NextEpochOptions {
240    pub table_id: TableId,
241}
242
243#[derive(Clone)]
244pub struct ReadLogOptions {
245    pub table_id: TableId,
246}
247
248pub trait StateStoreReadChangeLogIter = StateStoreIter<StateStoreReadLogItem> + Send + 'static;
249pub trait StorageFuture<'a, T> = Future<Output = StorageResult<T>> + Send + 'a;
250
251pub trait StateStoreReadLog: StaticSendSync {
252    type ChangeLogIter: StateStoreReadChangeLogIter;
253
254    fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64>;
255
256    fn iter_log(
257        &self,
258        epoch_range: (u64, u64),
259        key_range: TableKeyRange,
260        options: ReadLogOptions,
261    ) -> impl StorageFuture<'_, Self::ChangeLogIter>;
262}
263
264pub trait KeyValueFn<'a, O> =
265    for<'kv> FnOnce(FullKey<&'kv [u8]>, &'kv [u8]) -> StorageResult<O> + Send + 'a;
266
267pub trait StateStoreGet: StaticSendSync {
268    fn on_key_value<'a, O: Send + 'a>(
269        &'a self,
270        key: TableKey<Bytes>,
271        read_options: ReadOptions,
272        on_key_value_fn: impl KeyValueFn<'a, O>,
273    ) -> impl StorageFuture<'a, Option<O>>;
274}
275
276pub trait StateStoreRead: StateStoreGet + StaticSendSync {
277    type Iter: StateStoreReadIter;
278    type RevIter: StateStoreReadIter;
279
280    /// Opens and returns an iterator for given `prefix_hint` and `full_key_range`
281    /// Internally, `prefix_hint` will be used to for checking `bloom_filter` and
282    /// `full_key_range` used for iter. (if the `prefix_hint` not None, it should be be included
283    /// in `key_range`) The returned iterator will iterate data based on a snapshot
284    /// corresponding to the given `epoch`.
285    fn iter(
286        &self,
287        key_range: TableKeyRange,
288        read_options: ReadOptions,
289    ) -> impl StorageFuture<'_, Self::Iter>;
290
291    fn rev_iter(
292        &self,
293        key_range: TableKeyRange,
294        read_options: ReadOptions,
295    ) -> impl StorageFuture<'_, Self::RevIter>;
296}
297
298#[derive(Clone)]
299pub struct TryWaitEpochOptions {
300    pub table_id: TableId,
301}
302
303impl TryWaitEpochOptions {
304    #[cfg(any(test, feature = "test"))]
305    pub fn for_test(table_id: TableId) -> Self {
306        Self { table_id }
307    }
308}
309
310impl From<TracedTryWaitEpochOptions> for TryWaitEpochOptions {
311    fn from(value: TracedTryWaitEpochOptions) -> Self {
312        Self {
313            table_id: value.table_id.into(),
314        }
315    }
316}
317
318impl From<TryWaitEpochOptions> for TracedTryWaitEpochOptions {
319    fn from(value: TryWaitEpochOptions) -> Self {
320        Self {
321            table_id: value.table_id.into(),
322        }
323    }
324}
325
326#[derive(Clone, Copy)]
327pub struct NewReadSnapshotOptions {
328    pub table_id: TableId,
329    pub table_option: TableOption,
330}
331
332#[derive(Clone)]
333pub struct NewVectorWriterOptions {
334    pub table_id: TableId,
335}
336
337pub trait StateStore: StateStoreReadLog + StaticSendSync + Clone {
338    type Local: LocalStateStore;
339    type ReadSnapshot: StateStoreRead + StateStoreReadVector + Clone;
340    type VectorWriter: StateStoreWriteVector;
341
342    /// If epoch is `Committed`, we will wait until the epoch is committed and its data is ready to
343    /// read. If epoch is `Current`, we will only check if the data can be read with this epoch.
344    fn try_wait_epoch(
345        &self,
346        epoch: HummockReadEpoch,
347        options: TryWaitEpochOptions,
348    ) -> impl StorageFuture<'_, ()>;
349
350    /// Creates a [`MonitoredStateStore`] from this state store, with given `stats`.
351    fn monitored(self, storage_metrics: Arc<MonitoredStorageMetrics>) -> MonitoredStateStore<Self> {
352        MonitoredStateStore::new(self, storage_metrics)
353    }
354
355    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_;
356
357    fn new_read_snapshot(
358        &self,
359        epoch: HummockReadEpoch,
360        options: NewReadSnapshotOptions,
361    ) -> impl StorageFuture<'_, Self::ReadSnapshot>;
362
363    fn new_vector_writer(
364        &self,
365        options: NewVectorWriterOptions,
366    ) -> impl Future<Output = Self::VectorWriter> + Send + '_;
367}
368
369/// A state store that is dedicated for streaming operator, which only reads the uncommitted data
370/// written by itself. Each local state store is not `Clone`, and is owned by a streaming state
371/// table.
372pub trait LocalStateStore: StateStoreGet + StateStoreWriteEpochControl + StaticSendSync {
373    type FlushedSnapshotReader: StateStoreRead;
374    type Iter<'a>: StateStoreIter + 'a;
375    type RevIter<'a>: StateStoreIter + 'a;
376
377    /// Opens and returns an iterator for given `prefix_hint` and `full_key_range`
378    /// Internally, `prefix_hint` will be used to for checking `bloom_filter` and
379    /// `full_key_range` used for iter. (if the `prefix_hint` not None, it should be be included
380    /// in `key_range`) The returned iterator will iterate data based on the latest written
381    /// snapshot.
382    fn iter(
383        &self,
384        key_range: TableKeyRange,
385        read_options: ReadOptions,
386    ) -> impl StorageFuture<'_, Self::Iter<'_>>;
387
388    fn rev_iter(
389        &self,
390        key_range: TableKeyRange,
391        read_options: ReadOptions,
392    ) -> impl StorageFuture<'_, Self::RevIter<'_>>;
393
394    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader;
395
396    /// Get last persisted watermark for a given vnode.
397    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
398
399    /// Inserts a key-value entry associated with a given `epoch` into the state store.
400    fn insert(
401        &mut self,
402        key: TableKey<Bytes>,
403        new_val: Bytes,
404        old_val: Option<Bytes>,
405    ) -> StorageResult<()>;
406
407    /// Deletes a key-value entry from the state store. Only the key-value entry with epoch smaller
408    /// than the given `epoch` will be deleted.
409    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;
410
411    // Updates the vnode bitmap corresponding to the local state store
412    // Returns the previous vnode bitmap
413    fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> impl StorageFuture<'_, Arc<Bitmap>>;
414}
415
416pub trait StateStoreWriteEpochControl: StaticSendSync {
417    fn flush(&mut self) -> impl StorageFuture<'_, usize>;
418
419    fn try_flush(&mut self) -> impl StorageFuture<'_, ()>;
420
421    /// Initializes the state store with given `epoch` pair.
422    /// Typically we will use `epoch.curr` as the initialized epoch,
423    /// Since state table will begin as empty.
424    /// In some cases like replicated state table, state table may not be empty initially,
425    /// as such we need to wait for `epoch.prev` checkpoint to complete,
426    /// hence this interface is made async.
427    fn init(&mut self, opts: InitOptions) -> impl StorageFuture<'_, ()>;
428
429    /// Updates the monotonically increasing write epoch to `new_epoch`.
430    /// All writes after this function is called will be tagged with `new_epoch`. In other words,
431    /// the previous write epoch is sealed.
432    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
433}
434
435pub trait StateStoreWriteVector: StateStoreWriteEpochControl + StaticSendSync {
436    fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()>;
437}
438
439pub struct VectorNearestOptions {
440    pub top_n: usize,
441    pub measure: DistanceMeasurement,
442    pub hnsw_ef_search: usize,
443}
444
445pub trait OnNearestItemFn<'a, O> = OnNearestItem<O> + Send + Sync + 'a;
446
447pub trait StateStoreReadVector: StaticSendSync {
448    fn nearest<'a, O: Send + 'a>(
449        &'a self,
450        vec: VectorRef<'a>,
451        options: VectorNearestOptions,
452        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
453    ) -> impl StorageFuture<'a, Vec<O>>;
454}
455
/// Prefetch settings for a read.
///
/// When `prefetch` is true, prefetching is enabled. Prefetching may increase the memory
/// footprint of the CN process, because prefetched blocks cannot be evicted. Streaming reads
/// from object storage may hang in some cases, so short synchronous reads are still used for
/// both batch queries and streaming. As a result, this option is currently unused.
#[derive(Default, Clone, Copy)]
pub struct PrefetchOptions {
    pub prefetch: bool,
    pub for_large_query: bool,
}

impl PrefetchOptions {
    /// Prefetching enabled, marked as a large query.
    pub fn prefetch_for_large_range_scan() -> Self {
        Self::new(true, true)
    }

    /// Prefetching enabled, not marked as a large query.
    pub fn prefetch_for_small_range_scan() -> Self {
        Self::new(true, false)
    }

    /// Builds the options from explicit flags.
    pub fn new(prefetch: bool, for_large_query: bool) -> Self {
        Self {
            prefetch,
            for_large_query,
        }
    }
}
488
489impl From<TracedPrefetchOptions> for PrefetchOptions {
490    fn from(value: TracedPrefetchOptions) -> Self {
491        Self {
492            prefetch: value.prefetch,
493            for_large_query: value.for_large_query,
494        }
495    }
496}
497
498impl From<PrefetchOptions> for TracedPrefetchOptions {
499    fn from(value: PrefetchOptions) -> Self {
500        Self {
501            prefetch: value.prefetch,
502            for_large_query: value.for_large_query,
503        }
504    }
505}
506
507#[derive(Default, Clone)]
508pub struct ReadOptions {
509    /// A hint for prefix key to check bloom filter.
510    /// If the `prefix_hint` is not None, it should be included in
511    /// `key` or `key_range` in the read API.
512    pub prefix_hint: Option<Bytes>,
513    pub prefetch_options: PrefetchOptions,
514    pub cache_policy: CachePolicy,
515}
516
517impl From<TracedReadOptions> for ReadOptions {
518    fn from(value: TracedReadOptions) -> Self {
519        Self {
520            prefix_hint: value.prefix_hint.map(|b| b.into()),
521            prefetch_options: value.prefetch_options.into(),
522            cache_policy: value.cache_policy.into(),
523        }
524    }
525}
526
527impl ReadOptions {
528    pub fn into_traced_read_options(
529        self,
530        table_id: TableId,
531        epoch: Option<HummockReadEpoch>,
532        table_option: TableOption,
533    ) -> TracedReadOptions {
534        let value = self;
535        let (read_version_from_backup, read_committed) = match epoch {
536            None | Some(HummockReadEpoch::NoWait(_)) => (false, false),
537            Some(HummockReadEpoch::Backup(_)) => (true, false),
538            Some(HummockReadEpoch::Committed(_))
539            | Some(HummockReadEpoch::BatchQueryCommitted(_, _))
540            | Some(HummockReadEpoch::TimeTravel(_)) => (false, true),
541        };
542        TracedReadOptions {
543            prefix_hint: value.prefix_hint.map(|b| b.into()),
544            prefetch_options: value.prefetch_options.into(),
545            cache_policy: value.cache_policy.into(),
546            retention_seconds: table_option.retention_seconds,
547            table_id: table_id.into(),
548            read_version_from_backup,
549            read_committed,
550        }
551    }
552}
553
554pub fn gen_min_epoch(base_epoch: u64, retention_seconds: Option<u32>) -> u64 {
555    match retention_seconds {
556        Some(retention_seconds_u32) => {
557            if base_epoch == MAX_EPOCH {
558                panic!("generate min epoch for MAX_EPOCH");
559            }
560            Epoch(base_epoch)
561                .subtract_ms(retention_seconds_u32 as u64 * 1000)
562                .0
563        }
564        None => 0,
565    }
566}
567
568pub trait CheckOldValueEquality = Fn(&Bytes, &Bytes) -> bool + Send + Sync;
569
570pub static CHECK_BYTES_EQUAL: LazyLock<Arc<dyn CheckOldValueEquality>> =
571    LazyLock::new(|| Arc::new(|first: &Bytes, second: &Bytes| first == second));
572
573#[derive(Default, Clone)]
574pub enum OpConsistencyLevel {
575    #[default]
576    Inconsistent,
577    ConsistentOldValue {
578        check_old_value: Arc<dyn CheckOldValueEquality>,
579        /// whether should store the old value
580        is_log_store: bool,
581    },
582}
583
584impl Debug for OpConsistencyLevel {
585    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
586        match self {
587            OpConsistencyLevel::Inconsistent => f.write_str("OpConsistencyLevel::Inconsistent"),
588            OpConsistencyLevel::ConsistentOldValue { is_log_store, .. } => f
589                .debug_struct("OpConsistencyLevel::ConsistentOldValue")
590                .field("is_log_store", is_log_store)
591                .finish(),
592        }
593    }
594}
595
596impl PartialEq<Self> for OpConsistencyLevel {
597    fn eq(&self, other: &Self) -> bool {
598        matches!(
599            (self, other),
600            (
601                OpConsistencyLevel::Inconsistent,
602                OpConsistencyLevel::Inconsistent
603            ) | (
604                OpConsistencyLevel::ConsistentOldValue {
605                    is_log_store: true,
606                    ..
607                },
608                OpConsistencyLevel::ConsistentOldValue {
609                    is_log_store: true,
610                    ..
611                },
612            ) | (
613                OpConsistencyLevel::ConsistentOldValue {
614                    is_log_store: false,
615                    ..
616                },
617                OpConsistencyLevel::ConsistentOldValue {
618                    is_log_store: false,
619                    ..
620                },
621            )
622        )
623    }
624}
625
626impl Eq for OpConsistencyLevel {}
627
628impl OpConsistencyLevel {
629    pub fn update(&mut self, new_level: &OpConsistencyLevel) {
630        assert_ne!(self, new_level);
631        *self = new_level.clone()
632    }
633}
634
635#[derive(Clone)]
636pub struct NewLocalOptions {
637    pub table_id: TableId,
638    pub fragment_id: FragmentId,
639    /// Whether the operation is consistent. The term `consistent` requires the following:
640    ///
641    /// 1. A key cannot be inserted or deleted for more than once, i.e. inserting to an existing
642    ///    key or deleting an non-existing key is not allowed.
643    ///
644    /// 2. The old value passed from
645    ///    `update` and `delete` should match the original stored value.
646    pub op_consistency_level: OpConsistencyLevel,
647    pub table_option: TableOption,
648
649    /// Indicate if this is replicated. If it is, we should not
650    /// upload its `ReadVersions`.
651    pub is_replicated: bool,
652
653    /// The vnode bitmap for the local state store instance
654    pub vnodes: Arc<Bitmap>,
655
656    pub upload_on_flush: bool,
657}
658
659impl From<TracedNewLocalOptions> for NewLocalOptions {
660    fn from(value: TracedNewLocalOptions) -> Self {
661        Self {
662            table_id: value.table_id.into(),
663            fragment_id: value.fragment_id.into(),
664            op_consistency_level: match value.op_consistency_level {
665                TracedOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
666                TracedOpConsistencyLevel::ConsistentOldValue => {
667                    OpConsistencyLevel::ConsistentOldValue {
668                        check_old_value: CHECK_BYTES_EQUAL.clone(),
669                        // TODO: for simplicity, set it to false
670                        is_log_store: false,
671                    }
672                }
673            },
674            table_option: value.table_option.into(),
675            is_replicated: value.is_replicated,
676            vnodes: Arc::new(value.vnodes.into()),
677            upload_on_flush: value.upload_on_flush,
678        }
679    }
680}
681
682impl From<NewLocalOptions> for TracedNewLocalOptions {
683    fn from(value: NewLocalOptions) -> Self {
684        Self {
685            table_id: value.table_id.into(),
686            fragment_id: value.fragment_id.into(),
687            op_consistency_level: match value.op_consistency_level {
688                OpConsistencyLevel::Inconsistent => TracedOpConsistencyLevel::Inconsistent,
689                OpConsistencyLevel::ConsistentOldValue { .. } => {
690                    TracedOpConsistencyLevel::ConsistentOldValue
691                }
692            },
693            table_option: value.table_option.into(),
694            is_replicated: value.is_replicated,
695            vnodes: value.vnodes.as_ref().clone().into(),
696            upload_on_flush: value.upload_on_flush,
697        }
698    }
699}
700
701impl NewLocalOptions {
702    pub fn new(
703        table_id: TableId,
704        fragment_id: FragmentId,
705        op_consistency_level: OpConsistencyLevel,
706        table_option: TableOption,
707        vnodes: Arc<Bitmap>,
708        upload_on_flush: bool,
709    ) -> Self {
710        NewLocalOptions {
711            table_id,
712            fragment_id,
713            op_consistency_level,
714            table_option,
715            is_replicated: false,
716            vnodes,
717            upload_on_flush,
718        }
719    }
720
721    pub fn new_replicated(
722        table_id: TableId,
723        fragment_id: FragmentId,
724        op_consistency_level: OpConsistencyLevel,
725        table_option: TableOption,
726        vnodes: Arc<Bitmap>,
727    ) -> Self {
728        NewLocalOptions {
729            table_id,
730            fragment_id,
731            op_consistency_level,
732            table_option,
733            is_replicated: true,
734            vnodes,
735            upload_on_flush: false,
736        }
737    }
738
739    pub fn for_test(table_id: TableId) -> Self {
740        Self {
741            table_id,
742            fragment_id: FragmentId::default(),
743            op_consistency_level: OpConsistencyLevel::Inconsistent,
744            table_option: TableOption {
745                retention_seconds: None,
746            },
747            is_replicated: false,
748            vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
749            upload_on_flush: true,
750        }
751    }
752}
753
754#[derive(Clone)]
755pub struct InitOptions {
756    pub epoch: EpochPair,
757}
758
759impl InitOptions {
760    pub fn new(epoch: EpochPair) -> Self {
761        Self { epoch }
762    }
763}
764
765impl From<InitOptions> for TracedInitOptions {
766    fn from(value: InitOptions) -> Self {
767        TracedInitOptions {
768            epoch: value.epoch.into(),
769        }
770    }
771}
772
773impl From<TracedInitOptions> for InitOptions {
774    fn from(value: TracedInitOptions) -> Self {
775        InitOptions {
776            epoch: value.epoch.into(),
777        }
778    }
779}
780
781#[derive(Clone, Debug)]
782pub struct SealCurrentEpochOptions {
783    pub table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
784    pub switch_op_consistency_level: Option<OpConsistencyLevel>,
785}
786
787impl From<SealCurrentEpochOptions> for TracedSealCurrentEpochOptions {
788    fn from(value: SealCurrentEpochOptions) -> Self {
789        TracedSealCurrentEpochOptions {
790            table_watermarks: value.table_watermarks.map(
791                |(direction, watermarks, watermark_type)| {
792                    (
793                        direction == WatermarkDirection::Ascending,
794                        watermarks
795                            .into_iter()
796                            .map(|watermark| {
797                                let pb_watermark = PbVnodeWatermark::from(watermark);
798                                Message::encode_to_vec(&pb_watermark)
799                            })
800                            .collect(),
801                        PbWatermarkSerdeType::from(watermark_type) as i32,
802                    )
803                },
804            ),
805            switch_op_consistency_level: value
806                .switch_op_consistency_level
807                .map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue { .. })),
808        }
809    }
810}
811
812impl From<TracedSealCurrentEpochOptions> for SealCurrentEpochOptions {
813    fn from(value: TracedSealCurrentEpochOptions) -> SealCurrentEpochOptions {
814        SealCurrentEpochOptions {
815            table_watermarks: value.table_watermarks.map(
816                |(is_ascending, watermarks, watermark_serde_type)| {
817                    (
818                        if is_ascending {
819                            WatermarkDirection::Ascending
820                        } else {
821                            WatermarkDirection::Descending
822                        },
823                        watermarks
824                            .into_iter()
825                            .map(|serialized_watermark| {
826                                Message::decode(serialized_watermark.as_slice())
827                                    .map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb))
828                                    .expect("should not failed")
829                            })
830                            .collect(),
831                        match PbWatermarkSerdeType::try_from(watermark_serde_type).unwrap() {
832                            PbWatermarkSerdeType::TypeUnspecified => unreachable!(),
833                            PbWatermarkSerdeType::PkPrefix => WatermarkSerdeType::PkPrefix,
834                            PbWatermarkSerdeType::NonPkPrefix => WatermarkSerdeType::NonPkPrefix,
835                            PbWatermarkSerdeType::Value => WatermarkSerdeType::Value,
836                        },
837                    )
838                },
839            ),
840            switch_op_consistency_level: value.switch_op_consistency_level.map(|enable| {
841                if enable {
842                    OpConsistencyLevel::ConsistentOldValue {
843                        check_old_value: CHECK_BYTES_EQUAL.clone(),
844                        is_log_store: false,
845                    }
846                } else {
847                    OpConsistencyLevel::Inconsistent
848                }
849            }),
850        }
851    }
852}
853
854impl SealCurrentEpochOptions {
855    pub fn for_test() -> Self {
856        Self {
857            table_watermarks: None,
858            switch_op_consistency_level: None,
859        }
860    }
861}