use std::default::Default;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::marker::PhantomData;
use std::sync::{Arc, LazyLock};

use bytes::Bytes;
use futures::{Stream, TryStreamExt};
use futures_async_stream::try_stream;
use prost::Message;
use risingwave_common::array::{Op, VectorRef};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::{Epoch, EpochPair, MAX_EPOCH};
use risingwave_common::vector::distance::DistanceMeasurement;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_trace::{
    TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions,
    TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
};
use risingwave_pb::hummock::{PbVnodeWatermark, PbWatermarkSerdeType};

use crate::error::{StorageError, StorageResult};
use crate::hummock::CachePolicy;
use crate::monitor::{MonitoredStateStore, MonitoredStorageMetrics};
pub(crate) use crate::vector::{OnNearestItem, Vector};

pub trait StaticSendSync = Send + Sync + 'static;

pub trait IterItem: Send + 'static {
    type ItemRef<'a>: Send + Copy + 'a;
}

impl IterItem for StateStoreKeyedRow {
    type ItemRef<'a> = StateStoreKeyedRowRef<'a>;
}

impl IterItem for StateStoreReadLogItem {
    type ItemRef<'a> = StateStoreReadLogItemRef<'a>;
}

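/// An asynchronous, lending-style iterator over state store items.
///
/// Each call to [`try_next`](Self::try_next) yields a borrowed `ItemRef` that is only valid
/// until the next call; use [`StateStoreIterExt::into_stream`] to obtain an owned [`Stream`].
///
/// A minimal consumption sketch (hypothetical `iter` of some implementing type, for
/// illustration only):
///
/// ```ignore
/// while let Some((key, value)) = iter.try_next().await? {
///     // `key` is a `FullKey<&[u8]>` and `value` a `&[u8]`; copy out anything that must
///     // outlive the next `try_next` call.
/// }
/// ```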
pub trait StateStoreIter<T: IterItem = StateStoreKeyedRow>: Send {
    fn try_next(&mut self) -> impl StorageFuture<'_, Option<T::ItemRef<'_>>>;
}

pub fn to_owned_item((key, value): StateStoreKeyedRowRef<'_>) -> StorageResult<StateStoreKeyedRow> {
    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
}

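/// Extension methods for [`StateStoreIter`], mainly for adapting the lending iterator into an
/// owned [`Stream`] by mapping each borrowed item through a conversion function.
///
/// A minimal sketch of collecting owned rows via [`to_owned_item`] (hypothetical `iter` value,
/// for illustration only):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let mut stream = std::pin::pin!(iter.into_stream(to_owned_item));
/// while let Some((key, value)) = stream.try_next().await? {
///     // `key: FullKey<Bytes>` and `value: Bytes` are owned and may be kept across iterations.
/// }
/// ```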
pub trait StateStoreIterExt<T: IterItem = StateStoreKeyedRow>: StateStoreIter<T> + Sized {
    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>>: Stream<Item = StorageResult<O>>
        + Send;

    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
        self,
        f: F,
    ) -> Self::ItemStream<O, F>;

    fn fused(self) -> FusedStateStoreIter<Self, T> {
        FusedStateStoreIter::new(self)
    }
}

#[try_stream(ok = O, error = StorageError)]
async fn into_stream_inner<
    T: IterItem,
    I: StateStoreIter<T>,
    O: Send,
    F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send,
>(
    iter: I,
    f: F,
) {
    let mut iter = iter.fused();
    while let Some(item) = iter.try_next().await? {
        yield f(item)?;
    }
}

pub struct FromStreamStateStoreIter<S> {
    inner: S,
    item_buffer: Option<StateStoreKeyedRow>,
}

impl<S> FromStreamStateStoreIter<S> {
    pub fn new(inner: S) -> Self {
        Self {
            inner,
            item_buffer: None,
        }
    }
}

impl<S: Stream<Item = StorageResult<StateStoreKeyedRow>> + Unpin + Send> StateStoreIter
    for FromStreamStateStoreIter<S>
{
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        self.item_buffer = self.inner.try_next().await?;
        Ok(self
            .item_buffer
            .as_ref()
            .map(|(key, value)| (key.to_ref(), value.as_ref())))
    }
}

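/// Wrapper that remembers termination of the inner [`StateStoreIter`]: once the inner iterator
/// returns `Ok(None)` or an error, `finished` is set and any further `try_next` call panics
/// instead of polling the inner iterator again.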
pub struct FusedStateStoreIter<I, T> {
    inner: I,
    finished: bool,
    _phantom: PhantomData<T>,
}

impl<I, T> FusedStateStoreIter<I, T> {
    fn new(inner: I) -> Self {
        Self {
            inner,
            finished: false,
            _phantom: PhantomData,
        }
    }
}

impl<T: IterItem, I: StateStoreIter<T>> FusedStateStoreIter<I, T> {
    async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
        assert!(
            !self.finished,
            "try_next called after the iterator has finished"
        );
        let result = self.inner.try_next().await;
        match &result {
            Ok(Some(_)) => {}
            Ok(None) | Err(_) => {
                self.finished = true;
            }
        }
        result
    }
}

impl<T: IterItem, I: StateStoreIter<T>> StateStoreIterExt<T> for I {
    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>> =
        impl Stream<Item = StorageResult<O>> + Send;

    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
        self,
        f: F,
    ) -> Self::ItemStream<O, F> {
        into_stream_inner(self, f)
    }
}

pub type StateStoreKeyedRowRef<'a> = (FullKey<&'a [u8]>, &'a [u8]);
pub type StateStoreKeyedRow = (FullKey<Bytes>, Bytes);
pub trait StateStoreReadIter = StateStoreIter + 'static;

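/// The change of a single key in one change-log entry: an insert, a delete, or an update that
/// carries both the old and the new value.
///
/// A small sketch of expanding an update into stream `Op`s via
/// [`into_op_value_iter`](Self::into_op_value_iter) (illustration only):
///
/// ```ignore
/// let change = ChangeLogValue::Update { old_value: "1", new_value: "2" };
/// let ops: Vec<_> = change.into_op_value_iter().collect();
/// assert_eq!(ops, vec![(Op::UpdateDelete, "1"), (Op::UpdateInsert, "2")]);
/// ```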
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum ChangeLogValue<T> {
    Insert(T),
    Update { new_value: T, old_value: T },
    Delete(T),
}

impl<T> ChangeLogValue<T> {
    pub fn try_map<O>(self, f: impl Fn(T) -> StorageResult<O>) -> StorageResult<ChangeLogValue<O>> {
        Ok(match self {
            ChangeLogValue::Insert(value) => ChangeLogValue::Insert(f(value)?),
            ChangeLogValue::Update {
                new_value,
                old_value,
            } => ChangeLogValue::Update {
                new_value: f(new_value)?,
                old_value: f(old_value)?,
            },
            ChangeLogValue::Delete(value) => ChangeLogValue::Delete(f(value)?),
        })
    }

    pub fn into_op_value_iter(self) -> impl Iterator<Item = (Op, T)> {
        std::iter::from_coroutine(
            #[coroutine]
            move || match self {
                Self::Insert(row) => {
                    yield (Op::Insert, row);
                }
                Self::Delete(row) => {
                    yield (Op::Delete, row);
                }
                Self::Update {
                    old_value,
                    new_value,
                } => {
                    yield (Op::UpdateDelete, old_value);
                    yield (Op::UpdateInsert, new_value);
                }
            },
        )
    }
}

impl<T: AsRef<[u8]>> ChangeLogValue<T> {
    pub fn to_ref(&self) -> ChangeLogValue<&[u8]> {
        match self {
            ChangeLogValue::Insert(val) => ChangeLogValue::Insert(val.as_ref()),
            ChangeLogValue::Update {
                new_value,
                old_value,
            } => ChangeLogValue::Update {
                new_value: new_value.as_ref(),
                old_value: old_value.as_ref(),
            },
            ChangeLogValue::Delete(val) => ChangeLogValue::Delete(val.as_ref()),
        }
    }
}

pub type StateStoreReadLogItem = (TableKey<Bytes>, ChangeLogValue<Bytes>);
pub type StateStoreReadLogItemRef<'a> = (TableKey<&'a [u8]>, ChangeLogValue<&'a [u8]>);

#[derive(Clone)]
pub struct NextEpochOptions {
    pub table_id: TableId,
}

#[derive(Clone)]
pub struct ReadLogOptions {
    pub table_id: TableId,
}

pub trait StateStoreReadChangeLogIter = StateStoreIter<StateStoreReadLogItem> + Send + 'static;
pub trait StorageFuture<'a, T> = Future<Output = StorageResult<T>> + Send + 'a;

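/// Access to the per-table change log: `next_epoch` resolves the epoch that follows `epoch` for
/// the given table, and `iter_log` iterates the changes recorded within an epoch range.
///
/// A minimal read sketch (hypothetical `store`, `table_id`, and epoch bounds; illustration
/// only):
///
/// ```ignore
/// use std::ops::Bound;
///
/// let mut iter = store
///     .iter_log(
///         (min_epoch, max_epoch),
///         (Bound::Unbounded, Bound::Unbounded),
///         ReadLogOptions { table_id },
///     )
///     .await?;
/// while let Some((key, change)) = iter.try_next().await? {
///     // `change` is a `ChangeLogValue<&[u8]>` describing the mutation recorded for `key`.
/// }
/// ```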
pub trait StateStoreReadLog: StaticSendSync {
    type ChangeLogIter: StateStoreReadChangeLogIter;

    fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64>;

    fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> impl StorageFuture<'_, Self::ChangeLogIter>;
}

pub trait KeyValueFn<'a, O> =
    for<'kv> FnOnce(FullKey<&'kv [u8]>, &'kv [u8]) -> StorageResult<O> + Send + 'a;

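/// Point-get access. Instead of returning the value directly, the caller passes a closure that
/// observes the borrowed full key and value and returns whatever it extracts, so no copy is
/// forced when only part of the value is needed.
///
/// A minimal sketch (hypothetical `snapshot` and `key`; illustration only):
///
/// ```ignore
/// let value_len: Option<usize> = snapshot
///     .on_key_value(key, ReadOptions::default(), |_full_key, value| Ok(value.len()))
///     .await?;
/// ```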
pub trait StateStoreGet: StaticSendSync {
    fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> impl StorageFuture<'a, Option<O>>;
}

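/// Range-read access over a snapshot of the state store: forward (`iter`) and backward
/// (`rev_iter`) iteration over a [`TableKeyRange`] under the given [`ReadOptions`].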
pub trait StateStoreRead: StateStoreGet + StaticSendSync {
    type Iter: StateStoreReadIter;
    type RevIter: StateStoreReadIter;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::Iter>;

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::RevIter>;
}

#[derive(Clone)]
pub struct TryWaitEpochOptions {
    pub table_id: TableId,
}

impl TryWaitEpochOptions {
    #[cfg(any(test, feature = "test"))]
    pub fn for_test(table_id: TableId) -> Self {
        Self { table_id }
    }
}

impl From<TracedTryWaitEpochOptions> for TryWaitEpochOptions {
    fn from(value: TracedTryWaitEpochOptions) -> Self {
        Self {
            table_id: value.table_id.into(),
        }
    }
}

impl From<TryWaitEpochOptions> for TracedTryWaitEpochOptions {
    fn from(value: TryWaitEpochOptions) -> Self {
        Self {
            table_id: value.table_id.into(),
        }
    }
}

#[derive(Clone, Copy)]
pub struct NewReadSnapshotOptions {
    pub table_id: TableId,
    pub table_option: TableOption,
}

#[derive(Clone)]
pub struct NewVectorWriterOptions {
    pub table_id: TableId,
}

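/// The top-level state store handle: it can wait for an epoch to become readable, create
/// per-table [`LocalStateStore`] instances for writing, open read snapshots at a given epoch,
/// and create vector writers.
///
/// A minimal sketch of opening a snapshot and reading from it (hypothetical `store`, `epoch`,
/// `table_id`, `table_option`, and `key`; illustration only):
///
/// ```ignore
/// let snapshot = store
///     .new_read_snapshot(
///         HummockReadEpoch::Committed(epoch),
///         NewReadSnapshotOptions { table_id, table_option },
///     )
///     .await?;
/// let value = snapshot
///     .on_key_value(key, ReadOptions::default(), |_k, v| Ok(Bytes::copy_from_slice(v)))
///     .await?;
/// ```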
pub trait StateStore: StateStoreReadLog + StaticSendSync + Clone {
    type Local: LocalStateStore;
    type ReadSnapshot: StateStoreRead + StateStoreReadVector + Clone;
    type VectorWriter: StateStoreWriteVector;

    fn try_wait_epoch(
        &self,
        epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> impl StorageFuture<'_, ()>;

    fn monitored(self, storage_metrics: Arc<MonitoredStorageMetrics>) -> MonitoredStateStore<Self> {
        MonitoredStateStore::new(self, storage_metrics)
    }

    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_;

    fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> impl StorageFuture<'_, Self::ReadSnapshot>;

    fn new_vector_writer(
        &self,
        options: NewVectorWriterOptions,
    ) -> impl Future<Output = Self::VectorWriter> + Send + '_;
}

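/// A table-local, single-writer handle used by streaming executors. Reads on a local instance
/// observe its own un-flushed writes; the epoch lifecycle (init, flush, seal) is driven through
/// the [`StateStoreWriteEpochControl`] super-trait.
///
/// A minimal write-lifecycle sketch (hypothetical `local` instance, epochs, and key/value
/// bytes; illustration only):
///
/// ```ignore
/// local.init(InitOptions::new(epoch_pair)).await?;
/// local.insert(key, new_value, None)?;
/// local.flush().await?;
/// local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test());
/// ```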
pub trait LocalStateStore: StateStoreGet + StateStoreWriteEpochControl + StaticSendSync {
    type FlushedSnapshotReader: StateStoreRead;
    type Iter<'a>: StateStoreIter + 'a;
    type RevIter<'a>: StateStoreIter + 'a;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::Iter<'_>>;

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::RevIter<'_>>;

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader;

    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()>;

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

    fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> impl StorageFuture<'_, Arc<Bitmap>>;
}

pub trait StateStoreWriteEpochControl: StaticSendSync {
    fn flush(&mut self) -> impl StorageFuture<'_, usize>;

    fn try_flush(&mut self) -> impl StorageFuture<'_, ()>;

    fn init(&mut self, opts: InitOptions) -> impl StorageFuture<'_, ()>;

    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
}

pub trait StateStoreWriteVector: StateStoreWriteEpochControl + StaticSendSync {
    fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()>;
}

pub struct VectorNearestOptions {
    pub top_n: usize,
    pub measure: DistanceMeasurement,
    pub hnsw_ef_search: usize,
}

pub trait OnNearestItemFn<'a, O> = OnNearestItem<O> + Send + Sync + 'a;

pub trait StateStoreReadVector: StaticSendSync {
    fn nearest<'a, O: Send + 'a>(
        &'a self,
        vec: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> impl StorageFuture<'a, Vec<O>>;
}

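/// Controls read-ahead behaviour of range scans: whether to prefetch at all, and whether the
/// scan is expected to be large (which may justify more aggressive prefetching).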
#[derive(Default, Clone, Copy)]
pub struct PrefetchOptions {
    pub prefetch: bool,
    pub for_large_query: bool,
}

impl PrefetchOptions {
    pub fn prefetch_for_large_range_scan() -> Self {
        Self {
            prefetch: true,
            for_large_query: true,
        }
    }

    pub fn prefetch_for_small_range_scan() -> Self {
        Self {
            prefetch: true,
            for_large_query: false,
        }
    }

    pub fn new(prefetch: bool, for_large_query: bool) -> Self {
        Self {
            prefetch,
            for_large_query,
        }
    }
}

impl From<TracedPrefetchOptions> for PrefetchOptions {
    fn from(value: TracedPrefetchOptions) -> Self {
        Self {
            prefetch: value.prefetch,
            for_large_query: value.for_large_query,
        }
    }
}

impl From<PrefetchOptions> for TracedPrefetchOptions {
    fn from(value: PrefetchOptions) -> Self {
        Self {
            prefetch: value.prefetch,
            for_large_query: value.for_large_query,
        }
    }
}

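/// Per-read options: an optional prefix hint used for bloom-filter pruning, the prefetch
/// behaviour, and the cache policy for blocks touched by the read.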
#[derive(Default, Clone)]
pub struct ReadOptions {
    pub prefix_hint: Option<Bytes>,
    pub prefetch_options: PrefetchOptions,
    pub cache_policy: CachePolicy,
}

impl From<TracedReadOptions> for ReadOptions {
    fn from(value: TracedReadOptions) -> Self {
        Self {
            prefix_hint: value.prefix_hint.map(|b| b.into()),
            prefetch_options: value.prefetch_options.into(),
            cache_policy: value.cache_policy.into(),
        }
    }
}

impl ReadOptions {
    pub fn into_traced_read_options(
        self,
        table_id: TableId,
        epoch: Option<HummockReadEpoch>,
        table_option: TableOption,
    ) -> TracedReadOptions {
        let value = self;
        let (read_version_from_backup, read_committed) = match epoch {
            None | Some(HummockReadEpoch::NoWait(_)) => (false, false),
            Some(HummockReadEpoch::Backup(_)) => (true, false),
            Some(HummockReadEpoch::Committed(_))
            | Some(HummockReadEpoch::BatchQueryCommitted(_, _))
            | Some(HummockReadEpoch::TimeTravel(_)) => (false, true),
        };
        TracedReadOptions {
            prefix_hint: value.prefix_hint.map(|b| b.into()),
            prefetch_options: value.prefetch_options.into(),
            cache_policy: value.cache_policy.into(),
            retention_seconds: table_option.retention_seconds,
            table_id: table_id.into(),
            read_version_from_backup,
            read_committed,
        }
    }
}

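/// Computes the minimum readable epoch for a read at `base_epoch` under an optional retention
/// window: with `retention_seconds` set, the base epoch is moved back by that many seconds of
/// physical time; without retention, `0` is returned so nothing is filtered out.
///
/// A small sketch (hypothetical `some_epoch`; illustration only):
///
/// ```ignore
/// // No retention configured: every epoch stays readable.
/// assert_eq!(gen_min_epoch(some_epoch, None), 0);
/// // 10s retention: the result is `some_epoch` shifted back by 10_000 ms of physical time.
/// let min = gen_min_epoch(some_epoch, Some(10));
/// ```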
pub fn gen_min_epoch(base_epoch: u64, retention_seconds: Option<u32>) -> u64 {
    match retention_seconds {
        Some(retention_seconds_u32) => {
            if base_epoch == MAX_EPOCH {
                panic!("cannot generate min epoch for MAX_EPOCH");
            }
            Epoch(base_epoch)
                .subtract_ms(retention_seconds_u32 as u64 * 1000)
                .0
        }
        None => 0,
    }
}

pub trait CheckOldValueEquality = Fn(&Bytes, &Bytes) -> bool + Send + Sync;

pub static CHECK_BYTES_EQUAL: LazyLock<Arc<dyn CheckOldValueEquality>> =
    LazyLock::new(|| Arc::new(|first: &Bytes, second: &Bytes| first == second));

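/// Consistency checking applied to writes on a local state store: either no checking
/// (`Inconsistent`), or checking the old value passed with each write against the stored one
/// via `check_old_value` (`ConsistentOldValue`), with `is_log_store` marking log-store tables.
/// Note that equality (and [`Debug`]) only considers `is_log_store`, not the closure itself.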
#[derive(Default, Clone)]
pub enum OpConsistencyLevel {
    #[default]
    Inconsistent,
    ConsistentOldValue {
        check_old_value: Arc<dyn CheckOldValueEquality>,
        is_log_store: bool,
    },
}

impl Debug for OpConsistencyLevel {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            OpConsistencyLevel::Inconsistent => f.write_str("OpConsistencyLevel::Inconsistent"),
            OpConsistencyLevel::ConsistentOldValue { is_log_store, .. } => f
                .debug_struct("OpConsistencyLevel::ConsistentOldValue")
                .field("is_log_store", is_log_store)
                .finish(),
        }
    }
}

impl PartialEq<Self> for OpConsistencyLevel {
    fn eq(&self, other: &Self) -> bool {
        matches!(
            (self, other),
            (
                OpConsistencyLevel::Inconsistent,
                OpConsistencyLevel::Inconsistent
            ) | (
                OpConsistencyLevel::ConsistentOldValue {
                    is_log_store: true,
                    ..
                },
                OpConsistencyLevel::ConsistentOldValue {
                    is_log_store: true,
                    ..
                },
            ) | (
                OpConsistencyLevel::ConsistentOldValue {
                    is_log_store: false,
                    ..
                },
                OpConsistencyLevel::ConsistentOldValue {
                    is_log_store: false,
                    ..
                },
            )
        )
    }
}

impl Eq for OpConsistencyLevel {}

impl OpConsistencyLevel {
    pub fn update(&mut self, new_level: &OpConsistencyLevel) {
        assert_ne!(self, new_level);
        *self = new_level.clone()
    }
}

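/// Options for creating a [`LocalStateStore`] instance for one table: the consistency level of
/// its writes, table-level options such as retention, the vnode bitmap it owns, whether it is a
/// replicated instance (see [`new_replicated`](Self::new_replicated), which also disables
/// `upload_on_flush`), and `upload_on_flush`, which controls whether data is uploaded as part
/// of `flush`.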
#[derive(Clone)]
pub struct NewLocalOptions {
    pub table_id: TableId,
    pub op_consistency_level: OpConsistencyLevel,
    pub table_option: TableOption,

    pub is_replicated: bool,

    pub vnodes: Arc<Bitmap>,

    pub upload_on_flush: bool,
}

impl From<TracedNewLocalOptions> for NewLocalOptions {
    fn from(value: TracedNewLocalOptions) -> Self {
        Self {
            table_id: value.table_id.into(),
            op_consistency_level: match value.op_consistency_level {
                TracedOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                TracedOpConsistencyLevel::ConsistentOldValue => {
                    OpConsistencyLevel::ConsistentOldValue {
                        check_old_value: CHECK_BYTES_EQUAL.clone(),
                        is_log_store: false,
                    }
                }
            },
            table_option: value.table_option.into(),
            is_replicated: value.is_replicated,
            vnodes: Arc::new(value.vnodes.into()),
            upload_on_flush: value.upload_on_flush,
        }
    }
}

impl From<NewLocalOptions> for TracedNewLocalOptions {
    fn from(value: NewLocalOptions) -> Self {
        Self {
            table_id: value.table_id.into(),
            op_consistency_level: match value.op_consistency_level {
                OpConsistencyLevel::Inconsistent => TracedOpConsistencyLevel::Inconsistent,
                OpConsistencyLevel::ConsistentOldValue { .. } => {
                    TracedOpConsistencyLevel::ConsistentOldValue
                }
            },
            table_option: value.table_option.into(),
            is_replicated: value.is_replicated,
            vnodes: value.vnodes.as_ref().clone().into(),
            upload_on_flush: value.upload_on_flush,
        }
    }
}

impl NewLocalOptions {
    pub fn new(
        table_id: TableId,
        op_consistency_level: OpConsistencyLevel,
        table_option: TableOption,
        vnodes: Arc<Bitmap>,
        upload_on_flush: bool,
    ) -> Self {
        NewLocalOptions {
            table_id,
            op_consistency_level,
            table_option,
            is_replicated: false,
            vnodes,
            upload_on_flush,
        }
    }

    pub fn new_replicated(
        table_id: TableId,
        op_consistency_level: OpConsistencyLevel,
        table_option: TableOption,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        NewLocalOptions {
            table_id,
            op_consistency_level,
            table_option,
            is_replicated: true,
            vnodes,
            upload_on_flush: false,
        }
    }

    pub fn for_test(table_id: TableId) -> Self {
        Self {
            table_id,
            op_consistency_level: OpConsistencyLevel::Inconsistent,
            table_option: TableOption {
                retention_seconds: None,
            },
            is_replicated: false,
            vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
            upload_on_flush: true,
        }
    }
}

#[derive(Clone)]
pub struct InitOptions {
    pub epoch: EpochPair,
}

impl InitOptions {
    pub fn new(epoch: EpochPair) -> Self {
        Self { epoch }
    }
}

impl From<InitOptions> for TracedInitOptions {
    fn from(value: InitOptions) -> Self {
        TracedInitOptions {
            epoch: value.epoch.into(),
        }
    }
}

impl From<TracedInitOptions> for InitOptions {
    fn from(value: TracedInitOptions) -> Self {
        InitOptions {
            epoch: value.epoch.into(),
        }
    }
}

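/// Options passed to `seal_current_epoch`: optional table watermarks to attach to the sealed
/// epoch (direction, per-vnode watermarks, and their serde type), and an optional switch of the
/// op consistency level for subsequent writes.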
#[derive(Clone, Debug)]
pub struct SealCurrentEpochOptions {
    pub table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
    pub switch_op_consistency_level: Option<OpConsistencyLevel>,
}

impl From<SealCurrentEpochOptions> for TracedSealCurrentEpochOptions {
    fn from(value: SealCurrentEpochOptions) -> Self {
        TracedSealCurrentEpochOptions {
            table_watermarks: value.table_watermarks.map(
                |(direction, watermarks, watermark_type)| {
                    (
                        direction == WatermarkDirection::Ascending,
                        watermarks
                            .into_iter()
                            .map(|watermark| {
                                let pb_watermark = PbVnodeWatermark::from(watermark);
                                Message::encode_to_vec(&pb_watermark)
                            })
                            .collect(),
                        PbWatermarkSerdeType::from(watermark_type) as i32,
                    )
                },
            ),
            switch_op_consistency_level: value
                .switch_op_consistency_level
                .map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue { .. })),
        }
    }
}

impl From<TracedSealCurrentEpochOptions> for SealCurrentEpochOptions {
    fn from(value: TracedSealCurrentEpochOptions) -> SealCurrentEpochOptions {
        SealCurrentEpochOptions {
            table_watermarks: value.table_watermarks.map(
                |(is_ascending, watermarks, watermark_serde_type)| {
                    (
                        if is_ascending {
                            WatermarkDirection::Ascending
                        } else {
                            WatermarkDirection::Descending
                        },
                        watermarks
                            .into_iter()
                            .map(|serialized_watermark| {
                                Message::decode(serialized_watermark.as_slice())
                                    .map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb))
                                    .expect("decoding `PbVnodeWatermark` should not fail")
                            })
                            .collect(),
                        match PbWatermarkSerdeType::try_from(watermark_serde_type).unwrap() {
                            PbWatermarkSerdeType::TypeUnspecified => unreachable!(),
                            PbWatermarkSerdeType::PkPrefix => WatermarkSerdeType::PkPrefix,
                            PbWatermarkSerdeType::NonPkPrefix => WatermarkSerdeType::NonPkPrefix,
                            PbWatermarkSerdeType::Value => WatermarkSerdeType::Value,
                        },
                    )
                },
            ),
            switch_op_consistency_level: value.switch_op_consistency_level.map(|enable| {
                if enable {
                    OpConsistencyLevel::ConsistentOldValue {
                        check_old_value: CHECK_BYTES_EQUAL.clone(),
                        is_log_store: false,
                    }
                } else {
                    OpConsistencyLevel::Inconsistent
                }
            }),
        }
    }
}

impl SealCurrentEpochOptions {
    pub fn for_test() -> Self {
        Self {
            table_watermarks: None,
            switch_op_consistency_level: None,
        }
    }
}