1use std::default::Default;
16use std::fmt::{Debug, Formatter};
17use std::future::Future;
18use std::marker::PhantomData;
19use std::sync::{Arc, LazyLock};
20
21use bytes::Bytes;
22use futures::{Stream, TryStreamExt};
23use futures_async_stream::try_stream;
24use prost::Message;
25use risingwave_common::array::Op;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{TableId, TableOption};
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::{Epoch, EpochPair};
30use risingwave_common::vector::distance::DistanceMeasurement;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
33use risingwave_hummock_sdk::table_watermark::{
34 VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
35};
36use risingwave_hummock_trace::{
37 TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions,
38 TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
39};
40use risingwave_pb::hummock::PbVnodeWatermark;
41
42use crate::error::{StorageError, StorageResult};
43use crate::hummock::CachePolicy;
44use crate::monitor::{MonitoredStateStore, MonitoredStorageMetrics};
45pub(crate) use crate::vector::{OnNearestItem, Vector};
46
47pub trait StaticSendSync = Send + Sync + 'static;
48
/// An owned item type that a state store iterator can hand out by reference.
pub trait IterItem: Send + 'static {
    /// Borrowed, copyable view of the item that lives for `'a`.
    type ItemRef<'a>: Send + Copy + 'a;
}
52
53impl IterItem for StateStoreKeyedRow {
54 type ItemRef<'a> = StateStoreKeyedRowRef<'a>;
55}
56
57impl IterItem for StateStoreReadLogItem {
58 type ItemRef<'a> = StateStoreReadLogItemRef<'a>;
59}
60
61pub trait StateStoreIter<T: IterItem = StateStoreKeyedRow>: Send {
62 fn try_next(&mut self) -> impl StorageFuture<'_, Option<T::ItemRef<'_>>>;
63}
64
65pub fn to_owned_item((key, value): StateStoreKeyedRowRef<'_>) -> StorageResult<StateStoreKeyedRow> {
66 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
67}
68
69pub trait StateStoreIterExt<T: IterItem = StateStoreKeyedRow>: StateStoreIter<T> + Sized {
70 type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>>: Stream<Item = StorageResult<O>>
71 + Send;
72
73 fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
74 self,
75 f: F,
76 ) -> Self::ItemStream<O, F>;
77
78 fn fused(self) -> FusedStateStoreIter<Self, T> {
79 FusedStateStoreIter::new(self)
80 }
81}
82
83#[try_stream(ok = O, error = StorageError)]
84async fn into_stream_inner<
85 T: IterItem,
86 I: StateStoreIter<T>,
87 O: Send,
88 F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send,
89>(
90 iter: I,
91 f: F,
92) {
93 let mut iter = iter.fused();
94 while let Some(item) = iter.try_next().await? {
95 yield f(item)?;
96 }
97}
98
99pub struct FromStreamStateStoreIter<S> {
100 inner: S,
101 item_buffer: Option<StateStoreKeyedRow>,
102}
103
104impl<S> FromStreamStateStoreIter<S> {
105 pub fn new(inner: S) -> Self {
106 Self {
107 inner,
108 item_buffer: None,
109 }
110 }
111}
112
113impl<S: Stream<Item = StorageResult<StateStoreKeyedRow>> + Unpin + Send> StateStoreIter
114 for FromStreamStateStoreIter<S>
115{
116 async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
117 self.item_buffer = self.inner.try_next().await?;
118 Ok(self
119 .item_buffer
120 .as_ref()
121 .map(|(key, value)| (key.to_ref(), value.as_ref())))
122 }
123}
124
125pub struct FusedStateStoreIter<I, T> {
126 inner: I,
127 finished: bool,
128 _phantom: PhantomData<T>,
129}
130
131impl<I, T> FusedStateStoreIter<I, T> {
132 fn new(inner: I) -> Self {
133 Self {
134 inner,
135 finished: false,
136 _phantom: PhantomData,
137 }
138 }
139}
140
141impl<T: IterItem, I: StateStoreIter<T>> FusedStateStoreIter<I, T> {
142 async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
143 assert!(!self.finished, "call try_next after finish");
144 let result = self.inner.try_next().await;
145 match &result {
146 Ok(Some(_)) => {}
147 Ok(None) | Err(_) => {
148 self.finished = true;
149 }
150 }
151 result
152 }
153}
154
155impl<T: IterItem, I: StateStoreIter<T>> StateStoreIterExt<T> for I {
156 type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>> =
157 impl Stream<Item = StorageResult<O>> + Send;
158
159 fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
160 self,
161 f: F,
162 ) -> Self::ItemStream<O, F> {
163 into_stream_inner(self, f)
164 }
165}
166
167pub type StateStoreKeyedRowRef<'a> = (FullKey<&'a [u8]>, &'a [u8]);
168pub type StateStoreKeyedRow = (FullKey<Bytes>, Bytes);
169pub trait StateStoreReadIter = StateStoreIter + 'static;
170
/// A single change-log entry carrying values of type `T`.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum ChangeLogValue<T> {
    /// A value was inserted.
    Insert(T),
    /// A value was replaced: `old_value` became `new_value`.
    Update { new_value: T, old_value: T },
    /// A value was deleted.
    Delete(T),
}
177
178impl<T> ChangeLogValue<T> {
179 pub fn try_map<O>(self, f: impl Fn(T) -> StorageResult<O>) -> StorageResult<ChangeLogValue<O>> {
180 Ok(match self {
181 ChangeLogValue::Insert(value) => ChangeLogValue::Insert(f(value)?),
182 ChangeLogValue::Update {
183 new_value,
184 old_value,
185 } => ChangeLogValue::Update {
186 new_value: f(new_value)?,
187 old_value: f(old_value)?,
188 },
189 ChangeLogValue::Delete(value) => ChangeLogValue::Delete(f(value)?),
190 })
191 }
192
193 pub fn into_op_value_iter(self) -> impl Iterator<Item = (Op, T)> {
194 std::iter::from_coroutine(
195 #[coroutine]
196 move || match self {
197 Self::Insert(row) => {
198 yield (Op::Insert, row);
199 }
200 Self::Delete(row) => {
201 yield (Op::Delete, row);
202 }
203 Self::Update {
204 old_value,
205 new_value,
206 } => {
207 yield (Op::UpdateDelete, old_value);
208 yield (Op::UpdateInsert, new_value);
209 }
210 },
211 )
212 }
213}
214
215impl<T: AsRef<[u8]>> ChangeLogValue<T> {
216 pub fn to_ref(&self) -> ChangeLogValue<&[u8]> {
217 match self {
218 ChangeLogValue::Insert(val) => ChangeLogValue::Insert(val.as_ref()),
219 ChangeLogValue::Update {
220 new_value,
221 old_value,
222 } => ChangeLogValue::Update {
223 new_value: new_value.as_ref(),
224 old_value: old_value.as_ref(),
225 },
226 ChangeLogValue::Delete(val) => ChangeLogValue::Delete(val.as_ref()),
227 }
228 }
229}
230
231pub type StateStoreReadLogItem = (TableKey<Bytes>, ChangeLogValue<Bytes>);
232pub type StateStoreReadLogItemRef<'a> = (TableKey<&'a [u8]>, ChangeLogValue<&'a [u8]>);
233
234#[derive(Clone)]
235pub struct NextEpochOptions {
236 pub table_id: TableId,
237}
238
239#[derive(Clone)]
240pub struct ReadLogOptions {
241 pub table_id: TableId,
242}
243
244pub trait StateStoreReadChangeLogIter = StateStoreIter<StateStoreReadLogItem> + Send + 'static;
245pub trait StorageFuture<'a, T> = Future<Output = StorageResult<T>> + Send + 'a;
246
247pub trait StateStoreReadLog: StaticSendSync {
248 type ChangeLogIter: StateStoreReadChangeLogIter;
249
250 fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64>;
251
252 fn iter_log(
253 &self,
254 epoch_range: (u64, u64),
255 key_range: TableKeyRange,
256 options: ReadLogOptions,
257 ) -> impl StorageFuture<'_, Self::ChangeLogIter>;
258}
259
260pub trait KeyValueFn<O> =
261 for<'kv> FnOnce(FullKey<&'kv [u8]>, &'kv [u8]) -> StorageResult<O> + Send + 'static;
262
263pub trait StateStoreGet: StaticSendSync {
264 fn on_key_value<O: Send + 'static>(
265 &self,
266 key: TableKey<Bytes>,
267 read_options: ReadOptions,
268 on_key_value_fn: impl KeyValueFn<O>,
269 ) -> impl StorageFuture<'_, Option<O>>;
270}
271
272pub trait StateStoreRead: StateStoreGet + StaticSendSync {
273 type Iter: StateStoreReadIter;
274 type RevIter: StateStoreReadIter;
275
276 fn iter(
282 &self,
283 key_range: TableKeyRange,
284 read_options: ReadOptions,
285 ) -> impl StorageFuture<'_, Self::Iter>;
286
287 fn rev_iter(
288 &self,
289 key_range: TableKeyRange,
290 read_options: ReadOptions,
291 ) -> impl StorageFuture<'_, Self::RevIter>;
292}
293
294#[derive(Clone)]
295pub struct TryWaitEpochOptions {
296 pub table_id: TableId,
297}
298
299impl TryWaitEpochOptions {
300 #[cfg(any(test, feature = "test"))]
301 pub fn for_test(table_id: TableId) -> Self {
302 Self { table_id }
303 }
304}
305
306impl From<TracedTryWaitEpochOptions> for TryWaitEpochOptions {
307 fn from(value: TracedTryWaitEpochOptions) -> Self {
308 Self {
309 table_id: value.table_id.into(),
310 }
311 }
312}
313
314impl From<TryWaitEpochOptions> for TracedTryWaitEpochOptions {
315 fn from(value: TryWaitEpochOptions) -> Self {
316 Self {
317 table_id: value.table_id.into(),
318 }
319 }
320}
321
322#[derive(Clone, Copy)]
323pub struct NewReadSnapshotOptions {
324 pub table_id: TableId,
325}
326
327#[derive(Clone)]
328pub struct NewVectorWriterOptions {
329 pub table_id: TableId,
330}
331
332pub trait StateStore: StateStoreReadLog + StaticSendSync + Clone {
333 type Local: LocalStateStore;
334 type ReadSnapshot: StateStoreRead + StateStoreReadVector + Clone;
335 type VectorWriter: StateStoreWriteVector;
336
337 fn try_wait_epoch(
340 &self,
341 epoch: HummockReadEpoch,
342 options: TryWaitEpochOptions,
343 ) -> impl StorageFuture<'_, ()>;
344
345 fn monitored(self, storage_metrics: Arc<MonitoredStorageMetrics>) -> MonitoredStateStore<Self> {
347 MonitoredStateStore::new(self, storage_metrics)
348 }
349
350 fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_;
351
352 fn new_read_snapshot(
353 &self,
354 epoch: HummockReadEpoch,
355 options: NewReadSnapshotOptions,
356 ) -> impl StorageFuture<'_, Self::ReadSnapshot>;
357
358 fn new_vector_writer(
359 &self,
360 options: NewVectorWriterOptions,
361 ) -> impl Future<Output = Self::VectorWriter> + Send + '_;
362}
363
364pub trait LocalStateStore: StateStoreGet + StateStoreWriteEpochControl + StaticSendSync {
368 type FlushedSnapshotReader: StateStoreRead;
369 type Iter<'a>: StateStoreIter + 'a;
370 type RevIter<'a>: StateStoreIter + 'a;
371
372 fn iter(
378 &self,
379 key_range: TableKeyRange,
380 read_options: ReadOptions,
381 ) -> impl StorageFuture<'_, Self::Iter<'_>>;
382
383 fn rev_iter(
384 &self,
385 key_range: TableKeyRange,
386 read_options: ReadOptions,
387 ) -> impl StorageFuture<'_, Self::RevIter<'_>>;
388
389 fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader;
390
391 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
393
394 fn insert(
396 &mut self,
397 key: TableKey<Bytes>,
398 new_val: Bytes,
399 old_val: Option<Bytes>,
400 ) -> StorageResult<()>;
401
402 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;
405
406 fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> impl StorageFuture<'_, Arc<Bitmap>>;
409}
410
411pub trait StateStoreWriteEpochControl: StaticSendSync {
412 fn flush(&mut self) -> impl StorageFuture<'_, usize>;
413
414 fn try_flush(&mut self) -> impl StorageFuture<'_, ()>;
415
416 fn init(&mut self, opts: InitOptions) -> impl StorageFuture<'_, ()>;
423
424 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
428}
429
430pub trait StateStoreWriteVector: StateStoreWriteEpochControl + StaticSendSync {
431 fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()>;
432}
433
434pub struct VectorNearestOptions {
435 pub top_n: usize,
436 pub measure: DistanceMeasurement,
437 pub hnsw_ef_search: usize,
438}
439
440pub trait OnNearestItemFn<O> = OnNearestItem<O> + Send + Sync + 'static;
441
442pub trait StateStoreReadVector: StaticSendSync {
443 fn nearest<O: Send + 'static>(
444 &self,
445 vec: Vector,
446 options: VectorNearestOptions,
447 on_nearest_item_fn: impl OnNearestItemFn<O>,
448 ) -> impl StorageFuture<'_, Vec<O>>;
449}
450
/// Prefetch settings for a read; both flags default to `false`.
#[derive(Default, Clone, Copy)]
pub struct PrefetchOptions {
    /// Whether prefetching is requested.
    pub prefetch: bool,
    /// Whether the read belongs to a large query.
    pub for_large_query: bool,
}

impl PrefetchOptions {
    /// Prefetching enabled, marked as a large query.
    pub fn prefetch_for_large_range_scan() -> Self {
        Self::new(true, true)
    }

    /// Prefetching enabled, not marked as a large query.
    pub fn prefetch_for_small_range_scan() -> Self {
        Self::new(true, false)
    }

    /// Builds options from the two flags.
    pub fn new(prefetch: bool, for_large_query: bool) -> Self {
        PrefetchOptions {
            prefetch,
            for_large_query,
        }
    }
}
483
484impl From<TracedPrefetchOptions> for PrefetchOptions {
485 fn from(value: TracedPrefetchOptions) -> Self {
486 Self {
487 prefetch: value.prefetch,
488 for_large_query: value.for_large_query,
489 }
490 }
491}
492
493impl From<PrefetchOptions> for TracedPrefetchOptions {
494 fn from(value: PrefetchOptions) -> Self {
495 Self {
496 prefetch: value.prefetch,
497 for_large_query: value.for_large_query,
498 }
499 }
500}
501
502#[derive(Default, Clone)]
503pub struct ReadOptions {
504 pub prefix_hint: Option<Bytes>,
508 pub prefetch_options: PrefetchOptions,
509 pub cache_policy: CachePolicy,
510
511 pub retention_seconds: Option<u32>,
512}
513
514impl From<TracedReadOptions> for ReadOptions {
515 fn from(value: TracedReadOptions) -> Self {
516 Self {
517 prefix_hint: value.prefix_hint.map(|b| b.into()),
518 prefetch_options: value.prefetch_options.into(),
519 cache_policy: value.cache_policy.into(),
520 retention_seconds: value.retention_seconds,
521 }
522 }
523}
524
525impl ReadOptions {
526 pub fn into_traced_read_options(
527 self,
528 table_id: TableId,
529 epoch: Option<HummockReadEpoch>,
530 ) -> TracedReadOptions {
531 let value = self;
532 let (read_version_from_backup, read_committed) = match epoch {
533 None | Some(HummockReadEpoch::NoWait(_)) => (false, false),
534 Some(HummockReadEpoch::Backup(_)) => (true, false),
535 Some(HummockReadEpoch::Committed(_))
536 | Some(HummockReadEpoch::BatchQueryCommitted(_, _))
537 | Some(HummockReadEpoch::TimeTravel(_)) => (false, true),
538 };
539 TracedReadOptions {
540 prefix_hint: value.prefix_hint.map(|b| b.into()),
541 prefetch_options: value.prefetch_options.into(),
542 cache_policy: value.cache_policy.into(),
543 retention_seconds: value.retention_seconds,
544 table_id: table_id.into(),
545 read_version_from_backup,
546 read_committed,
547 }
548 }
549}
550
551pub fn gen_min_epoch(base_epoch: u64, retention_seconds: Option<&u32>) -> u64 {
552 let base_epoch = Epoch(base_epoch);
553 match retention_seconds {
554 Some(retention_seconds_u32) => {
555 base_epoch
556 .subtract_ms(*retention_seconds_u32 as u64 * 1000)
557 .0
558 }
559 None => 0,
560 }
561}
562
563pub trait CheckOldValueEquality = Fn(&Bytes, &Bytes) -> bool + Send + Sync;
564
565pub static CHECK_BYTES_EQUAL: LazyLock<Arc<dyn CheckOldValueEquality>> =
566 LazyLock::new(|| Arc::new(|first: &Bytes, second: &Bytes| first == second));
567
568#[derive(Default, Clone)]
569pub enum OpConsistencyLevel {
570 #[default]
571 Inconsistent,
572 ConsistentOldValue {
573 check_old_value: Arc<dyn CheckOldValueEquality>,
574 is_log_store: bool,
576 },
577}
578
579impl Debug for OpConsistencyLevel {
580 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
581 match self {
582 OpConsistencyLevel::Inconsistent => f.write_str("OpConsistencyLevel::Inconsistent"),
583 OpConsistencyLevel::ConsistentOldValue { is_log_store, .. } => f
584 .debug_struct("OpConsistencyLevel::ConsistentOldValue")
585 .field("is_log_store", is_log_store)
586 .finish(),
587 }
588 }
589}
590
591impl PartialEq<Self> for OpConsistencyLevel {
592 fn eq(&self, other: &Self) -> bool {
593 matches!(
594 (self, other),
595 (
596 OpConsistencyLevel::Inconsistent,
597 OpConsistencyLevel::Inconsistent
598 ) | (
599 OpConsistencyLevel::ConsistentOldValue {
600 is_log_store: true,
601 ..
602 },
603 OpConsistencyLevel::ConsistentOldValue {
604 is_log_store: true,
605 ..
606 },
607 ) | (
608 OpConsistencyLevel::ConsistentOldValue {
609 is_log_store: false,
610 ..
611 },
612 OpConsistencyLevel::ConsistentOldValue {
613 is_log_store: false,
614 ..
615 },
616 )
617 )
618 }
619}
620
621impl Eq for OpConsistencyLevel {}
622
623impl OpConsistencyLevel {
624 pub fn update(&mut self, new_level: &OpConsistencyLevel) {
625 assert_ne!(self, new_level);
626 *self = new_level.clone()
627 }
628}
629
630#[derive(Clone)]
631pub struct NewLocalOptions {
632 pub table_id: TableId,
633 pub op_consistency_level: OpConsistencyLevel,
641 pub table_option: TableOption,
642
643 pub is_replicated: bool,
646
647 pub vnodes: Arc<Bitmap>,
649
650 pub upload_on_flush: bool,
651}
652
653impl From<TracedNewLocalOptions> for NewLocalOptions {
654 fn from(value: TracedNewLocalOptions) -> Self {
655 Self {
656 table_id: value.table_id.into(),
657 op_consistency_level: match value.op_consistency_level {
658 TracedOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
659 TracedOpConsistencyLevel::ConsistentOldValue => {
660 OpConsistencyLevel::ConsistentOldValue {
661 check_old_value: CHECK_BYTES_EQUAL.clone(),
662 is_log_store: false,
664 }
665 }
666 },
667 table_option: value.table_option.into(),
668 is_replicated: value.is_replicated,
669 vnodes: Arc::new(value.vnodes.into()),
670 upload_on_flush: value.upload_on_flush,
671 }
672 }
673}
674
675impl From<NewLocalOptions> for TracedNewLocalOptions {
676 fn from(value: NewLocalOptions) -> Self {
677 Self {
678 table_id: value.table_id.into(),
679 op_consistency_level: match value.op_consistency_level {
680 OpConsistencyLevel::Inconsistent => TracedOpConsistencyLevel::Inconsistent,
681 OpConsistencyLevel::ConsistentOldValue { .. } => {
682 TracedOpConsistencyLevel::ConsistentOldValue
683 }
684 },
685 table_option: value.table_option.into(),
686 is_replicated: value.is_replicated,
687 vnodes: value.vnodes.as_ref().clone().into(),
688 upload_on_flush: value.upload_on_flush,
689 }
690 }
691}
692
693impl NewLocalOptions {
694 pub fn new(
695 table_id: TableId,
696 op_consistency_level: OpConsistencyLevel,
697 table_option: TableOption,
698 vnodes: Arc<Bitmap>,
699 upload_on_flush: bool,
700 ) -> Self {
701 NewLocalOptions {
702 table_id,
703 op_consistency_level,
704 table_option,
705 is_replicated: false,
706 vnodes,
707 upload_on_flush,
708 }
709 }
710
711 pub fn new_replicated(
712 table_id: TableId,
713 op_consistency_level: OpConsistencyLevel,
714 table_option: TableOption,
715 vnodes: Arc<Bitmap>,
716 ) -> Self {
717 NewLocalOptions {
718 table_id,
719 op_consistency_level,
720 table_option,
721 is_replicated: true,
722 vnodes,
723 upload_on_flush: false,
724 }
725 }
726
727 pub fn for_test(table_id: TableId) -> Self {
728 Self {
729 table_id,
730 op_consistency_level: OpConsistencyLevel::Inconsistent,
731 table_option: TableOption {
732 retention_seconds: None,
733 },
734 is_replicated: false,
735 vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
736 upload_on_flush: true,
737 }
738 }
739}
740
741#[derive(Clone)]
742pub struct InitOptions {
743 pub epoch: EpochPair,
744}
745
746impl InitOptions {
747 pub fn new(epoch: EpochPair) -> Self {
748 Self { epoch }
749 }
750}
751
752impl From<InitOptions> for TracedInitOptions {
753 fn from(value: InitOptions) -> Self {
754 TracedInitOptions {
755 epoch: value.epoch.into(),
756 }
757 }
758}
759
760impl From<TracedInitOptions> for InitOptions {
761 fn from(value: TracedInitOptions) -> Self {
762 InitOptions {
763 epoch: value.epoch.into(),
764 }
765 }
766}
767
768#[derive(Clone, Debug)]
769pub struct SealCurrentEpochOptions {
770 pub table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
771 pub switch_op_consistency_level: Option<OpConsistencyLevel>,
772}
773
774impl From<SealCurrentEpochOptions> for TracedSealCurrentEpochOptions {
775 fn from(value: SealCurrentEpochOptions) -> Self {
776 TracedSealCurrentEpochOptions {
777 table_watermarks: value.table_watermarks.map(
778 |(direction, watermarks, watermark_type)| {
779 (
780 direction == WatermarkDirection::Ascending,
781 watermarks
782 .into_iter()
783 .map(|watermark| {
784 let pb_watermark = PbVnodeWatermark::from(watermark);
785 Message::encode_to_vec(&pb_watermark)
786 })
787 .collect(),
788 match watermark_type {
789 WatermarkSerdeType::NonPkPrefix => true,
790 WatermarkSerdeType::PkPrefix => false,
791 },
792 )
793 },
794 ),
795 switch_op_consistency_level: value
796 .switch_op_consistency_level
797 .map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue { .. })),
798 }
799 }
800}
801
802impl From<TracedSealCurrentEpochOptions> for SealCurrentEpochOptions {
803 fn from(value: TracedSealCurrentEpochOptions) -> SealCurrentEpochOptions {
804 SealCurrentEpochOptions {
805 table_watermarks: value.table_watermarks.map(
806 |(is_ascending, watermarks, is_non_pk_prefix)| {
807 (
808 if is_ascending {
809 WatermarkDirection::Ascending
810 } else {
811 WatermarkDirection::Descending
812 },
813 watermarks
814 .into_iter()
815 .map(|serialized_watermark| {
816 Message::decode(serialized_watermark.as_slice())
817 .map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb))
818 .expect("should not failed")
819 })
820 .collect(),
821 if is_non_pk_prefix {
822 WatermarkSerdeType::NonPkPrefix
823 } else {
824 WatermarkSerdeType::PkPrefix
825 },
826 )
827 },
828 ),
829 switch_op_consistency_level: value.switch_op_consistency_level.map(|enable| {
830 if enable {
831 OpConsistencyLevel::ConsistentOldValue {
832 check_old_value: CHECK_BYTES_EQUAL.clone(),
833 is_log_store: false,
834 }
835 } else {
836 OpConsistencyLevel::Inconsistent
837 }
838 }),
839 }
840 }
841}
842
843impl SealCurrentEpochOptions {
844 pub fn for_test() -> Self {
845 Self {
846 table_watermarks: None,
847 switch_op_consistency_level: None,
848 }
849 }
850}