risingwave_storage/
store.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::default::Default;
16use std::fmt::{Debug, Formatter};
17use std::future::Future;
18use std::marker::PhantomData;
19use std::sync::{Arc, LazyLock};
20
21use bytes::Bytes;
22use futures::{Stream, TryStreamExt};
23use futures_async_stream::try_stream;
24use prost::Message;
25use risingwave_common::array::Op;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{TableId, TableOption};
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_hummock_sdk::HummockReadEpoch;
31use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
32use risingwave_hummock_sdk::table_watermark::{
33    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
34};
35use risingwave_hummock_trace::{
36    TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions,
37    TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
38};
39use risingwave_pb::hummock::PbVnodeWatermark;
40
41use crate::error::{StorageError, StorageResult};
42use crate::hummock::CachePolicy;
43use crate::monitor::{MonitoredStateStore, MonitoredStorageMetrics};
44pub(crate) use crate::vector::{DistanceMeasurement, OnNearestItem, Vector};
45
/// Shorthand bound for types that are `Send + Sync + 'static`.
pub trait StaticSendSync = Send + Sync + 'static;
47
/// An owned item type produced by a [`StateStoreIter`], paired with the borrowed view
/// (`ItemRef`) that the iterator actually hands out from `try_next`.
pub trait IterItem: Send + 'static {
    /// Borrowed form of the item, valid for lifetime `'a`.
    type ItemRef<'a>: Send + Copy + 'a;
}

/// Key-value rows are borrowed as [`StateStoreKeyedRowRef`].
impl IterItem for StateStoreKeyedRow {
    type ItemRef<'a> = StateStoreKeyedRowRef<'a>;
}

/// Change-log items are borrowed as [`StateStoreReadLogItemRef`].
impl IterItem for StateStoreReadLogItem {
    type ItemRef<'a> = StateStoreReadLogItemRef<'a>;
}
59
/// An async iterator over state store items of type `T` (key-value rows by default).
pub trait StateStoreIter<T: IterItem = StateStoreKeyedRow>: Send {
    /// Advances the iterator and returns a borrowed view of the next item, or `Ok(None)` when
    /// there are no more items. The returned item borrows `self`, so it must be released before
    /// `try_next` is called again.
    fn try_next(&mut self) -> impl StorageFuture<'_, Option<T::ItemRef<'_>>>;
}
63
64pub fn to_owned_item((key, value): StateStoreKeyedRowRef<'_>) -> StorageResult<StateStoreKeyedRow> {
65    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
66}
67
/// Extension methods available on every [`StateStoreIter`] through a blanket implementation.
pub trait StateStoreIterExt<T: IterItem = StateStoreKeyedRow>: StateStoreIter<T> + Sized {
    /// The stream type returned by [`Self::into_stream`].
    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>>: Stream<Item = StorageResult<O>>
        + Send;

    /// Consumes the iterator and returns a `Stream` that yields `f` applied to each borrowed
    /// item, so the stream's items are owned `O` values wrapped in `StorageResult`.
    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
        self,
        f: F,
    ) -> Self::ItemStream<O, F>;

    /// Wraps the iterator in a [`FusedStateStoreIter`]. The wrapper records when the iterator
    /// has returned `Ok(None)` or an error.
    fn fused(self) -> FusedStateStoreIter<Self, T> {
        FusedStateStoreIter::new(self)
    }
}
81
// Drives `iter` to completion and yields `f(item)` for every item it produces.
//
// The iterator is first wrapped with `fused()`, so it is never polled again after returning
// `Ok(None)` or an error. The first error, whether from the iterator or from `f`, is
// propagated with `?` and ends the stream.
#[try_stream(ok = O, error = StorageError)]
async fn into_stream_inner<
    T: IterItem,
    I: StateStoreIter<T>,
    O: Send,
    F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send,
>(
    iter: I,
    f: F,
) {
    let mut iter = iter.fused();
    while let Some(item) = iter.try_next().await? {
        yield f(item)?;
    }
}
97
98pub struct FromStreamStateStoreIter<S> {
99    inner: S,
100    item_buffer: Option<StateStoreKeyedRow>,
101}
102
103impl<S> FromStreamStateStoreIter<S> {
104    pub fn new(inner: S) -> Self {
105        Self {
106            inner,
107            item_buffer: None,
108        }
109    }
110}
111
112impl<S: Stream<Item = StorageResult<StateStoreKeyedRow>> + Unpin + Send> StateStoreIter
113    for FromStreamStateStoreIter<S>
114{
115    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
116        self.item_buffer = self.inner.try_next().await?;
117        Ok(self
118            .item_buffer
119            .as_ref()
120            .map(|(key, value)| (key.to_ref(), value.as_ref())))
121    }
122}
123
124pub struct FusedStateStoreIter<I, T> {
125    inner: I,
126    finished: bool,
127    _phantom: PhantomData<T>,
128}
129
130impl<I, T> FusedStateStoreIter<I, T> {
131    fn new(inner: I) -> Self {
132        Self {
133            inner,
134            finished: false,
135            _phantom: PhantomData,
136        }
137    }
138}
139
140impl<T: IterItem, I: StateStoreIter<T>> FusedStateStoreIter<I, T> {
141    async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
142        assert!(!self.finished, "call try_next after finish");
143        let result = self.inner.try_next().await;
144        match &result {
145            Ok(Some(_)) => {}
146            Ok(None) | Err(_) => {
147                self.finished = true;
148            }
149        }
150        result
151    }
152}
153
/// Every [`StateStoreIter`] gets [`StateStoreIterExt`]. The stream is produced by
/// `into_stream_inner`.
impl<T: IterItem, I: StateStoreIter<T>> StateStoreIterExt<T> for I {
    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>> =
        impl Stream<Item = StorageResult<O>> + Send;

    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
        self,
        f: F,
    ) -> Self::ItemStream<O, F> {
        into_stream_inner(self, f)
    }
}
165
/// A borrowed key-value row: a `FullKey` over borrowed bytes and the borrowed value bytes.
pub type StateStoreKeyedRowRef<'a> = (FullKey<&'a [u8]>, &'a [u8]);
/// The owned counterpart of [`StateStoreKeyedRowRef`].
pub type StateStoreKeyedRow = (FullKey<Bytes>, Bytes);
/// A key-value-row iterator that owns all of its data (`'static`).
pub trait StateStoreReadIter = StateStoreIter + 'static;
169
/// A single change-log value: the inserted value, the deleted value, or both sides of an
/// update.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum ChangeLogValue<T> {
    /// A newly inserted value.
    Insert(T),
    /// An update from `old_value` to `new_value`.
    Update { new_value: T, old_value: T },
    /// A deleted value.
    Delete(T),
}
176
177impl<T> ChangeLogValue<T> {
178    pub fn try_map<O>(self, f: impl Fn(T) -> StorageResult<O>) -> StorageResult<ChangeLogValue<O>> {
179        Ok(match self {
180            ChangeLogValue::Insert(value) => ChangeLogValue::Insert(f(value)?),
181            ChangeLogValue::Update {
182                new_value,
183                old_value,
184            } => ChangeLogValue::Update {
185                new_value: f(new_value)?,
186                old_value: f(old_value)?,
187            },
188            ChangeLogValue::Delete(value) => ChangeLogValue::Delete(f(value)?),
189        })
190    }
191
192    pub fn into_op_value_iter(self) -> impl Iterator<Item = (Op, T)> {
193        std::iter::from_coroutine(
194            #[coroutine]
195            move || match self {
196                Self::Insert(row) => {
197                    yield (Op::Insert, row);
198                }
199                Self::Delete(row) => {
200                    yield (Op::Delete, row);
201                }
202                Self::Update {
203                    old_value,
204                    new_value,
205                } => {
206                    yield (Op::UpdateDelete, old_value);
207                    yield (Op::UpdateInsert, new_value);
208                }
209            },
210        )
211    }
212}
213
214impl<T: AsRef<[u8]>> ChangeLogValue<T> {
215    pub fn to_ref(&self) -> ChangeLogValue<&[u8]> {
216        match self {
217            ChangeLogValue::Insert(val) => ChangeLogValue::Insert(val.as_ref()),
218            ChangeLogValue::Update {
219                new_value,
220                old_value,
221            } => ChangeLogValue::Update {
222                new_value: new_value.as_ref(),
223                old_value: old_value.as_ref(),
224            },
225            ChangeLogValue::Delete(val) => ChangeLogValue::Delete(val.as_ref()),
226        }
227    }
228}
229
/// An owned change-log entry: the table key and the change recorded for it.
pub type StateStoreReadLogItem = (TableKey<Bytes>, ChangeLogValue<Bytes>);
/// The borrowed counterpart of [`StateStoreReadLogItem`].
pub type StateStoreReadLogItemRef<'a> = (TableKey<&'a [u8]>, ChangeLogValue<&'a [u8]>);

/// Options for [`StateStoreReadLog::next_epoch`].
#[derive(Clone)]
pub struct NextEpochOptions {
    pub table_id: TableId,
}

/// Options for [`StateStoreReadLog::iter_log`].
#[derive(Clone)]
pub struct ReadLogOptions {
    pub table_id: TableId,
}

/// A change-log iterator that is `Send` and owns its data (`'static`).
pub trait StateStoreReadChangeLogIter = StateStoreIter<StateStoreReadLogItem> + Send + 'static;
/// A `Send` future that may borrow for `'a` and resolves to `StorageResult<T>`.
pub trait StorageFuture<'a, T> = Future<Output = StorageResult<T>> + Send + 'a;
245
/// Read access to a table's change log.
pub trait StateStoreReadLog: StaticSendSync {
    /// Iterator type returned by [`Self::iter_log`].
    type ChangeLogIter: StateStoreReadChangeLogIter;

    /// Returns the epoch that follows `epoch` for the table in `options`.
    /// NOTE(review): whether this waits for the next epoch to become available is not visible
    /// here; confirm in the implementors.
    fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64>;

    /// Iterates the change log within `epoch_range`, restricted to `key_range`.
    /// NOTE(review): inclusiveness of the `epoch_range` bounds is not stated here; check the
    /// implementors.
    fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> impl StorageFuture<'_, Self::ChangeLogIter>;
}
258
/// Point-lookup callback. It receives the borrowed full key and value of the entry found and
/// converts them into an owned `O`.
pub trait KeyValueFn<O> =
    for<'kv> FnOnce(FullKey<&'kv [u8]>, &'kv [u8]) -> StorageResult<O> + Send + 'static;

/// Point lookups by table key.
pub trait StateStoreGet: StaticSendSync {
    /// Looks up `key` and passes the entry found to `on_key_value_fn`, returning its output.
    /// `Ok(None)` means there was nothing to pass to the callback.
    fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> impl StorageFuture<'_, Option<O>>;
}
270
/// Snapshot reads: point lookups (via [`StateStoreGet`]) plus forward and reverse range scans.
pub trait StateStoreRead: StateStoreGet + StaticSendSync {
    /// Forward iterator returned by [`Self::iter`].
    type Iter: StateStoreReadIter;
    /// Reverse iterator returned by [`Self::rev_iter`].
    type RevIter: StateStoreReadIter;

    /// Opens and returns an iterator over `key_range`.
    ///
    /// If `read_options.prefix_hint` is set, it is used to check the bloom filter and should be
    /// included in `key_range`. The returned iterator reads data from this reader's snapshot.
    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::Iter>;

    /// The reverse-order counterpart of [`Self::iter`].
    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::RevIter>;
}
292
293#[derive(Clone)]
294pub struct TryWaitEpochOptions {
295    pub table_id: TableId,
296}
297
298impl TryWaitEpochOptions {
299    #[cfg(any(test, feature = "test"))]
300    pub fn for_test(table_id: TableId) -> Self {
301        Self { table_id }
302    }
303}
304
305impl From<TracedTryWaitEpochOptions> for TryWaitEpochOptions {
306    fn from(value: TracedTryWaitEpochOptions) -> Self {
307        Self {
308            table_id: value.table_id.into(),
309        }
310    }
311}
312
313impl From<TryWaitEpochOptions> for TracedTryWaitEpochOptions {
314    fn from(value: TryWaitEpochOptions) -> Self {
315        Self {
316            table_id: value.table_id.into(),
317        }
318    }
319}
320
/// Options for [`StateStore::new_read_snapshot`].
#[derive(Clone, Copy)]
pub struct NewReadSnapshotOptions {
    pub table_id: TableId,
}

/// Options for [`StateStore::new_vector_writer`].
#[derive(Clone)]
pub struct NewVectorWriterOptions {
    pub table_id: TableId,
}
330
/// The top-level storage handle. It reads change logs, waits for epochs, and creates local
/// state stores, read snapshots and vector writers.
pub trait StateStore: StateStoreReadLog + StaticSendSync + Clone {
    /// Local state store returned by [`Self::new_local`].
    type Local: LocalStateStore;
    /// Read snapshot returned by [`Self::new_read_snapshot`].
    type ReadSnapshot: StateStoreRead + StateStoreReadVector + Clone;
    /// Vector writer returned by [`Self::new_vector_writer`].
    type VectorWriter: StateStoreWriteVector;

    /// If epoch is `Committed`, we will wait until the epoch is committed and its data is ready to
    /// read. If epoch is `Current`, we will only check if the data can be read with this epoch.
    fn try_wait_epoch(
        &self,
        epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> impl StorageFuture<'_, ()>;

    /// Wraps this state store in a [`MonitoredStateStore`] that uses `storage_metrics`.
    fn monitored(self, storage_metrics: Arc<MonitoredStorageMetrics>) -> MonitoredStateStore<Self> {
        MonitoredStateStore::new(self, storage_metrics)
    }

    /// Creates a local state store configured by `option`.
    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_;

    /// Creates a read snapshot at `epoch` for the table in `options`.
    fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> impl StorageFuture<'_, Self::ReadSnapshot>;

    /// Creates a vector writer for the table in `options`.
    fn new_vector_writer(
        &self,
        options: NewVectorWriterOptions,
    ) -> impl Future<Output = Self::VectorWriter> + Send + '_;
}
362
/// A state store dedicated to a streaming operator, which only reads the uncommitted data
/// written by itself. A local state store is not `Clone` and is owned by a streaming state
/// table.
pub trait LocalStateStore: StateStoreGet + StateStoreWriteEpochControl + StaticSendSync {
    /// Reader type returned by [`Self::new_flushed_snapshot_reader`].
    type FlushedSnapshotReader: StateStoreRead;
    /// Forward iterator returned by [`Self::iter`]; it may borrow from `self`.
    type Iter<'a>: StateStoreIter + 'a;
    /// Reverse iterator returned by [`Self::rev_iter`]; it may borrow from `self`.
    type RevIter<'a>: StateStoreIter + 'a;

    /// Opens and returns an iterator over `key_range`.
    ///
    /// If `read_options.prefix_hint` is set, it is used to check the bloom filter and should be
    /// included in `key_range`. The returned iterator reads data from the latest written
    /// snapshot.
    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::Iter<'_>>;

    /// The reverse-order counterpart of [`Self::iter`].
    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::RevIter<'_>>;

    /// Creates a [`Self::FlushedSnapshotReader`].
    /// NOTE(review): presumably a reader over data this store has already flushed; confirm in
    /// the implementors.
    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader;

    /// Get last persisted watermark for a given vnode.
    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;

    /// Inserts `key` with value `new_val`. Writes are tagged with the current write epoch (see
    /// [`StateStoreWriteEpochControl::seal_current_epoch`]).
    ///
    /// `old_val` is the value previously stored under `key`, if any. Under a consistent
    /// [`OpConsistencyLevel`], it should match the original stored value.
    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()>;

    /// Deletes `key`, whose currently stored value is `old_val`. Under a consistent
    /// [`OpConsistencyLevel`], `old_val` should match the original stored value.
    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

    /// Replaces the vnode bitmap of this local state store with `vnodes` and returns the
    /// previous bitmap.
    fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> impl StorageFuture<'_, Arc<Bitmap>>;
}
409
/// Epoch and flush control for writers. It is a supertrait of both [`LocalStateStore`] and
/// [`StateStoreWriteVector`].
pub trait StateStoreWriteEpochControl: StaticSendSync {
    /// Flushes pending writes.
    /// NOTE(review): the meaning of the returned `usize` (e.g. flushed bytes or entries) is not
    /// visible here; confirm in the implementors.
    fn flush(&mut self) -> impl StorageFuture<'_, usize>;

    /// A conditional variant of [`Self::flush`].
    /// NOTE(review): presumably flushes only when the implementation deems it necessary; confirm
    /// in the implementors.
    fn try_flush(&mut self) -> impl StorageFuture<'_, ()>;

    /// Initializes the state store with the given `epoch` pair.
    /// Typically we use `epoch.curr` as the initialized epoch, since the state table begins
    /// empty. In some cases, such as a replicated state table, the state table may not be empty
    /// initially, so we need to wait for the `epoch.prev` checkpoint to complete. This is why
    /// the interface is async.
    fn init(&mut self, opts: InitOptions) -> impl StorageFuture<'_, ()>;

    /// Updates the monotonically increasing write epoch to `next_epoch`.
    /// All writes after this function is called will be tagged with `next_epoch`; in other
    /// words, the previous write epoch is sealed. `opts` may carry table watermarks and an op
    /// consistency level to switch to.
    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
}
428
/// A writer of vector entries. Epoch handling comes from [`StateStoreWriteEpochControl`].
pub trait StateStoreWriteVector: StateStoreWriteEpochControl + StaticSendSync {
    /// Inserts the vector `vec` together with its associated payload `info`.
    fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()>;
}

/// Options for [`StateStoreReadVector::nearest`].
pub struct VectorNearestOptions {
    /// Presumably the maximum number of nearest results to return.
    pub top_n: usize,
    /// Distance measurement used for the search.
    pub measure: DistanceMeasurement,
}

/// Callback over nearest-search results that produces an owned `O`.
/// NOTE(review): the exact callback contract lives in `OnNearestItem`.
pub trait OnNearestItemFn<O> = OnNearestItem<O> + Send + 'static;

/// Nearest-neighbour search over stored vectors.
pub trait StateStoreReadVector: StaticSendSync {
    /// Searches for the vectors nearest to `vec` according to `options`, and returns the
    /// values produced by `on_nearest_item_fn`.
    fn nearest<O: Send + 'static>(
        &self,
        vec: Vector,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<O>,
    ) -> impl StorageFuture<'_, Vec<O>>;
}
448
/// Prefetch configuration for a read.
///
/// If `prefetch` is true, prefetching is enabled. Prefetching may increase the memory
/// footprint of the CN process, because prefetched blocks cannot be evicted.
/// Streaming reads from object storage may hang in some cases, so sync short reads are still
/// used for both batch queries and streaming. As a result, this option is currently unused.
#[derive(Default, Clone, Copy)]
pub struct PrefetchOptions {
    /// Whether prefetching is enabled.
    pub prefetch: bool,
    /// Whether the read is a large query (large range scan).
    pub for_large_query: bool,
}

impl PrefetchOptions {
    /// Prefetching enabled, for a large range scan.
    pub fn prefetch_for_large_range_scan() -> Self {
        Self::new(true, true)
    }

    /// Prefetching enabled, for a small range scan.
    pub fn prefetch_for_small_range_scan() -> Self {
        Self::new(true, false)
    }

    /// Builds the options directly from the two flags.
    pub fn new(prefetch: bool, for_large_query: bool) -> Self {
        PrefetchOptions {
            for_large_query,
            prefetch,
        }
    }
}
481
482impl From<TracedPrefetchOptions> for PrefetchOptions {
483    fn from(value: TracedPrefetchOptions) -> Self {
484        Self {
485            prefetch: value.prefetch,
486            for_large_query: value.for_large_query,
487        }
488    }
489}
490
491impl From<PrefetchOptions> for TracedPrefetchOptions {
492    fn from(value: PrefetchOptions) -> Self {
493        Self {
494            prefetch: value.prefetch,
495            for_large_query: value.for_large_query,
496        }
497    }
498}
499
500#[derive(Default, Clone)]
501pub struct ReadOptions {
502    /// A hint for prefix key to check bloom filter.
503    /// If the `prefix_hint` is not None, it should be included in
504    /// `key` or `key_range` in the read API.
505    pub prefix_hint: Option<Bytes>,
506    pub prefetch_options: PrefetchOptions,
507    pub cache_policy: CachePolicy,
508
509    pub retention_seconds: Option<u32>,
510}
511
512impl From<TracedReadOptions> for ReadOptions {
513    fn from(value: TracedReadOptions) -> Self {
514        Self {
515            prefix_hint: value.prefix_hint.map(|b| b.into()),
516            prefetch_options: value.prefetch_options.into(),
517            cache_policy: value.cache_policy.into(),
518            retention_seconds: value.retention_seconds,
519        }
520    }
521}
522
523impl ReadOptions {
524    pub fn into_traced_read_options(
525        self,
526        table_id: TableId,
527        epoch: Option<HummockReadEpoch>,
528    ) -> TracedReadOptions {
529        let value = self;
530        let (read_version_from_backup, read_committed) = match epoch {
531            None | Some(HummockReadEpoch::NoWait(_)) => (false, false),
532            Some(HummockReadEpoch::Backup(_)) => (true, false),
533            Some(HummockReadEpoch::Committed(_))
534            | Some(HummockReadEpoch::BatchQueryCommitted(_, _))
535            | Some(HummockReadEpoch::TimeTravel(_)) => (false, true),
536        };
537        TracedReadOptions {
538            prefix_hint: value.prefix_hint.map(|b| b.into()),
539            prefetch_options: value.prefetch_options.into(),
540            cache_policy: value.cache_policy.into(),
541            retention_seconds: value.retention_seconds,
542            table_id: table_id.into(),
543            read_version_from_backup,
544            read_committed,
545        }
546    }
547}
548
549pub fn gen_min_epoch(base_epoch: u64, retention_seconds: Option<&u32>) -> u64 {
550    let base_epoch = Epoch(base_epoch);
551    match retention_seconds {
552        Some(retention_seconds_u32) => {
553            base_epoch
554                .subtract_ms(*retention_seconds_u32 as u64 * 1000)
555                .0
556        }
557        None => 0,
558    }
559}
560
/// Predicate deciding whether two old values are considered equal. It is stored in
/// [`OpConsistencyLevel::ConsistentOldValue`].
pub trait CheckOldValueEquality = Fn(&Bytes, &Bytes) -> bool + Send + Sync;

/// The default old-value check: plain byte equality.
pub static CHECK_BYTES_EQUAL: LazyLock<Arc<dyn CheckOldValueEquality>> =
    LazyLock::new(|| Arc::new(|first: &Bytes, second: &Bytes| first == second));
565
/// Consistency level of a local state store's write operations.
/// See `NewLocalOptions::op_consistency_level` for what "consistent" requires.
#[derive(Default, Clone)]
pub enum OpConsistencyLevel {
    /// No consistency requirement. This is the default.
    #[default]
    Inconsistent,
    /// Operations are expected to be consistent, with old values checked by `check_old_value`.
    ConsistentOldValue {
        check_old_value: Arc<dyn CheckOldValueEquality>,
        /// Whether the old value should be stored.
        is_log_store: bool,
    },
}
576
577impl Debug for OpConsistencyLevel {
578    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
579        match self {
580            OpConsistencyLevel::Inconsistent => f.write_str("OpConsistencyLevel::Inconsistent"),
581            OpConsistencyLevel::ConsistentOldValue { is_log_store, .. } => f
582                .debug_struct("OpConsistencyLevel::ConsistentOldValue")
583                .field("is_log_store", is_log_store)
584                .finish(),
585        }
586    }
587}
588
589impl PartialEq<Self> for OpConsistencyLevel {
590    fn eq(&self, other: &Self) -> bool {
591        matches!(
592            (self, other),
593            (
594                OpConsistencyLevel::Inconsistent,
595                OpConsistencyLevel::Inconsistent
596            ) | (
597                OpConsistencyLevel::ConsistentOldValue {
598                    is_log_store: true,
599                    ..
600                },
601                OpConsistencyLevel::ConsistentOldValue {
602                    is_log_store: true,
603                    ..
604                },
605            ) | (
606                OpConsistencyLevel::ConsistentOldValue {
607                    is_log_store: false,
608                    ..
609                },
610                OpConsistencyLevel::ConsistentOldValue {
611                    is_log_store: false,
612                    ..
613                },
614            )
615        )
616    }
617}
618
619impl Eq for OpConsistencyLevel {}
620
621impl OpConsistencyLevel {
622    pub fn update(&mut self, new_level: &OpConsistencyLevel) {
623        assert_ne!(self, new_level);
624        *self = new_level.clone()
625    }
626}
627
/// Options for [`StateStore::new_local`].
#[derive(Clone)]
pub struct NewLocalOptions {
    pub table_id: TableId,
    /// Whether the operation is consistent. The term `consistent` requires the following:
    ///
    /// 1. A key cannot be inserted or deleted more than once, i.e. inserting an existing
    ///    key or deleting a non-existing key is not allowed.
    ///
    /// 2. The old value passed from
    ///    `update` and `delete` should match the original stored value.
    pub op_consistency_level: OpConsistencyLevel,
    /// Per-table options such as `retention_seconds`.
    pub table_option: TableOption,

    /// Whether this store is replicated. If it is, we should not
    /// upload its `ReadVersions`.
    pub is_replicated: bool,

    /// The vnode bitmap for the local state store instance.
    pub vnodes: Arc<Bitmap>,
}
648
649impl From<TracedNewLocalOptions> for NewLocalOptions {
650    fn from(value: TracedNewLocalOptions) -> Self {
651        Self {
652            table_id: value.table_id.into(),
653            op_consistency_level: match value.op_consistency_level {
654                TracedOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
655                TracedOpConsistencyLevel::ConsistentOldValue => {
656                    OpConsistencyLevel::ConsistentOldValue {
657                        check_old_value: CHECK_BYTES_EQUAL.clone(),
658                        // TODO: for simplicity, set it to false
659                        is_log_store: false,
660                    }
661                }
662            },
663            table_option: value.table_option.into(),
664            is_replicated: value.is_replicated,
665            vnodes: Arc::new(value.vnodes.into()),
666        }
667    }
668}
669
670impl From<NewLocalOptions> for TracedNewLocalOptions {
671    fn from(value: NewLocalOptions) -> Self {
672        Self {
673            table_id: value.table_id.into(),
674            op_consistency_level: match value.op_consistency_level {
675                OpConsistencyLevel::Inconsistent => TracedOpConsistencyLevel::Inconsistent,
676                OpConsistencyLevel::ConsistentOldValue { .. } => {
677                    TracedOpConsistencyLevel::ConsistentOldValue
678                }
679            },
680            table_option: value.table_option.into(),
681            is_replicated: value.is_replicated,
682            vnodes: value.vnodes.as_ref().clone().into(),
683        }
684    }
685}
686
687impl NewLocalOptions {
688    pub fn new(
689        table_id: TableId,
690        op_consistency_level: OpConsistencyLevel,
691        table_option: TableOption,
692        vnodes: Arc<Bitmap>,
693    ) -> Self {
694        NewLocalOptions {
695            table_id,
696            op_consistency_level,
697            table_option,
698            is_replicated: false,
699            vnodes,
700        }
701    }
702
703    pub fn new_replicated(
704        table_id: TableId,
705        op_consistency_level: OpConsistencyLevel,
706        table_option: TableOption,
707        vnodes: Arc<Bitmap>,
708    ) -> Self {
709        NewLocalOptions {
710            table_id,
711            op_consistency_level,
712            table_option,
713            is_replicated: true,
714            vnodes,
715        }
716    }
717
718    pub fn for_test(table_id: TableId) -> Self {
719        Self {
720            table_id,
721            op_consistency_level: OpConsistencyLevel::Inconsistent,
722            table_option: TableOption {
723                retention_seconds: None,
724            },
725            is_replicated: false,
726            vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
727        }
728    }
729}
730
731#[derive(Clone)]
732pub struct InitOptions {
733    pub epoch: EpochPair,
734}
735
736impl InitOptions {
737    pub fn new(epoch: EpochPair) -> Self {
738        Self { epoch }
739    }
740}
741
742impl From<InitOptions> for TracedInitOptions {
743    fn from(value: InitOptions) -> Self {
744        TracedInitOptions {
745            epoch: value.epoch.into(),
746        }
747    }
748}
749
750impl From<TracedInitOptions> for InitOptions {
751    fn from(value: TracedInitOptions) -> Self {
752        InitOptions {
753            epoch: value.epoch.into(),
754        }
755    }
756}
757
/// Options for [`StateStoreWriteEpochControl::seal_current_epoch`].
#[derive(Clone, Debug)]
pub struct SealCurrentEpochOptions {
    /// Table watermarks passed along when sealing the epoch, if any. The tuple holds the
    /// direction, the per-vnode watermarks, and the watermark serde type.
    pub table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
    /// If set, the op consistency level to switch to.
    pub switch_op_consistency_level: Option<OpConsistencyLevel>,
}
763
764impl From<SealCurrentEpochOptions> for TracedSealCurrentEpochOptions {
765    fn from(value: SealCurrentEpochOptions) -> Self {
766        TracedSealCurrentEpochOptions {
767            table_watermarks: value.table_watermarks.map(
768                |(direction, watermarks, watermark_type)| {
769                    (
770                        direction == WatermarkDirection::Ascending,
771                        watermarks
772                            .into_iter()
773                            .map(|watermark| {
774                                let pb_watermark = PbVnodeWatermark::from(watermark);
775                                Message::encode_to_vec(&pb_watermark)
776                            })
777                            .collect(),
778                        match watermark_type {
779                            WatermarkSerdeType::NonPkPrefix => true,
780                            WatermarkSerdeType::PkPrefix => false,
781                        },
782                    )
783                },
784            ),
785            switch_op_consistency_level: value
786                .switch_op_consistency_level
787                .map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue { .. })),
788        }
789    }
790}
791
792impl From<TracedSealCurrentEpochOptions> for SealCurrentEpochOptions {
793    fn from(value: TracedSealCurrentEpochOptions) -> SealCurrentEpochOptions {
794        SealCurrentEpochOptions {
795            table_watermarks: value.table_watermarks.map(
796                |(is_ascending, watermarks, is_non_pk_prefix)| {
797                    (
798                        if is_ascending {
799                            WatermarkDirection::Ascending
800                        } else {
801                            WatermarkDirection::Descending
802                        },
803                        watermarks
804                            .into_iter()
805                            .map(|serialized_watermark| {
806                                Message::decode(serialized_watermark.as_slice())
807                                    .map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb))
808                                    .expect("should not failed")
809                            })
810                            .collect(),
811                        if is_non_pk_prefix {
812                            WatermarkSerdeType::NonPkPrefix
813                        } else {
814                            WatermarkSerdeType::PkPrefix
815                        },
816                    )
817                },
818            ),
819            switch_op_consistency_level: value.switch_op_consistency_level.map(|enable| {
820                if enable {
821                    OpConsistencyLevel::ConsistentOldValue {
822                        check_old_value: CHECK_BYTES_EQUAL.clone(),
823                        is_log_store: false,
824                    }
825                } else {
826                    OpConsistencyLevel::Inconsistent
827                }
828            }),
829        }
830    }
831}
832
833impl SealCurrentEpochOptions {
834    pub fn for_test() -> Self {
835        Self {
836            table_watermarks: None,
837            switch_op_consistency_level: None,
838        }
839    }
840}