1use std::default::Default;
16use std::fmt::{Debug, Formatter};
17use std::future::Future;
18use std::marker::PhantomData;
19use std::sync::{Arc, LazyLock};
20
21use bytes::Bytes;
22use futures::{Stream, TryFutureExt, TryStreamExt};
23use futures_async_stream::try_stream;
24use prost::Message;
25use risingwave_common::array::Op;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{TableId, TableOption};
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_hummock_sdk::HummockReadEpoch;
31use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
32use risingwave_hummock_sdk::table_watermark::{
33 VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
34};
35use risingwave_hummock_trace::{
36 TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions,
37 TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
38 TracedWriteOptions,
39};
40use risingwave_pb::hummock::PbVnodeWatermark;
41
42use crate::error::{StorageError, StorageResult};
43use crate::hummock::CachePolicy;
44use crate::monitor::{MonitoredStateStore, MonitoredStorageMetrics};
45
46pub trait StaticSendSync = Send + Sync + 'static;
47
/// An owned item type that a state store iterator can produce.
///
/// Each implementor names, through [`IterItem::ItemRef`], the borrowed view of the
/// item that iterators hand out.
pub trait IterItem: 'static + Send {
    /// Borrowed, cheaply copyable form of the item, valid for `'a`.
    type ItemRef<'a>: Copy + Send + 'a;
}
51
52impl IterItem for StateStoreKeyedRow {
53 type ItemRef<'a> = StateStoreKeyedRowRef<'a>;
54}
55
56impl IterItem for StateStoreReadLogItem {
57 type ItemRef<'a> = StateStoreReadLogItemRef<'a>;
58}
59
60pub trait StateStoreIter<T: IterItem = StateStoreKeyedRow>: Send {
61 fn try_next(&mut self) -> impl StorageFuture<'_, Option<T::ItemRef<'_>>>;
62}
63
64pub fn to_owned_item((key, value): StateStoreKeyedRowRef<'_>) -> StorageResult<StateStoreKeyedRow> {
65 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
66}
67
68pub trait StateStoreIterExt<T: IterItem = StateStoreKeyedRow>: StateStoreIter<T> + Sized {
69 type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>>: Stream<Item = StorageResult<O>>
70 + Send;
71
72 fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
73 self,
74 f: F,
75 ) -> Self::ItemStream<O, F>;
76
77 fn fused(self) -> FusedStateStoreIter<Self, T> {
78 FusedStateStoreIter::new(self)
79 }
80}
81
82#[try_stream(ok = O, error = StorageError)]
83async fn into_stream_inner<
84 T: IterItem,
85 I: StateStoreIter<T>,
86 O: Send,
87 F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send,
88>(
89 iter: I,
90 f: F,
91) {
92 let mut iter = iter.fused();
93 while let Some(item) = iter.try_next().await? {
94 yield f(item)?;
95 }
96}
97
98pub struct FromStreamStateStoreIter<S> {
99 inner: S,
100 item_buffer: Option<StateStoreKeyedRow>,
101}
102
103impl<S> FromStreamStateStoreIter<S> {
104 pub fn new(inner: S) -> Self {
105 Self {
106 inner,
107 item_buffer: None,
108 }
109 }
110}
111
112impl<S: Stream<Item = StorageResult<StateStoreKeyedRow>> + Unpin + Send> StateStoreIter
113 for FromStreamStateStoreIter<S>
114{
115 async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
116 self.item_buffer = self.inner.try_next().await?;
117 Ok(self
118 .item_buffer
119 .as_ref()
120 .map(|(key, value)| (key.to_ref(), value.as_ref())))
121 }
122}
123
/// Wrapper around an iterator that records when iteration has finished.
pub struct FusedStateStoreIter<I, T> {
    /// The wrapped iterator.
    inner: I,
    /// Whether iteration has been marked as finished.
    finished: bool,
    /// Ties the wrapper to the item type `T` without storing one.
    _phantom: PhantomData<T>,
}

impl<I, T> FusedStateStoreIter<I, T> {
    /// Wraps `inner` in the not-yet-finished state.
    fn new(inner: I) -> Self {
        FusedStateStoreIter {
            finished: false,
            _phantom: PhantomData,
            inner,
        }
    }
}
139
140impl<T: IterItem, I: StateStoreIter<T>> FusedStateStoreIter<I, T> {
141 async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
142 assert!(!self.finished, "call try_next after finish");
143 let result = self.inner.try_next().await;
144 match &result {
145 Ok(Some(_)) => {}
146 Ok(None) | Err(_) => {
147 self.finished = true;
148 }
149 }
150 result
151 }
152}
153
154impl<T: IterItem, I: StateStoreIter<T>> StateStoreIterExt<T> for I {
155 type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>> =
156 impl Stream<Item = StorageResult<O>> + Send;
157
158 fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
159 self,
160 f: F,
161 ) -> Self::ItemStream<O, F> {
162 into_stream_inner(self, f)
163 }
164}
165
166pub type StateStoreKeyedRowRef<'a> = (FullKey<&'a [u8]>, &'a [u8]);
167pub type StateStoreKeyedRow = (FullKey<Bytes>, Bytes);
168pub trait StateStoreReadIter = StateStoreIter + 'static;
169
/// One change-log entry, generic over the value representation `T`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeLogValue<T> {
    /// A value was inserted.
    Insert(T),
    /// A value was replaced; both the new and the old value are carried.
    Update { new_value: T, old_value: T },
    /// A value was deleted.
    Delete(T),
}
176
177impl<T> ChangeLogValue<T> {
178 pub fn try_map<O>(self, f: impl Fn(T) -> StorageResult<O>) -> StorageResult<ChangeLogValue<O>> {
179 Ok(match self {
180 ChangeLogValue::Insert(value) => ChangeLogValue::Insert(f(value)?),
181 ChangeLogValue::Update {
182 new_value,
183 old_value,
184 } => ChangeLogValue::Update {
185 new_value: f(new_value)?,
186 old_value: f(old_value)?,
187 },
188 ChangeLogValue::Delete(value) => ChangeLogValue::Delete(f(value)?),
189 })
190 }
191
192 pub fn into_op_value_iter(self) -> impl Iterator<Item = (Op, T)> {
193 std::iter::from_coroutine(
194 #[coroutine]
195 move || match self {
196 Self::Insert(row) => {
197 yield (Op::Insert, row);
198 }
199 Self::Delete(row) => {
200 yield (Op::Delete, row);
201 }
202 Self::Update {
203 old_value,
204 new_value,
205 } => {
206 yield (Op::UpdateDelete, old_value);
207 yield (Op::UpdateInsert, new_value);
208 }
209 },
210 )
211 }
212}
213
214impl<T: AsRef<[u8]>> ChangeLogValue<T> {
215 pub fn to_ref(&self) -> ChangeLogValue<&[u8]> {
216 match self {
217 ChangeLogValue::Insert(val) => ChangeLogValue::Insert(val.as_ref()),
218 ChangeLogValue::Update {
219 new_value,
220 old_value,
221 } => ChangeLogValue::Update {
222 new_value: new_value.as_ref(),
223 old_value: old_value.as_ref(),
224 },
225 ChangeLogValue::Delete(val) => ChangeLogValue::Delete(val.as_ref()),
226 }
227 }
228}
229
230pub type StateStoreReadLogItem = (TableKey<Bytes>, ChangeLogValue<Bytes>);
231pub type StateStoreReadLogItemRef<'a> = (TableKey<&'a [u8]>, ChangeLogValue<&'a [u8]>);
232
233#[derive(Clone)]
234pub struct NextEpochOptions {
235 pub table_id: TableId,
236}
237
238#[derive(Clone)]
239pub struct ReadLogOptions {
240 pub table_id: TableId,
241}
242
243pub trait StateStoreReadChangeLogIter = StateStoreIter<StateStoreReadLogItem> + Send + 'static;
244pub trait StorageFuture<'a, T> = Future<Output = StorageResult<T>> + Send + 'a;
245
246pub trait StateStoreReadLog: StaticSendSync {
247 type ChangeLogIter: StateStoreReadChangeLogIter;
248
249 fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64>;
250
251 fn iter_log(
252 &self,
253 epoch_range: (u64, u64),
254 key_range: TableKeyRange,
255 options: ReadLogOptions,
256 ) -> impl StorageFuture<'_, Self::ChangeLogIter>;
257}
258
259pub trait StateStoreRead: StaticSendSync {
260 type Iter: StateStoreReadIter;
261 type RevIter: StateStoreReadIter;
262
263 fn get_keyed_row(
267 &self,
268 key: TableKey<Bytes>,
269 read_options: ReadOptions,
270 ) -> impl StorageFuture<'_, Option<StateStoreKeyedRow>>;
271
272 fn get(
276 &self,
277 key: TableKey<Bytes>,
278 read_options: ReadOptions,
279 ) -> impl StorageFuture<'_, Option<Bytes>> {
280 self.get_keyed_row(key, read_options)
281 .map_ok(|v| v.map(|(_, v)| v))
282 }
283
284 fn iter(
290 &self,
291 key_range: TableKeyRange,
292 read_options: ReadOptions,
293 ) -> impl StorageFuture<'_, Self::Iter>;
294
295 fn rev_iter(
296 &self,
297 key_range: TableKeyRange,
298 read_options: ReadOptions,
299 ) -> impl StorageFuture<'_, Self::RevIter>;
300}
301
302#[derive(Clone)]
303pub struct TryWaitEpochOptions {
304 pub table_id: TableId,
305}
306
307impl TryWaitEpochOptions {
308 #[cfg(any(test, feature = "test"))]
309 pub fn for_test(table_id: TableId) -> Self {
310 Self { table_id }
311 }
312}
313
314impl From<TracedTryWaitEpochOptions> for TryWaitEpochOptions {
315 fn from(value: TracedTryWaitEpochOptions) -> Self {
316 Self {
317 table_id: value.table_id.into(),
318 }
319 }
320}
321
322impl From<TryWaitEpochOptions> for TracedTryWaitEpochOptions {
323 fn from(value: TryWaitEpochOptions) -> Self {
324 Self {
325 table_id: value.table_id.into(),
326 }
327 }
328}
329
330#[derive(Clone, Copy)]
331pub struct NewReadSnapshotOptions {
332 pub table_id: TableId,
333}
334
335pub trait StateStore: StateStoreReadLog + StaticSendSync + Clone {
336 type Local: LocalStateStore;
337 type ReadSnapshot: StateStoreRead + Clone;
338
339 fn try_wait_epoch(
342 &self,
343 epoch: HummockReadEpoch,
344 options: TryWaitEpochOptions,
345 ) -> impl StorageFuture<'_, ()>;
346
347 fn monitored(self, storage_metrics: Arc<MonitoredStorageMetrics>) -> MonitoredStateStore<Self> {
349 MonitoredStateStore::new(self, storage_metrics)
350 }
351
352 fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_;
353
354 fn new_read_snapshot(
355 &self,
356 epoch: HummockReadEpoch,
357 options: NewReadSnapshotOptions,
358 ) -> impl StorageFuture<'_, Self::ReadSnapshot>;
359}
360
361pub trait LocalStateStore: StaticSendSync {
365 type FlushedSnapshotReader: StateStoreRead;
366 type Iter<'a>: StateStoreIter + 'a;
367 type RevIter<'a>: StateStoreIter + 'a;
368
369 fn get(
372 &self,
373 key: TableKey<Bytes>,
374 read_options: ReadOptions,
375 ) -> impl StorageFuture<'_, Option<Bytes>>;
376
377 fn iter(
383 &self,
384 key_range: TableKeyRange,
385 read_options: ReadOptions,
386 ) -> impl StorageFuture<'_, Self::Iter<'_>>;
387
388 fn rev_iter(
389 &self,
390 key_range: TableKeyRange,
391 read_options: ReadOptions,
392 ) -> impl StorageFuture<'_, Self::RevIter<'_>>;
393
394 fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader;
395
396 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
398
399 fn insert(
401 &mut self,
402 key: TableKey<Bytes>,
403 new_val: Bytes,
404 old_val: Option<Bytes>,
405 ) -> StorageResult<()>;
406
407 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;
410
411 fn flush(&mut self) -> impl StorageFuture<'_, usize>;
412
413 fn try_flush(&mut self) -> impl StorageFuture<'_, ()>;
414 fn epoch(&self) -> u64;
415
416 fn is_dirty(&self) -> bool;
417
418 fn init(&mut self, opts: InitOptions) -> impl StorageFuture<'_, ()>;
425
426 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
430
431 fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> impl StorageFuture<'_, Arc<Bitmap>>;
434}
435
/// Prefetch settings attached to a read. The default disables both flags.
#[derive(Clone, Copy, Default)]
pub struct PrefetchOptions {
    /// Whether prefetching is requested.
    pub prefetch: bool,
    /// Whether the read belongs to a large query.
    pub for_large_query: bool,
}

impl PrefetchOptions {
    /// Prefetching enabled, flagged as a large query.
    pub fn prefetch_for_large_range_scan() -> Self {
        Self::new(true, true)
    }

    /// Prefetching enabled, not flagged as a large query.
    pub fn prefetch_for_small_range_scan() -> Self {
        Self::new(true, false)
    }

    /// Builds the options from explicit flag values.
    pub fn new(prefetch: bool, for_large_query: bool) -> Self {
        PrefetchOptions {
            prefetch,
            for_large_query,
        }
    }
}
468
469impl From<TracedPrefetchOptions> for PrefetchOptions {
470 fn from(value: TracedPrefetchOptions) -> Self {
471 Self {
472 prefetch: value.prefetch,
473 for_large_query: value.for_large_query,
474 }
475 }
476}
477
478impl From<PrefetchOptions> for TracedPrefetchOptions {
479 fn from(value: PrefetchOptions) -> Self {
480 Self {
481 prefetch: value.prefetch,
482 for_large_query: value.for_large_query,
483 }
484 }
485}
486
487#[derive(Default, Clone)]
488pub struct ReadOptions {
489 pub prefix_hint: Option<Bytes>,
493 pub prefetch_options: PrefetchOptions,
494 pub cache_policy: CachePolicy,
495
496 pub retention_seconds: Option<u32>,
497 pub table_id: TableId,
498 pub read_version_from_backup: bool,
501 pub read_committed: bool,
502}
503
504impl From<TracedReadOptions> for ReadOptions {
505 fn from(value: TracedReadOptions) -> Self {
506 Self {
507 prefix_hint: value.prefix_hint.map(|b| b.into()),
508 prefetch_options: value.prefetch_options.into(),
509 cache_policy: value.cache_policy.into(),
510 retention_seconds: value.retention_seconds,
511 table_id: value.table_id.into(),
512 read_version_from_backup: value.read_version_from_backup,
513 read_committed: value.read_committed,
514 }
515 }
516}
517
518impl From<ReadOptions> for TracedReadOptions {
519 fn from(value: ReadOptions) -> Self {
520 Self {
521 prefix_hint: value.prefix_hint.map(|b| b.into()),
522 prefetch_options: value.prefetch_options.into(),
523 cache_policy: value.cache_policy.into(),
524 retention_seconds: value.retention_seconds,
525 table_id: value.table_id.into(),
526 read_version_from_backup: value.read_version_from_backup,
527 read_committed: value.read_committed,
528 }
529 }
530}
531
532pub fn gen_min_epoch(base_epoch: u64, retention_seconds: Option<&u32>) -> u64 {
533 let base_epoch = Epoch(base_epoch);
534 match retention_seconds {
535 Some(retention_seconds_u32) => {
536 base_epoch
537 .subtract_ms(*retention_seconds_u32 as u64 * 1000)
538 .0
539 }
540 None => 0,
541 }
542}
543
544#[derive(Default, Clone)]
545pub struct WriteOptions {
546 pub epoch: u64,
547 pub table_id: TableId,
548}
549
550impl From<TracedWriteOptions> for WriteOptions {
551 fn from(value: TracedWriteOptions) -> Self {
552 Self {
553 epoch: value.epoch,
554 table_id: value.table_id.into(),
555 }
556 }
557}
558
559pub trait CheckOldValueEquality = Fn(&Bytes, &Bytes) -> bool + Send + Sync;
560
561pub static CHECK_BYTES_EQUAL: LazyLock<Arc<dyn CheckOldValueEquality>> =
562 LazyLock::new(|| Arc::new(|first: &Bytes, second: &Bytes| first == second));
563
564#[derive(Default, Clone)]
565pub enum OpConsistencyLevel {
566 #[default]
567 Inconsistent,
568 ConsistentOldValue {
569 check_old_value: Arc<dyn CheckOldValueEquality>,
570 is_log_store: bool,
572 },
573}
574
575impl Debug for OpConsistencyLevel {
576 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
577 match self {
578 OpConsistencyLevel::Inconsistent => f.write_str("OpConsistencyLevel::Inconsistent"),
579 OpConsistencyLevel::ConsistentOldValue { is_log_store, .. } => f
580 .debug_struct("OpConsistencyLevel::ConsistentOldValue")
581 .field("is_log_store", is_log_store)
582 .finish(),
583 }
584 }
585}
586
587impl PartialEq<Self> for OpConsistencyLevel {
588 fn eq(&self, other: &Self) -> bool {
589 matches!(
590 (self, other),
591 (
592 OpConsistencyLevel::Inconsistent,
593 OpConsistencyLevel::Inconsistent
594 ) | (
595 OpConsistencyLevel::ConsistentOldValue {
596 is_log_store: true,
597 ..
598 },
599 OpConsistencyLevel::ConsistentOldValue {
600 is_log_store: true,
601 ..
602 },
603 ) | (
604 OpConsistencyLevel::ConsistentOldValue {
605 is_log_store: false,
606 ..
607 },
608 OpConsistencyLevel::ConsistentOldValue {
609 is_log_store: false,
610 ..
611 },
612 )
613 )
614 }
615}
616
617impl Eq for OpConsistencyLevel {}
618
619impl OpConsistencyLevel {
620 pub fn update(&mut self, new_level: &OpConsistencyLevel) {
621 assert_ne!(self, new_level);
622 *self = new_level.clone()
623 }
624}
625
626#[derive(Clone)]
627pub struct NewLocalOptions {
628 pub table_id: TableId,
629 pub op_consistency_level: OpConsistencyLevel,
637 pub table_option: TableOption,
638
639 pub is_replicated: bool,
642
643 pub vnodes: Arc<Bitmap>,
645}
646
647impl From<TracedNewLocalOptions> for NewLocalOptions {
648 fn from(value: TracedNewLocalOptions) -> Self {
649 Self {
650 table_id: value.table_id.into(),
651 op_consistency_level: match value.op_consistency_level {
652 TracedOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
653 TracedOpConsistencyLevel::ConsistentOldValue => {
654 OpConsistencyLevel::ConsistentOldValue {
655 check_old_value: CHECK_BYTES_EQUAL.clone(),
656 is_log_store: false,
658 }
659 }
660 },
661 table_option: value.table_option.into(),
662 is_replicated: value.is_replicated,
663 vnodes: Arc::new(value.vnodes.into()),
664 }
665 }
666}
667
668impl From<NewLocalOptions> for TracedNewLocalOptions {
669 fn from(value: NewLocalOptions) -> Self {
670 Self {
671 table_id: value.table_id.into(),
672 op_consistency_level: match value.op_consistency_level {
673 OpConsistencyLevel::Inconsistent => TracedOpConsistencyLevel::Inconsistent,
674 OpConsistencyLevel::ConsistentOldValue { .. } => {
675 TracedOpConsistencyLevel::ConsistentOldValue
676 }
677 },
678 table_option: value.table_option.into(),
679 is_replicated: value.is_replicated,
680 vnodes: value.vnodes.as_ref().clone().into(),
681 }
682 }
683}
684
685impl NewLocalOptions {
686 pub fn new(
687 table_id: TableId,
688 op_consistency_level: OpConsistencyLevel,
689 table_option: TableOption,
690 vnodes: Arc<Bitmap>,
691 ) -> Self {
692 NewLocalOptions {
693 table_id,
694 op_consistency_level,
695 table_option,
696 is_replicated: false,
697 vnodes,
698 }
699 }
700
701 pub fn new_replicated(
702 table_id: TableId,
703 op_consistency_level: OpConsistencyLevel,
704 table_option: TableOption,
705 vnodes: Arc<Bitmap>,
706 ) -> Self {
707 NewLocalOptions {
708 table_id,
709 op_consistency_level,
710 table_option,
711 is_replicated: true,
712 vnodes,
713 }
714 }
715
716 pub fn for_test(table_id: TableId) -> Self {
717 Self {
718 table_id,
719 op_consistency_level: OpConsistencyLevel::Inconsistent,
720 table_option: TableOption {
721 retention_seconds: None,
722 },
723 is_replicated: false,
724 vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
725 }
726 }
727}
728
729#[derive(Clone)]
730pub struct InitOptions {
731 pub epoch: EpochPair,
732}
733
734impl InitOptions {
735 pub fn new(epoch: EpochPair) -> Self {
736 Self { epoch }
737 }
738}
739
740impl From<InitOptions> for TracedInitOptions {
741 fn from(value: InitOptions) -> Self {
742 TracedInitOptions {
743 epoch: value.epoch.into(),
744 }
745 }
746}
747
748impl From<TracedInitOptions> for InitOptions {
749 fn from(value: TracedInitOptions) -> Self {
750 InitOptions {
751 epoch: value.epoch.into(),
752 }
753 }
754}
755
756#[derive(Clone, Debug)]
757pub struct SealCurrentEpochOptions {
758 pub table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
759 pub switch_op_consistency_level: Option<OpConsistencyLevel>,
760}
761
762impl From<SealCurrentEpochOptions> for TracedSealCurrentEpochOptions {
763 fn from(value: SealCurrentEpochOptions) -> Self {
764 TracedSealCurrentEpochOptions {
765 table_watermarks: value.table_watermarks.map(
766 |(direction, watermarks, watermark_type)| {
767 (
768 direction == WatermarkDirection::Ascending,
769 watermarks
770 .into_iter()
771 .map(|watermark| {
772 let pb_watermark = PbVnodeWatermark::from(watermark);
773 Message::encode_to_vec(&pb_watermark)
774 })
775 .collect(),
776 match watermark_type {
777 WatermarkSerdeType::NonPkPrefix => true,
778 WatermarkSerdeType::PkPrefix => false,
779 },
780 )
781 },
782 ),
783 switch_op_consistency_level: value
784 .switch_op_consistency_level
785 .map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue { .. })),
786 }
787 }
788}
789
790impl From<TracedSealCurrentEpochOptions> for SealCurrentEpochOptions {
791 fn from(value: TracedSealCurrentEpochOptions) -> SealCurrentEpochOptions {
792 SealCurrentEpochOptions {
793 table_watermarks: value.table_watermarks.map(
794 |(is_ascending, watermarks, is_non_pk_prefix)| {
795 (
796 if is_ascending {
797 WatermarkDirection::Ascending
798 } else {
799 WatermarkDirection::Descending
800 },
801 watermarks
802 .into_iter()
803 .map(|serialized_watermark| {
804 Message::decode(serialized_watermark.as_slice())
805 .map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb))
806 .expect("should not failed")
807 })
808 .collect(),
809 if is_non_pk_prefix {
810 WatermarkSerdeType::NonPkPrefix
811 } else {
812 WatermarkSerdeType::PkPrefix
813 },
814 )
815 },
816 ),
817 switch_op_consistency_level: value.switch_op_consistency_level.map(|enable| {
818 if enable {
819 OpConsistencyLevel::ConsistentOldValue {
820 check_old_value: CHECK_BYTES_EQUAL.clone(),
821 is_log_store: false,
822 }
823 } else {
824 OpConsistencyLevel::Inconsistent
825 }
826 }),
827 }
828 }
829}
830
831impl SealCurrentEpochOptions {
832 pub fn for_test() -> Self {
833 Self {
834 table_watermarks: None,
835 switch_op_consistency_level: None,
836 }
837 }
838}