1use std::default::Default;
16use std::fmt::{Debug, Formatter};
17use std::future::Future;
18use std::marker::PhantomData;
19use std::sync::{Arc, LazyLock};
20
21use bytes::Bytes;
22use futures::{Stream, TryStreamExt};
23use futures_async_stream::try_stream;
24use prost::Message;
25use risingwave_common::array::{Op, VectorRef};
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{TableId, TableOption};
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::{Epoch, EpochPair, MAX_EPOCH};
30use risingwave_common::vector::distance::DistanceMeasurement;
31use risingwave_hummock_sdk::HummockReadEpoch;
32use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
33use risingwave_hummock_sdk::table_watermark::{
34 VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
35};
36use risingwave_hummock_trace::{
37 TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions,
38 TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
39};
40use risingwave_pb::hummock::{PbVnodeWatermark, PbWatermarkSerdeType};
41
42use crate::error::{StorageError, StorageResult};
43use crate::hummock::CachePolicy;
44use crate::monitor::{MonitoredStateStore, MonitoredStorageMetrics};
45pub(crate) use crate::vector::{OnNearestItem, Vector};
46
47mod auto_rebuild;
48pub use auto_rebuild::{AutoRebuildStateStoreReadIter, timeout_auto_rebuild};
49
/// Trait alias bundling the `Send + Sync + 'static` bounds.
pub trait StaticSendSync = Send + Sync + 'static;
51
/// An owned item type that a [`StateStoreIter`] can iterate over.
///
/// The iterator does not yield the owned item itself; it yields the borrowed
/// counterpart declared by [`IterItem::ItemRef`].
pub trait IterItem: Send + 'static {
    /// Borrowed, cheaply copyable view of the item, valid for the lifetime `'a`.
    type ItemRef<'a>: Send + Copy + 'a;
}
55
/// Keyed rows are iterated through their borrowed form, [`StateStoreKeyedRowRef`].
impl IterItem for StateStoreKeyedRow {
    type ItemRef<'a> = StateStoreKeyedRowRef<'a>;
}
59
/// Change-log items are iterated through their borrowed form, [`StateStoreReadLogItemRef`].
impl IterItem for StateStoreReadLogItem {
    type ItemRef<'a> = StateStoreReadLogItemRef<'a>;
}
63
/// An asynchronous, fallible iterator over items of type `T` (keyed rows by default).
pub trait StateStoreIter<T: IterItem = StateStoreKeyedRow>: Send {
    /// Resolves to the next item, or to `Ok(None)` when no item is left.
    ///
    /// The returned item borrows from `self`, so it has to be released before
    /// `try_next` can be called again.
    fn try_next(&mut self) -> impl StorageFuture<'_, Option<T::ItemRef<'_>>>;
}
67
68pub fn to_owned_item((key, value): StateStoreKeyedRowRef<'_>) -> StorageResult<StateStoreKeyedRow> {
69 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
70}
71
/// Convenience adapters layered on top of [`StateStoreIter`].
pub trait StateStoreIterExt<T: IterItem = StateStoreKeyedRow>: StateStoreIter<T> + Sized {
    /// Stream type returned by [`Self::into_stream`] for output type `O` and mapping function `F`.
    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>>: Stream<Item = StorageResult<O>>
        + Send;

    /// Consumes the iterator and exposes it as a stream.
    ///
    /// `f` converts each borrowed item into the owned output `O` that the stream yields.
    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
        self,
        f: F,
    ) -> Self::ItemStream<O, F>;

    /// Wraps the iterator in a [`FusedStateStoreIter`].
    fn fused(self) -> FusedStateStoreIter<Self, T> {
        FusedStateStoreIter::new(self)
    }
}
85
/// Drives `iter` until it reports no more items, yielding `f(item)` for every item it produces.
///
/// Errors from the iterator and from `f` are both propagated with `?`.
#[try_stream(ok = O, error = StorageError)]
async fn into_stream_inner<
    T: IterItem,
    I: StateStoreIter<T>,
    O: Send,
    F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send,
>(
    iter: I,
    f: F,
) {
    // The fused wrapper asserts that `try_next` is not called again once the
    // iterator has finished or failed.
    let mut iter = iter.fused();
    while let Some(item) = iter.try_next().await? {
        yield f(item)?;
    }
}
101
102pub struct FromStreamStateStoreIter<S> {
103 inner: S,
104 item_buffer: Option<StateStoreKeyedRow>,
105}
106
107impl<S> FromStreamStateStoreIter<S> {
108 pub fn new(inner: S) -> Self {
109 Self {
110 inner,
111 item_buffer: None,
112 }
113 }
114}
115
116impl<S: Stream<Item = StorageResult<StateStoreKeyedRow>> + Unpin + Send> StateStoreIter
117 for FromStreamStateStoreIter<S>
118{
119 async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
120 self.item_buffer = self.inner.try_next().await?;
121 Ok(self
122 .item_buffer
123 .as_ref()
124 .map(|(key, value)| (key.to_ref(), value.as_ref())))
125 }
126}
127
/// Wrapper around an iterator `I` over items of type `T` that carries a `finished` flag.
pub struct FusedStateStoreIter<I, T> {
    /// The wrapped iterator.
    inner: I,
    /// Starts as `false`.
    finished: bool,
    /// Ties the item type `T` to the wrapper without storing a `T`.
    _phantom: PhantomData<T>,
}

impl<I, T> FusedStateStoreIter<I, T> {
    /// Wraps `inner`; the wrapper starts in the not-finished state.
    fn new(inner: I) -> Self {
        let finished = false;
        FusedStateStoreIter {
            inner,
            finished,
            _phantom: PhantomData,
        }
    }
}
143
144impl<T: IterItem, I: StateStoreIter<T>> FusedStateStoreIter<I, T> {
145 async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
146 assert!(!self.finished, "call try_next after finish");
147 let result = self.inner.try_next().await;
148 match &result {
149 Ok(Some(_)) => {}
150 Ok(None) | Err(_) => {
151 self.finished = true;
152 }
153 }
154 result
155 }
156}
157
/// Blanket implementation: every [`StateStoreIter`] gets the [`StateStoreIterExt`] adapters.
impl<T: IterItem, I: StateStoreIter<T>> StateStoreIterExt<T> for I {
    /// Opaque stream type; it is the stream produced by `into_stream_inner`.
    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>> =
        impl Stream<Item = StorageResult<O>> + Send;

    /// Delegates to `into_stream_inner`, which maps each item through `f`.
    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
        self,
        f: F,
    ) -> Self::ItemStream<O, F> {
        into_stream_inner(self, f)
    }
}
169
/// Borrowed key-value row: a full key and a value, both viewed as byte slices.
pub type StateStoreKeyedRowRef<'a> = (FullKey<&'a [u8]>, &'a [u8]);
/// Owned key-value row: a full key and a value, both stored as [`Bytes`].
pub type StateStoreKeyedRow = (FullKey<Bytes>, Bytes);
/// Trait alias for a `'static` [`StateStoreIter`] over keyed rows.
pub trait StateStoreReadIter = StateStoreIter + 'static;
173
/// A single change-log entry value, generic over the value representation `T`.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum ChangeLogValue<T> {
    /// An insertion together with the inserted value.
    Insert(T),
    /// An update carrying both the new and the old value.
    Update { new_value: T, old_value: T },
    /// A deletion together with its value.
    Delete(T),
}
180
181impl<T> ChangeLogValue<T> {
182 pub fn try_map<O>(self, f: impl Fn(T) -> StorageResult<O>) -> StorageResult<ChangeLogValue<O>> {
183 Ok(match self {
184 ChangeLogValue::Insert(value) => ChangeLogValue::Insert(f(value)?),
185 ChangeLogValue::Update {
186 new_value,
187 old_value,
188 } => ChangeLogValue::Update {
189 new_value: f(new_value)?,
190 old_value: f(old_value)?,
191 },
192 ChangeLogValue::Delete(value) => ChangeLogValue::Delete(f(value)?),
193 })
194 }
195
196 pub fn into_op_value_iter(self) -> impl Iterator<Item = (Op, T)> {
197 std::iter::from_coroutine(
198 #[coroutine]
199 move || match self {
200 Self::Insert(row) => {
201 yield (Op::Insert, row);
202 }
203 Self::Delete(row) => {
204 yield (Op::Delete, row);
205 }
206 Self::Update {
207 old_value,
208 new_value,
209 } => {
210 yield (Op::UpdateDelete, old_value);
211 yield (Op::UpdateInsert, new_value);
212 }
213 },
214 )
215 }
216}
217
218impl<T: AsRef<[u8]>> ChangeLogValue<T> {
219 pub fn to_ref(&self) -> ChangeLogValue<&[u8]> {
220 match self {
221 ChangeLogValue::Insert(val) => ChangeLogValue::Insert(val.as_ref()),
222 ChangeLogValue::Update {
223 new_value,
224 old_value,
225 } => ChangeLogValue::Update {
226 new_value: new_value.as_ref(),
227 old_value: old_value.as_ref(),
228 },
229 ChangeLogValue::Delete(val) => ChangeLogValue::Delete(val.as_ref()),
230 }
231 }
232}
233
/// Owned change-log item: a table key and the change recorded for it.
pub type StateStoreReadLogItem = (TableKey<Bytes>, ChangeLogValue<Bytes>);
/// Borrowed counterpart of [`StateStoreReadLogItem`].
pub type StateStoreReadLogItemRef<'a> = (TableKey<&'a [u8]>, ChangeLogValue<&'a [u8]>);
236
/// Options passed to [`StateStoreReadLog::next_epoch`].
#[derive(Clone)]
pub struct NextEpochOptions {
    /// The table the request refers to.
    pub table_id: TableId,
}
241
/// Options passed to [`StateStoreReadLog::iter_log`].
#[derive(Clone)]
pub struct ReadLogOptions {
    /// The table whose change log is read.
    pub table_id: TableId,
}
246
/// Trait alias for a `'static` [`StateStoreIter`] over change-log items.
pub trait StateStoreReadChangeLogIter = StateStoreIter<StateStoreReadLogItem> + Send + 'static;
/// Trait alias for a `Send` future that lives for `'a` and resolves to `StorageResult<T>`.
pub trait StorageFuture<'a, T> = Future<Output = StorageResult<T>> + Send + 'a;
249
/// Read access to table change logs.
pub trait StateStoreReadLog: StaticSendSync {
    /// Iterator type returned by [`Self::iter_log`].
    type ChangeLogIter: StateStoreReadChangeLogIter;

    /// Resolves to an epoch derived from `epoch` for the table named in `options`.
    ///
    /// NOTE(review): presumably the next epoch after `epoch` that has change-log data; the
    /// exact semantics are defined by the implementations — confirm there.
    fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64>;

    /// Opens an iterator over the change-log items of the table named in `options`,
    /// restricted to `key_range` and `epoch_range`.
    ///
    /// NOTE(review): whether the bounds of `epoch_range` are inclusive is not visible here.
    fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> impl StorageFuture<'_, Self::ChangeLogIter>;
}
262
/// Trait alias for a one-shot, `Send` callback that receives a borrowed full key and value
/// and turns them into `StorageResult<O>`.
pub trait KeyValueFn<'a, O> =
    for<'kv> FnOnce(FullKey<&'kv [u8]>, &'kv [u8]) -> StorageResult<O> + Send + 'a;
265
/// Point lookup of a single key.
pub trait StateStoreGet: StaticSendSync {
    /// Looks up `key` according to `read_options` and resolves to an `Option<O>`, where `O`
    /// is what `on_key_value_fn` produces from the borrowed key and value.
    ///
    /// NOTE(review): presumably `Ok(None)` means the key was not found and the callback was
    /// not invoked — confirm against the implementations.
    fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> impl StorageFuture<'a, Option<O>>;
}
274
/// Range reads, in addition to the point lookups of [`StateStoreGet`].
pub trait StateStoreRead: StateStoreGet + StaticSendSync {
    /// Iterator type returned by [`Self::iter`].
    type Iter: StateStoreReadIter;
    /// Iterator type returned by [`Self::rev_iter`].
    type RevIter: StateStoreReadIter;

    /// Opens an iterator over the entries within `key_range`, configured by `read_options`.
    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::Iter>;

    /// Opens an iterator over the entries within `key_range`, configured by `read_options`.
    ///
    /// NOTE(review): judging by the name, the range is walked in reverse key order — confirm
    /// against the implementations.
    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::RevIter>;
}
296
297#[derive(Clone)]
298pub struct TryWaitEpochOptions {
299 pub table_id: TableId,
300}
301
302impl TryWaitEpochOptions {
303 #[cfg(any(test, feature = "test"))]
304 pub fn for_test(table_id: TableId) -> Self {
305 Self { table_id }
306 }
307}
308
309impl From<TracedTryWaitEpochOptions> for TryWaitEpochOptions {
310 fn from(value: TracedTryWaitEpochOptions) -> Self {
311 Self {
312 table_id: value.table_id.into(),
313 }
314 }
315}
316
317impl From<TryWaitEpochOptions> for TracedTryWaitEpochOptions {
318 fn from(value: TryWaitEpochOptions) -> Self {
319 Self {
320 table_id: value.table_id.into(),
321 }
322 }
323}
324
/// Options passed to [`StateStore::new_read_snapshot`].
#[derive(Clone, Copy)]
pub struct NewReadSnapshotOptions {
    /// The table the snapshot reads from.
    pub table_id: TableId,
    /// Table-level options of that table.
    pub table_option: TableOption,
}
330
/// Options passed to [`StateStore::new_vector_writer`].
#[derive(Clone)]
pub struct NewVectorWriterOptions {
    /// The table the vector writer writes to.
    pub table_id: TableId,
}
335
/// Top-level, cloneable state store handle.
///
/// It creates local stores, read snapshots and vector writers, and it can read change logs
/// through its [`StateStoreReadLog`] supertrait.
pub trait StateStore: StateStoreReadLog + StaticSendSync + Clone {
    /// Store type created by [`Self::new_local`].
    type Local: LocalStateStore;
    /// Snapshot type created by [`Self::new_read_snapshot`]; supports key-value and vector reads.
    type ReadSnapshot: StateStoreRead + StateStoreReadVector + Clone;
    /// Writer type created by [`Self::new_vector_writer`].
    type VectorWriter: StateStoreWriteVector;

    /// Waits on `epoch` for the table named in `options`.
    ///
    /// NOTE(review): presumably resolves once data of `epoch` can be read; the behaviour per
    /// `HummockReadEpoch` variant is defined by the implementations — confirm there.
    fn try_wait_epoch(
        &self,
        epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> impl StorageFuture<'_, ()>;

    /// Wraps this store in a [`MonitoredStateStore`] that uses `storage_metrics`.
    fn monitored(self, storage_metrics: Arc<MonitoredStorageMetrics>) -> MonitoredStateStore<Self> {
        MonitoredStateStore::new(self, storage_metrics)
    }

    /// Creates a [`LocalStateStore`] configured by `option`.
    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_;

    /// Creates a read snapshot at `epoch` for the table described by `options`.
    fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> impl StorageFuture<'_, Self::ReadSnapshot>;

    /// Creates a vector writer for the table named in `options`.
    fn new_vector_writer(
        &self,
        options: NewVectorWriterOptions,
    ) -> impl Future<Output = Self::VectorWriter> + Send + '_;
}
367
/// A store handle that supports point lookups, range iteration, writes and epoch control.
///
/// NOTE(review): presumably each instance is owned by a single writer of one table — confirm
/// against the callers.
pub trait LocalStateStore: StateStoreGet + StateStoreWriteEpochControl + StaticSendSync {
    /// Reader type returned by [`Self::new_flushed_snapshot_reader`].
    type FlushedSnapshotReader: StateStoreRead;
    /// Iterator type returned by [`Self::iter`]; it may borrow from the store.
    type Iter<'a>: StateStoreIter + 'a;
    /// Iterator type returned by [`Self::rev_iter`]; it may borrow from the store.
    type RevIter<'a>: StateStoreIter + 'a;

    /// Opens an iterator over the entries within `key_range`, configured by `read_options`.
    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::Iter<'_>>;

    /// Opens an iterator over the entries within `key_range`, configured by `read_options`.
    ///
    /// NOTE(review): judging by the name, the range is walked in reverse key order — confirm.
    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::RevIter<'_>>;

    /// Creates a reader of type [`Self::FlushedSnapshotReader`].
    ///
    /// NOTE(review): presumably the reader only observes data that has already been flushed —
    /// confirm against the implementations.
    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader;

    /// Returns the table watermark recorded for `vnode`, if there is one.
    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;

    /// Writes `new_val` under `key`; `old_val` optionally carries the value being replaced.
    ///
    /// NOTE(review): how `old_val` is checked presumably depends on the configured
    /// [`OpConsistencyLevel`] — confirm against the implementations.
    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()>;

    /// Deletes the entry stored under `key`; `old_val` carries the value being removed.
    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

    /// Installs `vnodes` as the vnode bitmap of this store and resolves to a bitmap.
    ///
    /// NOTE(review): presumably the returned bitmap is the previous one — confirm.
    fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> impl StorageFuture<'_, Arc<Bitmap>>;
}
414
/// Epoch life-cycle and flush control shared by the writer-side store traits.
pub trait StateStoreWriteEpochControl: StaticSendSync {
    /// Flushes buffered writes and resolves to a `usize`.
    ///
    /// NOTE(review): presumably the size of the flushed data — confirm the unit.
    fn flush(&mut self) -> impl StorageFuture<'_, usize>;

    /// NOTE(review): presumably flushes only when the implementation decides it is
    /// worthwhile — confirm against the implementations.
    fn try_flush(&mut self) -> impl StorageFuture<'_, ()>;

    /// Initializes the writer with the epoch pair carried by `opts`.
    fn init(&mut self, opts: InitOptions) -> impl StorageFuture<'_, ()>;

    /// Seals the current epoch and moves on to `next_epoch`, applying `opts`.
    ///
    /// NOTE(review): presumably writes issued afterwards belong to `next_epoch` — confirm.
    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
}
433
/// Writer for vector data, with the epoch control of [`StateStoreWriteEpochControl`].
pub trait StateStoreWriteVector: StateStoreWriteEpochControl + StaticSendSync {
    /// Adds the vector `vec` together with its associated `info` payload.
    fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()>;
}
437
/// Options passed to [`StateStoreReadVector::nearest`].
pub struct VectorNearestOptions {
    /// NOTE(review): presumably the number of nearest items to return — confirm.
    pub top_n: usize,
    /// Distance measurement used for the search.
    pub measure: DistanceMeasurement,
    /// NOTE(review): presumably the `ef_search` parameter of an HNSW index — confirm.
    pub hnsw_ef_search: usize,
}
443
/// Trait alias for an [`OnNearestItem`] callback that is `Send + Sync` and lives for `'a`.
pub trait OnNearestItemFn<'a, O> = OnNearestItem<O> + Send + Sync + 'a;
445
/// Nearest-neighbour reads over vector data.
pub trait StateStoreReadVector: StaticSendSync {
    /// Searches for the items nearest to `vec` as configured by `options` and resolves to
    /// the outputs `O` produced by `on_nearest_item_fn`.
    fn nearest<'a, O: Send + 'a>(
        &'a self,
        vec: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> impl StorageFuture<'a, Vec<O>>;
}
454
/// Prefetch settings for reads; both flags default to `false`.
#[derive(Default, Clone, Copy)]
pub struct PrefetchOptions {
    /// Whether prefetching is requested.
    pub prefetch: bool,
    /// Whether the read belongs to a large query.
    pub for_large_query: bool,
}

impl PrefetchOptions {
    /// Prefetching enabled, flagged as a large query.
    pub fn prefetch_for_large_range_scan() -> Self {
        Self::new(true, true)
    }

    /// Prefetching enabled, not flagged as a large query.
    pub fn prefetch_for_small_range_scan() -> Self {
        Self::new(true, false)
    }

    /// Builds the options from explicit flag values.
    pub fn new(prefetch: bool, for_large_query: bool) -> Self {
        PrefetchOptions {
            prefetch,
            for_large_query,
        }
    }
}
487
488impl From<TracedPrefetchOptions> for PrefetchOptions {
489 fn from(value: TracedPrefetchOptions) -> Self {
490 Self {
491 prefetch: value.prefetch,
492 for_large_query: value.for_large_query,
493 }
494 }
495}
496
497impl From<PrefetchOptions> for TracedPrefetchOptions {
498 fn from(value: PrefetchOptions) -> Self {
499 Self {
500 prefetch: value.prefetch,
501 for_large_query: value.for_large_query,
502 }
503 }
504}
505
/// Per-read options accepted by the read APIs of the state store.
#[derive(Default, Clone)]
pub struct ReadOptions {
    /// Optional key prefix hint.
    ///
    /// NOTE(review): presumably used to consult a filter before reading; confirm how it must
    /// relate to the key or key range of the read.
    pub prefix_hint: Option<Bytes>,
    /// Prefetch settings for the read.
    pub prefetch_options: PrefetchOptions,
    /// Cache policy for the read.
    pub cache_policy: CachePolicy,
}
515
516impl From<TracedReadOptions> for ReadOptions {
517 fn from(value: TracedReadOptions) -> Self {
518 Self {
519 prefix_hint: value.prefix_hint.map(|b| b.into()),
520 prefetch_options: value.prefetch_options.into(),
521 cache_policy: value.cache_policy.into(),
522 }
523 }
524}
525
526impl ReadOptions {
527 pub fn into_traced_read_options(
528 self,
529 table_id: TableId,
530 epoch: Option<HummockReadEpoch>,
531 table_option: TableOption,
532 ) -> TracedReadOptions {
533 let value = self;
534 let (read_version_from_backup, read_committed) = match epoch {
535 None | Some(HummockReadEpoch::NoWait(_)) => (false, false),
536 Some(HummockReadEpoch::Backup(_)) => (true, false),
537 Some(HummockReadEpoch::Committed(_))
538 | Some(HummockReadEpoch::BatchQueryCommitted(_, _))
539 | Some(HummockReadEpoch::TimeTravel(_)) => (false, true),
540 };
541 TracedReadOptions {
542 prefix_hint: value.prefix_hint.map(|b| b.into()),
543 prefetch_options: value.prefetch_options.into(),
544 cache_policy: value.cache_policy.into(),
545 retention_seconds: table_option.retention_seconds,
546 table_id: table_id.into(),
547 read_version_from_backup,
548 read_committed,
549 }
550 }
551}
552
553pub fn gen_min_epoch(base_epoch: u64, retention_seconds: Option<u32>) -> u64 {
554 match retention_seconds {
555 Some(retention_seconds_u32) => {
556 if base_epoch == MAX_EPOCH {
557 panic!("generate min epoch for MAX_EPOCH");
558 }
559 Epoch(base_epoch)
560 .subtract_ms(retention_seconds_u32 as u64 * 1000)
561 .0
562 }
563 None => 0,
564 }
565}
566
/// Trait alias for a thread-safe predicate that decides whether two old values are equal.
pub trait CheckOldValueEquality = Fn(&Bytes, &Bytes) -> bool + Send + Sync;

/// Shared, lazily created predicate that compares the two values with `==`.
pub static CHECK_BYTES_EQUAL: LazyLock<Arc<dyn CheckOldValueEquality>> =
    LazyLock::new(|| Arc::new(|first: &Bytes, second: &Bytes| first == second));
571
/// Consistency level applied to write operations; defaults to `Inconsistent`.
#[derive(Default, Clone)]
pub enum OpConsistencyLevel {
    /// No old-value consistency is configured.
    #[default]
    Inconsistent,
    /// Old values take part in consistency handling.
    ConsistentOldValue {
        /// Predicate used to decide whether two old values are equal.
        check_old_value: Arc<dyn CheckOldValueEquality>,
        /// NOTE(review): presumably marks the level as being used for a log store — confirm
        /// against the callers.
        is_log_store: bool,
    },
}
582
583impl Debug for OpConsistencyLevel {
584 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
585 match self {
586 OpConsistencyLevel::Inconsistent => f.write_str("OpConsistencyLevel::Inconsistent"),
587 OpConsistencyLevel::ConsistentOldValue { is_log_store, .. } => f
588 .debug_struct("OpConsistencyLevel::ConsistentOldValue")
589 .field("is_log_store", is_log_store)
590 .finish(),
591 }
592 }
593}
594
595impl PartialEq<Self> for OpConsistencyLevel {
596 fn eq(&self, other: &Self) -> bool {
597 matches!(
598 (self, other),
599 (
600 OpConsistencyLevel::Inconsistent,
601 OpConsistencyLevel::Inconsistent
602 ) | (
603 OpConsistencyLevel::ConsistentOldValue {
604 is_log_store: true,
605 ..
606 },
607 OpConsistencyLevel::ConsistentOldValue {
608 is_log_store: true,
609 ..
610 },
611 ) | (
612 OpConsistencyLevel::ConsistentOldValue {
613 is_log_store: false,
614 ..
615 },
616 OpConsistencyLevel::ConsistentOldValue {
617 is_log_store: false,
618 ..
619 },
620 )
621 )
622 }
623}
624
625impl Eq for OpConsistencyLevel {}
626
627impl OpConsistencyLevel {
628 pub fn update(&mut self, new_level: &OpConsistencyLevel) {
629 assert_ne!(self, new_level);
630 *self = new_level.clone()
631 }
632}
633
/// Options passed to [`StateStore::new_local`].
#[derive(Clone)]
pub struct NewLocalOptions {
    /// The table the local store operates on.
    pub table_id: TableId,
    /// Consistency level applied to the write operations of the local store.
    pub op_consistency_level: OpConsistencyLevel,
    /// Table-level options of that table.
    pub table_option: TableOption,

    /// Set by [`NewLocalOptions::new_replicated`], cleared by the other constructors.
    ///
    /// NOTE(review): presumably marks a replicated store whose data is not uploaded —
    /// confirm against the implementations.
    pub is_replicated: bool,

    /// The vnode bitmap of the local store.
    pub vnodes: Arc<Bitmap>,

    /// NOTE(review): presumably controls whether flushed data is uploaded — confirm.
    pub upload_on_flush: bool,
}
656
657impl From<TracedNewLocalOptions> for NewLocalOptions {
658 fn from(value: TracedNewLocalOptions) -> Self {
659 Self {
660 table_id: value.table_id.into(),
661 op_consistency_level: match value.op_consistency_level {
662 TracedOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
663 TracedOpConsistencyLevel::ConsistentOldValue => {
664 OpConsistencyLevel::ConsistentOldValue {
665 check_old_value: CHECK_BYTES_EQUAL.clone(),
666 is_log_store: false,
668 }
669 }
670 },
671 table_option: value.table_option.into(),
672 is_replicated: value.is_replicated,
673 vnodes: Arc::new(value.vnodes.into()),
674 upload_on_flush: value.upload_on_flush,
675 }
676 }
677}
678
679impl From<NewLocalOptions> for TracedNewLocalOptions {
680 fn from(value: NewLocalOptions) -> Self {
681 Self {
682 table_id: value.table_id.into(),
683 op_consistency_level: match value.op_consistency_level {
684 OpConsistencyLevel::Inconsistent => TracedOpConsistencyLevel::Inconsistent,
685 OpConsistencyLevel::ConsistentOldValue { .. } => {
686 TracedOpConsistencyLevel::ConsistentOldValue
687 }
688 },
689 table_option: value.table_option.into(),
690 is_replicated: value.is_replicated,
691 vnodes: value.vnodes.as_ref().clone().into(),
692 upload_on_flush: value.upload_on_flush,
693 }
694 }
695}
696
697impl NewLocalOptions {
698 pub fn new(
699 table_id: TableId,
700 op_consistency_level: OpConsistencyLevel,
701 table_option: TableOption,
702 vnodes: Arc<Bitmap>,
703 upload_on_flush: bool,
704 ) -> Self {
705 NewLocalOptions {
706 table_id,
707 op_consistency_level,
708 table_option,
709 is_replicated: false,
710 vnodes,
711 upload_on_flush,
712 }
713 }
714
715 pub fn new_replicated(
716 table_id: TableId,
717 op_consistency_level: OpConsistencyLevel,
718 table_option: TableOption,
719 vnodes: Arc<Bitmap>,
720 ) -> Self {
721 NewLocalOptions {
722 table_id,
723 op_consistency_level,
724 table_option,
725 is_replicated: true,
726 vnodes,
727 upload_on_flush: false,
728 }
729 }
730
731 pub fn for_test(table_id: TableId) -> Self {
732 Self {
733 table_id,
734 op_consistency_level: OpConsistencyLevel::Inconsistent,
735 table_option: TableOption {
736 retention_seconds: None,
737 },
738 is_replicated: false,
739 vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
740 upload_on_flush: true,
741 }
742 }
743}
744
745#[derive(Clone)]
746pub struct InitOptions {
747 pub epoch: EpochPair,
748}
749
750impl InitOptions {
751 pub fn new(epoch: EpochPair) -> Self {
752 Self { epoch }
753 }
754}
755
756impl From<InitOptions> for TracedInitOptions {
757 fn from(value: InitOptions) -> Self {
758 TracedInitOptions {
759 epoch: value.epoch.into(),
760 }
761 }
762}
763
764impl From<TracedInitOptions> for InitOptions {
765 fn from(value: TracedInitOptions) -> Self {
766 InitOptions {
767 epoch: value.epoch.into(),
768 }
769 }
770}
771
/// Options passed to [`StateStoreWriteEpochControl::seal_current_epoch`].
#[derive(Clone, Debug)]
pub struct SealCurrentEpochOptions {
    /// Optional table watermarks: their direction, the per-vnode watermarks and the
    /// watermark serde type.
    pub table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
    /// When `Some`, the consistency level to switch to.
    pub switch_op_consistency_level: Option<OpConsistencyLevel>,
}
777
778impl From<SealCurrentEpochOptions> for TracedSealCurrentEpochOptions {
779 fn from(value: SealCurrentEpochOptions) -> Self {
780 TracedSealCurrentEpochOptions {
781 table_watermarks: value.table_watermarks.map(
782 |(direction, watermarks, watermark_type)| {
783 (
784 direction == WatermarkDirection::Ascending,
785 watermarks
786 .into_iter()
787 .map(|watermark| {
788 let pb_watermark = PbVnodeWatermark::from(watermark);
789 Message::encode_to_vec(&pb_watermark)
790 })
791 .collect(),
792 PbWatermarkSerdeType::from(watermark_type) as i32,
793 )
794 },
795 ),
796 switch_op_consistency_level: value
797 .switch_op_consistency_level
798 .map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue { .. })),
799 }
800 }
801}
802
803impl From<TracedSealCurrentEpochOptions> for SealCurrentEpochOptions {
804 fn from(value: TracedSealCurrentEpochOptions) -> SealCurrentEpochOptions {
805 SealCurrentEpochOptions {
806 table_watermarks: value.table_watermarks.map(
807 |(is_ascending, watermarks, watermark_serde_type)| {
808 (
809 if is_ascending {
810 WatermarkDirection::Ascending
811 } else {
812 WatermarkDirection::Descending
813 },
814 watermarks
815 .into_iter()
816 .map(|serialized_watermark| {
817 Message::decode(serialized_watermark.as_slice())
818 .map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb))
819 .expect("should not failed")
820 })
821 .collect(),
822 match PbWatermarkSerdeType::try_from(watermark_serde_type).unwrap() {
823 PbWatermarkSerdeType::TypeUnspecified => unreachable!(),
824 PbWatermarkSerdeType::PkPrefix => WatermarkSerdeType::PkPrefix,
825 PbWatermarkSerdeType::NonPkPrefix => WatermarkSerdeType::NonPkPrefix,
826 PbWatermarkSerdeType::Value => WatermarkSerdeType::Value,
827 },
828 )
829 },
830 ),
831 switch_op_consistency_level: value.switch_op_consistency_level.map(|enable| {
832 if enable {
833 OpConsistencyLevel::ConsistentOldValue {
834 check_old_value: CHECK_BYTES_EQUAL.clone(),
835 is_log_store: false,
836 }
837 } else {
838 OpConsistencyLevel::Inconsistent
839 }
840 }),
841 }
842 }
843}
844
845impl SealCurrentEpochOptions {
846 pub fn for_test() -> Self {
847 Self {
848 table_watermarks: None,
849 switch_op_consistency_level: None,
850 }
851 }
852}