1use std::default::Default;
16use std::fmt::{Debug, Formatter};
17use std::future::Future;
18use std::marker::PhantomData;
19use std::sync::{Arc, LazyLock};
20
21use bytes::Bytes;
22use futures::{Stream, TryStreamExt};
23use futures_async_stream::try_stream;
24use prost::Message;
25use risingwave_common::array::Op;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{TableId, TableOption};
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_hummock_sdk::HummockReadEpoch;
31use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
32use risingwave_hummock_sdk::table_watermark::{
33 VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
34};
35use risingwave_hummock_trace::{
36 TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions,
37 TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
38};
39use risingwave_pb::hummock::PbVnodeWatermark;
40
41use crate::error::{StorageError, StorageResult};
42use crate::hummock::CachePolicy;
43use crate::monitor::{MonitoredStateStore, MonitoredStorageMetrics};
44pub(crate) use crate::vector::{DistanceMeasurement, OnNearestItem, Vector};
45
/// Shorthand bound for types that are `Send + Sync + 'static`.
pub trait StaticSendSync = Send + Sync + 'static;
47
/// An owned item type that a [`StateStoreIter`] can hand out in borrowed form.
pub trait IterItem: Send + 'static {
    /// Borrowed, cheaply copyable view of the item with lifetime `'a`.
    type ItemRef<'a>: Send + Copy + 'a;
}

// Owned key-value rows are borrowed as `StateStoreKeyedRowRef`.
impl IterItem for StateStoreKeyedRow {
    type ItemRef<'a> = StateStoreKeyedRowRef<'a>;
}

// Owned change-log items are borrowed as `StateStoreReadLogItemRef`.
impl IterItem for StateStoreReadLogItem {
    type ItemRef<'a> = StateStoreReadLogItemRef<'a>;
}
59
/// An asynchronous iterator over state store items of type `T`.
///
/// Every item returned by `try_next` borrows from the iterator (its lifetime is tied to the
/// `&mut self` borrow), so an item must be released before the next one is requested.
pub trait StateStoreIter<T: IterItem = StateStoreKeyedRow>: Send {
    /// Advances the iterator, resolving to `Ok(Some(item))`, `Ok(None)` or an error.
    // NOTE(review): `Ok(None)` presumably signals exhaustion; `FusedStateStoreIter` treats it
    // (and any error) as the end of iteration.
    fn try_next(&mut self) -> impl StorageFuture<'_, Option<T::ItemRef<'_>>>;
}
63
64pub fn to_owned_item((key, value): StateStoreKeyedRowRef<'_>) -> StorageResult<StateStoreKeyedRow> {
65 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
66}
67
/// Extension methods layered on top of [`StateStoreIter`].
pub trait StateStoreIterExt<T: IterItem = StateStoreKeyedRow>: StateStoreIter<T> + Sized {
    /// Stream type returned by `into_stream` for output type `O` and mapping function `F`.
    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>>: Stream<Item = StorageResult<O>>
        + Send;

    /// Consumes the iterator and returns a stream of `StorageResult<O>` values, where `f`
    /// maps each borrowed item to an output.
    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
        self,
        f: F,
    ) -> Self::ItemStream<O, F>;

    /// Wraps the iterator in a [`FusedStateStoreIter`].
    fn fused(self) -> FusedStateStoreIter<Self, T> {
        FusedStateStoreIter::new(self)
    }
}
81
82#[try_stream(ok = O, error = StorageError)]
83async fn into_stream_inner<
84 T: IterItem,
85 I: StateStoreIter<T>,
86 O: Send,
87 F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send,
88>(
89 iter: I,
90 f: F,
91) {
92 let mut iter = iter.fused();
93 while let Some(item) = iter.try_next().await? {
94 yield f(item)?;
95 }
96}
97
98pub struct FromStreamStateStoreIter<S> {
99 inner: S,
100 item_buffer: Option<StateStoreKeyedRow>,
101}
102
103impl<S> FromStreamStateStoreIter<S> {
104 pub fn new(inner: S) -> Self {
105 Self {
106 inner,
107 item_buffer: None,
108 }
109 }
110}
111
112impl<S: Stream<Item = StorageResult<StateStoreKeyedRow>> + Unpin + Send> StateStoreIter
113 for FromStreamStateStoreIter<S>
114{
115 async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
116 self.item_buffer = self.inner.try_next().await?;
117 Ok(self
118 .item_buffer
119 .as_ref()
120 .map(|(key, value)| (key.to_ref(), value.as_ref())))
121 }
122}
123
124pub struct FusedStateStoreIter<I, T> {
125 inner: I,
126 finished: bool,
127 _phantom: PhantomData<T>,
128}
129
130impl<I, T> FusedStateStoreIter<I, T> {
131 fn new(inner: I) -> Self {
132 Self {
133 inner,
134 finished: false,
135 _phantom: PhantomData,
136 }
137 }
138}
139
140impl<T: IterItem, I: StateStoreIter<T>> FusedStateStoreIter<I, T> {
141 async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
142 assert!(!self.finished, "call try_next after finish");
143 let result = self.inner.try_next().await;
144 match &result {
145 Ok(Some(_)) => {}
146 Ok(None) | Err(_) => {
147 self.finished = true;
148 }
149 }
150 result
151 }
152}
153
/// Blanket implementation: every [`StateStoreIter`] gets the [`StateStoreIterExt`] methods.
impl<T: IterItem, I: StateStoreIter<T>> StateStoreIterExt<T> for I {
    // Opaque stream type; its concrete type is whatever `into_stream_inner` returns below.
    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>> =
        impl Stream<Item = StorageResult<O>> + Send;

    /// Delegates to `into_stream_inner`.
    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
        self,
        f: F,
    ) -> Self::ItemStream<O, F> {
        into_stream_inner(self, f)
    }
}
165
/// Borrowed key-value row: a full key over a byte slice, plus the value bytes.
pub type StateStoreKeyedRowRef<'a> = (FullKey<&'a [u8]>, &'a [u8]);
/// Owned key-value row.
pub type StateStoreKeyedRow = (FullKey<Bytes>, Bytes);
/// A `'static` [`StateStoreIter`] over the default item type, [`StateStoreKeyedRow`].
pub trait StateStoreReadIter = StateStoreIter + 'static;
169
/// A single change-log entry value, generic over the value representation `T`.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum ChangeLogValue<T> {
    /// An insert carrying the inserted value.
    Insert(T),
    /// An update carrying both the new and the old value.
    Update { new_value: T, old_value: T },
    /// A delete carrying a value (presumably the one that was removed).
    Delete(T),
}
176
177impl<T> ChangeLogValue<T> {
178 pub fn try_map<O>(self, f: impl Fn(T) -> StorageResult<O>) -> StorageResult<ChangeLogValue<O>> {
179 Ok(match self {
180 ChangeLogValue::Insert(value) => ChangeLogValue::Insert(f(value)?),
181 ChangeLogValue::Update {
182 new_value,
183 old_value,
184 } => ChangeLogValue::Update {
185 new_value: f(new_value)?,
186 old_value: f(old_value)?,
187 },
188 ChangeLogValue::Delete(value) => ChangeLogValue::Delete(f(value)?),
189 })
190 }
191
192 pub fn into_op_value_iter(self) -> impl Iterator<Item = (Op, T)> {
193 std::iter::from_coroutine(
194 #[coroutine]
195 move || match self {
196 Self::Insert(row) => {
197 yield (Op::Insert, row);
198 }
199 Self::Delete(row) => {
200 yield (Op::Delete, row);
201 }
202 Self::Update {
203 old_value,
204 new_value,
205 } => {
206 yield (Op::UpdateDelete, old_value);
207 yield (Op::UpdateInsert, new_value);
208 }
209 },
210 )
211 }
212}
213
214impl<T: AsRef<[u8]>> ChangeLogValue<T> {
215 pub fn to_ref(&self) -> ChangeLogValue<&[u8]> {
216 match self {
217 ChangeLogValue::Insert(val) => ChangeLogValue::Insert(val.as_ref()),
218 ChangeLogValue::Update {
219 new_value,
220 old_value,
221 } => ChangeLogValue::Update {
222 new_value: new_value.as_ref(),
223 old_value: old_value.as_ref(),
224 },
225 ChangeLogValue::Delete(val) => ChangeLogValue::Delete(val.as_ref()),
226 }
227 }
228}
229
/// Owned change-log item: a table key and the change recorded for it.
pub type StateStoreReadLogItem = (TableKey<Bytes>, ChangeLogValue<Bytes>);
/// Borrowed counterpart of [`StateStoreReadLogItem`].
pub type StateStoreReadLogItemRef<'a> = (TableKey<&'a [u8]>, ChangeLogValue<&'a [u8]>);
232
/// Options for `StateStoreReadLog::next_epoch`.
#[derive(Clone)]
pub struct NextEpochOptions {
    /// Table the request refers to.
    pub table_id: TableId,
}

/// Options for `StateStoreReadLog::iter_log`.
#[derive(Clone)]
pub struct ReadLogOptions {
    /// Table the request refers to.
    pub table_id: TableId,
}
242
/// A `Send + 'static` [`StateStoreIter`] over [`StateStoreReadLogItem`]s.
pub trait StateStoreReadChangeLogIter = StateStoreIter<StateStoreReadLogItem> + Send + 'static;
/// A `Send` future bounded by `'a` that resolves to `StorageResult<T>`.
pub trait StorageFuture<'a, T> = Future<Output = StorageResult<T>> + Send + 'a;
245
/// Read access to change logs.
pub trait StateStoreReadLog: StaticSendSync {
    /// Iterator type returned by `iter_log`.
    type ChangeLogIter: StateStoreReadChangeLogIter;

    /// Resolves to an epoch computed from `epoch` for the table named in `options`.
    // NOTE(review): presumably the next change-log epoch after `epoch` — confirm with the
    // implementors.
    fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64>;

    /// Opens a change-log iterator for the table named in `options`, restricted to
    /// `epoch_range` and `key_range`.
    // NOTE(review): whether the `epoch_range` bounds are inclusive is not visible here.
    fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> impl StorageFuture<'_, Self::ChangeLogIter>;
}
258
/// A `Send + 'static` callback that receives a borrowed full key and value and produces a
/// `StorageResult<O>`. Being `FnOnce`, it can be invoked at most once.
pub trait KeyValueFn<O> =
    for<'kv> FnOnce(FullKey<&'kv [u8]>, &'kv [u8]) -> StorageResult<O> + Send + 'static;
261
/// Point-lookup access to a state store.
pub trait StateStoreGet: StaticSendSync {
    /// Looks up `key` with `read_options` and resolves to an optional output of type `O`.
    // NOTE(review): presumably `on_key_value_fn` runs on the stored entry when `key` exists and
    // `None` is returned otherwise — confirm with the implementors.
    fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> impl StorageFuture<'_, Option<O>>;
}
270
/// Range-scan access to a state store, in addition to the point lookups of [`StateStoreGet`].
pub trait StateStoreRead: StateStoreGet + StaticSendSync {
    /// Iterator type returned by `iter`.
    type Iter: StateStoreReadIter;
    /// Iterator type returned by `rev_iter`.
    type RevIter: StateStoreReadIter;

    /// Opens an iterator over `key_range` using `read_options`.
    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::Iter>;

    /// Opens a reverse iterator over `key_range` using `read_options`.
    // NOTE(review): presumably yields keys in descending order — confirm with the implementors.
    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::RevIter>;
}
292
293#[derive(Clone)]
294pub struct TryWaitEpochOptions {
295 pub table_id: TableId,
296}
297
298impl TryWaitEpochOptions {
299 #[cfg(any(test, feature = "test"))]
300 pub fn for_test(table_id: TableId) -> Self {
301 Self { table_id }
302 }
303}
304
305impl From<TracedTryWaitEpochOptions> for TryWaitEpochOptions {
306 fn from(value: TracedTryWaitEpochOptions) -> Self {
307 Self {
308 table_id: value.table_id.into(),
309 }
310 }
311}
312
313impl From<TryWaitEpochOptions> for TracedTryWaitEpochOptions {
314 fn from(value: TryWaitEpochOptions) -> Self {
315 Self {
316 table_id: value.table_id.into(),
317 }
318 }
319}
320
/// Options for `StateStore::new_read_snapshot`.
#[derive(Clone, Copy)]
pub struct NewReadSnapshotOptions {
    /// Table the snapshot is created for.
    pub table_id: TableId,
}

/// Options for `StateStore::new_vector_writer`.
#[derive(Clone)]
pub struct NewVectorWriterOptions {
    /// Table the vector writer is created for.
    pub table_id: TableId,
}
330
/// Top-level handle to a state store.
///
/// It creates local stores, read snapshots and vector writers, waits on epochs, and reads
/// change logs through the [`StateStoreReadLog`] supertrait.
pub trait StateStore: StateStoreReadLog + StaticSendSync + Clone {
    /// Local store type returned by `new_local`.
    type Local: LocalStateStore;
    /// Snapshot type returned by `new_read_snapshot`; supports key-value and vector reads.
    type ReadSnapshot: StateStoreRead + StateStoreReadVector + Clone;
    /// Writer type returned by `new_vector_writer`.
    type VectorWriter: StateStoreWriteVector;

    /// Waits on `epoch` for the table named in `options`.
    // NOTE(review): the exact completion condition per `HummockReadEpoch` variant is not
    // visible here — confirm with the implementors.
    fn try_wait_epoch(
        &self,
        epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> impl StorageFuture<'_, ()>;

    /// Wraps this store in a [`MonitoredStateStore`] that uses `storage_metrics`.
    fn monitored(self, storage_metrics: Arc<MonitoredStorageMetrics>) -> MonitoredStateStore<Self> {
        MonitoredStateStore::new(self, storage_metrics)
    }

    /// Creates a local state store configured by `option`.
    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_;

    /// Creates a read snapshot for `epoch` and the table named in `options`.
    fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> impl StorageFuture<'_, Self::ReadSnapshot>;

    /// Creates a vector writer for the table named in `options`.
    fn new_vector_writer(
        &self,
        options: NewVectorWriterOptions,
    ) -> impl Future<Output = Self::VectorWriter> + Send + '_;
}
362
/// A state store handle that supports reads and writes; epoch control comes from the
/// [`StateStoreWriteEpochControl`] supertrait.
pub trait LocalStateStore: StateStoreGet + StateStoreWriteEpochControl + StaticSendSync {
    /// Reader type returned by `new_flushed_snapshot_reader`.
    type FlushedSnapshotReader: StateStoreRead;
    /// Iterator type returned by `iter`; it may borrow from the store for `'a`.
    type Iter<'a>: StateStoreIter + 'a;
    /// Iterator type returned by `rev_iter`; it may borrow from the store for `'a`.
    type RevIter<'a>: StateStoreIter + 'a;

    /// Opens an iterator over `key_range` using `read_options`.
    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::Iter<'_>>;

    /// Opens a reverse iterator over `key_range` using `read_options`.
    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::RevIter<'_>>;

    /// Creates a reader of type `FlushedSnapshotReader`.
    // NOTE(review): presumably it only observes data that has already been flushed — confirm.
    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader;

    /// Returns the table watermark for `vnode`, if there is one.
    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;

    /// Writes `new_val` under `key`.
    // NOTE(review): `old_val` presumably carries the previous value for consistency checking
    // (see `OpConsistencyLevel`) — confirm.
    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()>;

    /// Deletes `key`; `old_val` is the value the caller supplies for the deleted entry.
    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

    /// Updates the vnode bitmap to `vnodes` and resolves to an `Arc<Bitmap>`.
    // NOTE(review): the returned bitmap is presumably the previous one — confirm.
    fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> impl StorageFuture<'_, Arc<Bitmap>>;
}
409
/// Epoch lifecycle operations shared by writable stores.
pub trait StateStoreWriteEpochControl: StaticSendSync {
    /// Flushes pending writes and resolves to a `usize`.
    // NOTE(review): the returned number is presumably the size of the flushed data — confirm.
    fn flush(&mut self) -> impl StorageFuture<'_, usize>;

    /// Attempts a flush.
    // NOTE(review): presumably a flush that may be skipped under implementation-defined
    // conditions — confirm.
    fn try_flush(&mut self) -> impl StorageFuture<'_, ()>;

    /// Initializes the store with `opts` (which carries the epoch pair).
    fn init(&mut self, opts: InitOptions) -> impl StorageFuture<'_, ()>;

    /// Seals the current epoch and moves on to `next_epoch`, applying `opts`.
    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
}
428
/// Write access for vector data, with epoch control from [`StateStoreWriteEpochControl`].
pub trait StateStoreWriteVector: StateStoreWriteEpochControl + StaticSendSync {
    /// Inserts the vector `vec` together with its associated `info` bytes.
    fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()>;
}
432
/// Options for `StateStoreReadVector::nearest`.
pub struct VectorNearestOptions {
    /// Number of items requested.
    // NOTE(review): presumably an upper bound on the number of results — confirm.
    pub top_n: usize,
    /// Distance measurement to use.
    pub measure: DistanceMeasurement,
}
437
/// An [`OnNearestItem`] callback that is also `Send + 'static`.
pub trait OnNearestItemFn<O> = OnNearestItem<O> + Send + 'static;
439
/// Nearest-neighbour style read access over vector data.
pub trait StateStoreReadVector: StaticSendSync {
    /// Searches for items near `vec` according to `options` and resolves to the outputs
    /// collected in a `Vec<O>`.
    // NOTE(review): presumably `on_nearest_item_fn` is applied to each selected item to build
    // its output — confirm with the implementors.
    fn nearest<O: Send + 'static>(
        &self,
        vec: Vector,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<O>,
    ) -> impl StorageFuture<'_, Vec<O>>;
}
448
/// Prefetch hints attached to a read. The default has both flags set to `false`.
#[derive(Default, Clone, Copy)]
pub struct PrefetchOptions {
    /// Whether prefetching is requested.
    pub prefetch: bool,
    /// Whether the read is flagged as belonging to a large query.
    pub for_large_query: bool,
}

impl PrefetchOptions {
    /// Prefetch enabled, flagged as a large query.
    pub fn prefetch_for_large_range_scan() -> Self {
        Self::new(true, true)
    }

    /// Prefetch enabled, not flagged as a large query.
    pub fn prefetch_for_small_range_scan() -> Self {
        Self::new(true, false)
    }

    /// Builds options from the two flags as given.
    pub fn new(prefetch: bool, for_large_query: bool) -> Self {
        PrefetchOptions {
            prefetch,
            for_large_query,
        }
    }
}
481
482impl From<TracedPrefetchOptions> for PrefetchOptions {
483 fn from(value: TracedPrefetchOptions) -> Self {
484 Self {
485 prefetch: value.prefetch,
486 for_large_query: value.for_large_query,
487 }
488 }
489}
490
491impl From<PrefetchOptions> for TracedPrefetchOptions {
492 fn from(value: PrefetchOptions) -> Self {
493 Self {
494 prefetch: value.prefetch,
495 for_large_query: value.for_large_query,
496 }
497 }
498}
499
/// Options attached to a read request.
#[derive(Default, Clone)]
pub struct ReadOptions {
    /// Optional key prefix hint.
    // NOTE(review): presumably used to prune data that cannot contain the prefix — confirm.
    pub prefix_hint: Option<Bytes>,
    /// Prefetch hints for the read.
    pub prefetch_options: PrefetchOptions,
    /// Cache policy for the read.
    pub cache_policy: CachePolicy,

    /// Optional retention period in seconds.
    pub retention_seconds: Option<u32>,
}
511
512impl From<TracedReadOptions> for ReadOptions {
513 fn from(value: TracedReadOptions) -> Self {
514 Self {
515 prefix_hint: value.prefix_hint.map(|b| b.into()),
516 prefetch_options: value.prefetch_options.into(),
517 cache_policy: value.cache_policy.into(),
518 retention_seconds: value.retention_seconds,
519 }
520 }
521}
522
523impl ReadOptions {
524 pub fn into_traced_read_options(
525 self,
526 table_id: TableId,
527 epoch: Option<HummockReadEpoch>,
528 ) -> TracedReadOptions {
529 let value = self;
530 let (read_version_from_backup, read_committed) = match epoch {
531 None | Some(HummockReadEpoch::NoWait(_)) => (false, false),
532 Some(HummockReadEpoch::Backup(_)) => (true, false),
533 Some(HummockReadEpoch::Committed(_))
534 | Some(HummockReadEpoch::BatchQueryCommitted(_, _))
535 | Some(HummockReadEpoch::TimeTravel(_)) => (false, true),
536 };
537 TracedReadOptions {
538 prefix_hint: value.prefix_hint.map(|b| b.into()),
539 prefetch_options: value.prefetch_options.into(),
540 cache_policy: value.cache_policy.into(),
541 retention_seconds: value.retention_seconds,
542 table_id: table_id.into(),
543 read_version_from_backup,
544 read_committed,
545 }
546 }
547}
548
549pub fn gen_min_epoch(base_epoch: u64, retention_seconds: Option<&u32>) -> u64 {
550 let base_epoch = Epoch(base_epoch);
551 match retention_seconds {
552 Some(retention_seconds_u32) => {
553 base_epoch
554 .subtract_ms(*retention_seconds_u32 as u64 * 1000)
555 .0
556 }
557 None => 0,
558 }
559}
560
/// A thread-safe predicate over two `Bytes` values, used to decide whether they are equal.
pub trait CheckOldValueEquality = Fn(&Bytes, &Bytes) -> bool + Send + Sync;

/// Lazily-initialized shared predicate that compares the two values with `==`.
pub static CHECK_BYTES_EQUAL: LazyLock<Arc<dyn CheckOldValueEquality>> =
    LazyLock::new(|| Arc::new(|first: &Bytes, second: &Bytes| first == second));
565
/// Consistency level applied to write operations. Defaults to `Inconsistent`.
#[derive(Default, Clone)]
pub enum OpConsistencyLevel {
    /// The default level.
    // NOTE(review): presumably no old-value checking is performed at this level — confirm.
    #[default]
    Inconsistent,
    /// Level that carries a predicate for comparing old values.
    ConsistentOldValue {
        /// Predicate used to compare two values.
        check_old_value: Arc<dyn CheckOldValueEquality>,
        /// Flag distinguishing log-store usage; it is the only field that takes part in
        /// equality and `Debug` output.
        is_log_store: bool,
    },
}
576
577impl Debug for OpConsistencyLevel {
578 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
579 match self {
580 OpConsistencyLevel::Inconsistent => f.write_str("OpConsistencyLevel::Inconsistent"),
581 OpConsistencyLevel::ConsistentOldValue { is_log_store, .. } => f
582 .debug_struct("OpConsistencyLevel::ConsistentOldValue")
583 .field("is_log_store", is_log_store)
584 .finish(),
585 }
586 }
587}
588
589impl PartialEq<Self> for OpConsistencyLevel {
590 fn eq(&self, other: &Self) -> bool {
591 matches!(
592 (self, other),
593 (
594 OpConsistencyLevel::Inconsistent,
595 OpConsistencyLevel::Inconsistent
596 ) | (
597 OpConsistencyLevel::ConsistentOldValue {
598 is_log_store: true,
599 ..
600 },
601 OpConsistencyLevel::ConsistentOldValue {
602 is_log_store: true,
603 ..
604 },
605 ) | (
606 OpConsistencyLevel::ConsistentOldValue {
607 is_log_store: false,
608 ..
609 },
610 OpConsistencyLevel::ConsistentOldValue {
611 is_log_store: false,
612 ..
613 },
614 )
615 )
616 }
617}
618
619impl Eq for OpConsistencyLevel {}
620
621impl OpConsistencyLevel {
622 pub fn update(&mut self, new_level: &OpConsistencyLevel) {
623 assert_ne!(self, new_level);
624 *self = new_level.clone()
625 }
626}
627
/// Options for `StateStore::new_local`.
#[derive(Clone)]
pub struct NewLocalOptions {
    /// Table the local store is created for.
    pub table_id: TableId,
    /// Consistency level for write operations on the local store.
    pub op_consistency_level: OpConsistencyLevel,
    /// Table-level options.
    pub table_option: TableOption,

    /// Whether the local store is replicated; set by `new_replicated`, cleared by `new`.
    pub is_replicated: bool,

    /// Vnode bitmap associated with the local store.
    pub vnodes: Arc<Bitmap>,
}
648
649impl From<TracedNewLocalOptions> for NewLocalOptions {
650 fn from(value: TracedNewLocalOptions) -> Self {
651 Self {
652 table_id: value.table_id.into(),
653 op_consistency_level: match value.op_consistency_level {
654 TracedOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
655 TracedOpConsistencyLevel::ConsistentOldValue => {
656 OpConsistencyLevel::ConsistentOldValue {
657 check_old_value: CHECK_BYTES_EQUAL.clone(),
658 is_log_store: false,
660 }
661 }
662 },
663 table_option: value.table_option.into(),
664 is_replicated: value.is_replicated,
665 vnodes: Arc::new(value.vnodes.into()),
666 }
667 }
668}
669
670impl From<NewLocalOptions> for TracedNewLocalOptions {
671 fn from(value: NewLocalOptions) -> Self {
672 Self {
673 table_id: value.table_id.into(),
674 op_consistency_level: match value.op_consistency_level {
675 OpConsistencyLevel::Inconsistent => TracedOpConsistencyLevel::Inconsistent,
676 OpConsistencyLevel::ConsistentOldValue { .. } => {
677 TracedOpConsistencyLevel::ConsistentOldValue
678 }
679 },
680 table_option: value.table_option.into(),
681 is_replicated: value.is_replicated,
682 vnodes: value.vnodes.as_ref().clone().into(),
683 }
684 }
685}
686
687impl NewLocalOptions {
688 pub fn new(
689 table_id: TableId,
690 op_consistency_level: OpConsistencyLevel,
691 table_option: TableOption,
692 vnodes: Arc<Bitmap>,
693 ) -> Self {
694 NewLocalOptions {
695 table_id,
696 op_consistency_level,
697 table_option,
698 is_replicated: false,
699 vnodes,
700 }
701 }
702
703 pub fn new_replicated(
704 table_id: TableId,
705 op_consistency_level: OpConsistencyLevel,
706 table_option: TableOption,
707 vnodes: Arc<Bitmap>,
708 ) -> Self {
709 NewLocalOptions {
710 table_id,
711 op_consistency_level,
712 table_option,
713 is_replicated: true,
714 vnodes,
715 }
716 }
717
718 pub fn for_test(table_id: TableId) -> Self {
719 Self {
720 table_id,
721 op_consistency_level: OpConsistencyLevel::Inconsistent,
722 table_option: TableOption {
723 retention_seconds: None,
724 },
725 is_replicated: false,
726 vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
727 }
728 }
729}
730
731#[derive(Clone)]
732pub struct InitOptions {
733 pub epoch: EpochPair,
734}
735
736impl InitOptions {
737 pub fn new(epoch: EpochPair) -> Self {
738 Self { epoch }
739 }
740}
741
742impl From<InitOptions> for TracedInitOptions {
743 fn from(value: InitOptions) -> Self {
744 TracedInitOptions {
745 epoch: value.epoch.into(),
746 }
747 }
748}
749
750impl From<TracedInitOptions> for InitOptions {
751 fn from(value: TracedInitOptions) -> Self {
752 InitOptions {
753 epoch: value.epoch.into(),
754 }
755 }
756}
757
/// Options for `StateStoreWriteEpochControl::seal_current_epoch`.
#[derive(Clone, Debug)]
pub struct SealCurrentEpochOptions {
    /// Optional table watermarks: their direction, the per-vnode watermarks, and the serde
    /// type of the watermark.
    pub table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
    /// Optional consistency level to switch to.
    // NOTE(review): presumably takes effect from the next epoch — confirm with the implementors.
    pub switch_op_consistency_level: Option<OpConsistencyLevel>,
}
763
764impl From<SealCurrentEpochOptions> for TracedSealCurrentEpochOptions {
765 fn from(value: SealCurrentEpochOptions) -> Self {
766 TracedSealCurrentEpochOptions {
767 table_watermarks: value.table_watermarks.map(
768 |(direction, watermarks, watermark_type)| {
769 (
770 direction == WatermarkDirection::Ascending,
771 watermarks
772 .into_iter()
773 .map(|watermark| {
774 let pb_watermark = PbVnodeWatermark::from(watermark);
775 Message::encode_to_vec(&pb_watermark)
776 })
777 .collect(),
778 match watermark_type {
779 WatermarkSerdeType::NonPkPrefix => true,
780 WatermarkSerdeType::PkPrefix => false,
781 },
782 )
783 },
784 ),
785 switch_op_consistency_level: value
786 .switch_op_consistency_level
787 .map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue { .. })),
788 }
789 }
790}
791
792impl From<TracedSealCurrentEpochOptions> for SealCurrentEpochOptions {
793 fn from(value: TracedSealCurrentEpochOptions) -> SealCurrentEpochOptions {
794 SealCurrentEpochOptions {
795 table_watermarks: value.table_watermarks.map(
796 |(is_ascending, watermarks, is_non_pk_prefix)| {
797 (
798 if is_ascending {
799 WatermarkDirection::Ascending
800 } else {
801 WatermarkDirection::Descending
802 },
803 watermarks
804 .into_iter()
805 .map(|serialized_watermark| {
806 Message::decode(serialized_watermark.as_slice())
807 .map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb))
808 .expect("should not failed")
809 })
810 .collect(),
811 if is_non_pk_prefix {
812 WatermarkSerdeType::NonPkPrefix
813 } else {
814 WatermarkSerdeType::PkPrefix
815 },
816 )
817 },
818 ),
819 switch_op_consistency_level: value.switch_op_consistency_level.map(|enable| {
820 if enable {
821 OpConsistencyLevel::ConsistentOldValue {
822 check_old_value: CHECK_BYTES_EQUAL.clone(),
823 is_log_store: false,
824 }
825 } else {
826 OpConsistencyLevel::Inconsistent
827 }
828 }),
829 }
830 }
831}
832
833impl SealCurrentEpochOptions {
834 pub fn for_test() -> Self {
835 Self {
836 table_watermarks: None,
837 switch_op_consistency_level: None,
838 }
839 }
840}