1use std::default::Default;
16use std::fmt::{Debug, Formatter};
17use std::future::Future;
18use std::marker::PhantomData;
19use std::sync::{Arc, LazyLock};
20
21use bytes::Bytes;
22use futures::{Stream, TryStreamExt};
23use futures_async_stream::try_stream;
24use prost::Message;
25use risingwave_common::array::{Op, VectorRef};
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{TableId, TableOption};
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::id::FragmentId;
30use risingwave_common::util::epoch::{Epoch, EpochPair, MAX_EPOCH};
31use risingwave_common::vector::distance::DistanceMeasurement;
32use risingwave_hummock_sdk::HummockReadEpoch;
33use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
34use risingwave_hummock_sdk::table_watermark::{
35 VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
36};
37use risingwave_hummock_trace::{
38 TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions,
39 TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
40};
41use risingwave_pb::hummock::{PbVnodeWatermark, PbWatermarkSerdeType};
42
43use crate::error::{StorageError, StorageResult};
44use crate::hummock::CachePolicy;
45use crate::monitor::{MonitoredStateStore, MonitoredStorageMetrics};
46pub(crate) use crate::vector::{OnNearestItem, Vector};
47
48mod auto_rebuild;
49pub use auto_rebuild::{AutoRebuildStateStoreReadIter, timeout_auto_rebuild};
50
/// Shorthand bound for types that are `Send`, `Sync` and `'static` (hold no non-`'static`
/// borrows), i.e. that can be shared across threads and tasks freely.
pub trait StaticSendSync = Send + Sync + 'static;
52
/// An owned item type produced by a [`StateStoreIter`], paired with the borrowed view
/// (`ItemRef`) that the iterator actually hands out from `try_next`.
pub trait IterItem: Send + 'static {
    /// Borrowed, cheaply copyable view of the item, valid for lifetime `'a`.
    type ItemRef<'a>: Send + Copy + 'a;
}
56
/// Keyed rows (full key + value) are handed out as borrowed [`StateStoreKeyedRowRef`]s.
impl IterItem for StateStoreKeyedRow {
    type ItemRef<'a> = StateStoreKeyedRowRef<'a>;
}
60
/// Change-log items (table key + change) are handed out as borrowed
/// [`StateStoreReadLogItemRef`]s.
impl IterItem for StateStoreReadLogItem {
    type ItemRef<'a> = StateStoreReadLogItemRef<'a>;
}
64
/// A lending iterator over state-store items.
///
/// Each item returned by `try_next` borrows from `&mut self`, so it must be dropped before the
/// next call. `Ok(None)` marks the end of iteration.
pub trait StateStoreIter<T: IterItem = StateStoreKeyedRow>: Send {
    fn try_next(&mut self) -> impl StorageFuture<'_, Option<T::ItemRef<'_>>>;
}
68
69pub fn to_owned_item((key, value): StateStoreKeyedRowRef<'_>) -> StorageResult<StateStoreKeyedRow> {
70 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
71}
72
/// Adapters available on every [`StateStoreIter`] (implemented for all of them by a blanket
/// impl in this module).
pub trait StateStoreIterExt<T: IterItem = StateStoreKeyedRow>: StateStoreIter<T> + Sized {
    /// Stream type returned by [`Self::into_stream`].
    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>>: Stream<Item = StorageResult<O>>
        + Send;

    /// Converts the iterator into a `Stream`, mapping each borrowed item to an owned `O` with
    /// `f`. An error from the iterator or from `f` is yielded as the final `Err` item.
    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
        self,
        f: F,
    ) -> Self::ItemStream<O, F>;

    /// Wraps the iterator in a [`FusedStateStoreIter`], which panics if `try_next` is called
    /// again after the iterator returned `Ok(None)` or an error.
    fn fused(self) -> FusedStateStoreIter<Self, T> {
        FusedStateStoreIter::new(self)
    }
}
86
/// Backing implementation of [`StateStoreIterExt::into_stream`].
///
/// Drains `iter` and yields `f(item)` for each item. The stream stops at the first error,
/// whether it comes from the iterator or from `f`.
#[try_stream(ok = O, error = StorageError)]
async fn into_stream_inner<
    T: IterItem,
    I: StateStoreIter<T>,
    O: Send,
    F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send,
>(
    iter: I,
    f: F,
) {
    // The fused wrapper asserts that `try_next` is not polled again after the end or an error.
    let mut iter = iter.fused();
    while let Some(item) = iter.try_next().await? {
        yield f(item)?;
    }
}
102
/// Adapts a `Stream` of owned keyed rows into a [`StateStoreIter`].
pub struct FromStreamStateStoreIter<S> {
    inner: S,
    /// Most recently pulled row; `try_next` hands out a borrow of it.
    item_buffer: Option<StateStoreKeyedRow>,
}
107
108impl<S> FromStreamStateStoreIter<S> {
109 pub fn new(inner: S) -> Self {
110 Self {
111 inner,
112 item_buffer: None,
113 }
114 }
115}
116
117impl<S: Stream<Item = StorageResult<StateStoreKeyedRow>> + Unpin + Send> StateStoreIter
118 for FromStreamStateStoreIter<S>
119{
120 async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
121 self.item_buffer = self.inner.try_next().await?;
122 Ok(self
123 .item_buffer
124 .as_ref()
125 .map(|(key, value)| (key.to_ref(), value.as_ref())))
126 }
127}
128
/// Wrapper around a [`StateStoreIter`] that remembers when the inner iterator has returned
/// `Ok(None)` or an error, and panics if `try_next` is called after that.
pub struct FusedStateStoreIter<I, T> {
    inner: I,
    /// Set once the inner iterator has ended or failed.
    finished: bool,
    /// Ties the item type `T` to the wrapper without storing a `T`.
    _phantom: PhantomData<T>,
}
134
135impl<I, T> FusedStateStoreIter<I, T> {
136 fn new(inner: I) -> Self {
137 Self {
138 inner,
139 finished: false,
140 _phantom: PhantomData,
141 }
142 }
143}
144
145impl<T: IterItem, I: StateStoreIter<T>> FusedStateStoreIter<I, T> {
146 async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
147 assert!(!self.finished, "call try_next after finish");
148 let result = self.inner.try_next().await;
149 match &result {
150 Ok(Some(_)) => {}
151 Ok(None) | Err(_) => {
152 self.finished = true;
153 }
154 }
155 result
156 }
157}
158
/// Every [`StateStoreIter`] gets the [`StateStoreIterExt`] adapters.
impl<T: IterItem, I: StateStoreIter<T>> StateStoreIterExt<T> for I {
    // Opaque stream type: whatever `into_stream_inner` returns.
    type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>> =
        impl Stream<Item = StorageResult<O>> + Send;

    fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
        self,
        f: F,
    ) -> Self::ItemStream<O, F> {
        into_stream_inner(self, f)
    }
}
170
/// Borrowed keyed row: a `FullKey` over byte slices plus the value bytes.
pub type StateStoreKeyedRowRef<'a> = (FullKey<&'a [u8]>, &'a [u8]);
/// Owned keyed row: a `FullKey` and the value, both backed by `Bytes`.
pub type StateStoreKeyedRow = (FullKey<Bytes>, Bytes);
/// Keyed-row iterator holding no non-`'static` borrows, as returned by [`StateStoreRead`].
pub trait StateStoreReadIter = StateStoreIter + 'static;
174
/// A single change read from a table's change log (see [`StateStoreReadLog::iter_log`]).
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum ChangeLogValue<T> {
    /// A newly inserted value.
    Insert(T),
    /// A value replaced by `new_value`; `old_value` is the value it replaced.
    Update { new_value: T, old_value: T },
    /// A deleted value.
    Delete(T),
}
181
182impl<T> ChangeLogValue<T> {
183 pub fn try_map<O>(self, f: impl Fn(T) -> StorageResult<O>) -> StorageResult<ChangeLogValue<O>> {
184 Ok(match self {
185 ChangeLogValue::Insert(value) => ChangeLogValue::Insert(f(value)?),
186 ChangeLogValue::Update {
187 new_value,
188 old_value,
189 } => ChangeLogValue::Update {
190 new_value: f(new_value)?,
191 old_value: f(old_value)?,
192 },
193 ChangeLogValue::Delete(value) => ChangeLogValue::Delete(f(value)?),
194 })
195 }
196
197 pub fn into_op_value_iter(self) -> impl Iterator<Item = (Op, T)> {
198 std::iter::from_coroutine(
199 #[coroutine]
200 move || match self {
201 Self::Insert(row) => {
202 yield (Op::Insert, row);
203 }
204 Self::Delete(row) => {
205 yield (Op::Delete, row);
206 }
207 Self::Update {
208 old_value,
209 new_value,
210 } => {
211 yield (Op::UpdateDelete, old_value);
212 yield (Op::UpdateInsert, new_value);
213 }
214 },
215 )
216 }
217}
218
219impl<T: AsRef<[u8]>> ChangeLogValue<T> {
220 pub fn to_ref(&self) -> ChangeLogValue<&[u8]> {
221 match self {
222 ChangeLogValue::Insert(val) => ChangeLogValue::Insert(val.as_ref()),
223 ChangeLogValue::Update {
224 new_value,
225 old_value,
226 } => ChangeLogValue::Update {
227 new_value: new_value.as_ref(),
228 old_value: old_value.as_ref(),
229 },
230 ChangeLogValue::Delete(val) => ChangeLogValue::Delete(val.as_ref()),
231 }
232 }
233}
234
/// Owned change-log item: a table key and the change recorded for it.
pub type StateStoreReadLogItem = (TableKey<Bytes>, ChangeLogValue<Bytes>);
/// Borrowed form of [`StateStoreReadLogItem`].
pub type StateStoreReadLogItemRef<'a> = (TableKey<&'a [u8]>, ChangeLogValue<&'a [u8]>);
237
/// Options for [`StateStoreReadLog::next_epoch`].
#[derive(Clone)]
pub struct NextEpochOptions {
    /// Table whose change log is consulted.
    pub table_id: TableId,
}
242
/// Options for [`StateStoreReadLog::iter_log`].
#[derive(Clone)]
pub struct ReadLogOptions {
    /// Table whose change log is read.
    pub table_id: TableId,
}
247
/// Change-log iterator, as returned by [`StateStoreReadLog::iter_log`].
pub trait StateStoreReadChangeLogIter = StateStoreIter<StateStoreReadLogItem> + Send + 'static;
/// `Send` future resolving to a `StorageResult<T>`; it may borrow data that lives for `'a`.
pub trait StorageFuture<'a, T> = Future<Output = StorageResult<T>> + Send + 'a;
250
/// Read access to per-table change logs.
pub trait StateStoreReadLog: StaticSendSync {
    /// Iterator returned by [`Self::iter_log`].
    type ChangeLogIter: StateStoreReadChangeLogIter;

    /// Returns the epoch that follows `epoch` for `options.table_id`.
    /// NOTE(review): presumably the next epoch that has change-log data — confirm with
    /// implementors.
    fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> impl StorageFuture<'_, u64>;

    /// Iterates the change log of `options.table_id` restricted to `epoch_range` and
    /// `key_range`.
    /// NOTE(review): whether the `epoch_range` bounds are inclusive is not visible here —
    /// confirm with implementors.
    fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> impl StorageFuture<'_, Self::ChangeLogIter>;
}
263
/// One-shot callback that receives a borrowed full key and value and produces an `O`.
/// The key/value borrows (`'kv`) are only guaranteed to be valid for the duration of the call.
pub trait KeyValueFn<'a, O> =
    for<'kv> FnOnce(FullKey<&'kv [u8]>, &'kv [u8]) -> StorageResult<O> + Send + 'a;
266
/// Point lookups.
pub trait StateStoreGet: StaticSendSync {
    /// Looks up `key`. When a value is found, passes the borrowed full key and value to
    /// `on_key_value_fn` and returns its output.
    /// NOTE(review): presumably resolves to `Ok(None)` when the key is absent — confirm.
    fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> impl StorageFuture<'a, Option<O>>;
}
275
/// Read-only access: point gets (via [`StateStoreGet`]) plus forward and reverse range scans.
pub trait StateStoreRead: StateStoreGet + StaticSendSync {
    /// Iterator returned by [`Self::iter`].
    type Iter: StateStoreReadIter;
    /// Iterator returned by [`Self::rev_iter`].
    type RevIter: StateStoreReadIter;

    /// Iterates the keyed rows in `key_range`.
    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::Iter>;

    /// Like [`Self::iter`], but yields rows in reverse order (presumably descending by key).
    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::RevIter>;
}
297
/// Options for [`StateStore::try_wait_epoch`].
#[derive(Clone)]
pub struct TryWaitEpochOptions {
    /// Table to wait for.
    pub table_id: TableId,
}
302
303impl TryWaitEpochOptions {
304 #[cfg(any(test, feature = "test"))]
305 pub fn for_test(table_id: TableId) -> Self {
306 Self { table_id }
307 }
308}
309
310impl From<TracedTryWaitEpochOptions> for TryWaitEpochOptions {
311 fn from(value: TracedTryWaitEpochOptions) -> Self {
312 Self {
313 table_id: value.table_id.into(),
314 }
315 }
316}
317
318impl From<TryWaitEpochOptions> for TracedTryWaitEpochOptions {
319 fn from(value: TryWaitEpochOptions) -> Self {
320 Self {
321 table_id: value.table_id.into(),
322 }
323 }
324}
325
/// Options for [`StateStore::new_read_snapshot`].
#[derive(Clone, Copy)]
pub struct NewReadSnapshotOptions {
    /// Table to read.
    pub table_id: TableId,
    /// Table-level options (e.g. `retention_seconds`).
    pub table_option: TableOption,
}
331
/// Options for [`StateStore::new_vector_writer`].
#[derive(Clone)]
pub struct NewVectorWriterOptions {
    /// Table the vector writer writes to.
    pub table_id: TableId,
}
336
/// Cloneable state-store handle from which local stores, read snapshots and vector writers are
/// created. It also exposes change-log reads via [`StateStoreReadLog`].
pub trait StateStore: StateStoreReadLog + StaticSendSync + Clone {
    /// Local store type returned by [`Self::new_local`].
    type Local: LocalStateStore;
    /// Read-only view returned by [`Self::new_read_snapshot`]; supports key-value and vector
    /// reads.
    type ReadSnapshot: StateStoreRead + StateStoreReadVector + Clone;
    /// Vector writer returned by [`Self::new_vector_writer`].
    type VectorWriter: StateStoreWriteVector;

    /// Waits for `epoch` to become readable for `options.table_id`.
    /// NOTE(review): the exact semantics per `HummockReadEpoch` variant are defined by
    /// implementors — confirm.
    fn try_wait_epoch(
        &self,
        epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> impl StorageFuture<'_, ()>;

    /// Wraps this store in a [`MonitoredStateStore`] backed by `storage_metrics`.
    fn monitored(self, storage_metrics: Arc<MonitoredStorageMetrics>) -> MonitoredStateStore<Self> {
        MonitoredStateStore::new(self, storage_metrics)
    }

    /// Creates a [`LocalStateStore`] configured by `option`.
    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_;

    /// Creates a read-only snapshot of `options.table_id` at `epoch`.
    fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> impl StorageFuture<'_, Self::ReadSnapshot>;

    /// Creates a vector writer for `options.table_id`.
    fn new_vector_writer(
        &self,
        options: NewVectorWriterOptions,
    ) -> impl Future<Output = Self::VectorWriter> + Send + '_;
}
368
/// A local, mutable view of the state store. It provides:
/// - point reads (via [`StateStoreGet`]) and range reads;
/// - writes, which take `&mut self`;
/// - epoch control (via [`StateStoreWriteEpochControl`]).
pub trait LocalStateStore: StateStoreGet + StateStoreWriteEpochControl + StaticSendSync {
    /// Reader returned by [`Self::new_flushed_snapshot_reader`].
    type FlushedSnapshotReader: StateStoreRead;
    /// Iterator returned by [`Self::iter`]; may borrow from the local store.
    type Iter<'a>: StateStoreIter + 'a;
    /// Iterator returned by [`Self::rev_iter`]; may borrow from the local store.
    type RevIter<'a>: StateStoreIter + 'a;

    /// Iterates the keyed rows in `key_range`.
    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::Iter<'_>>;

    /// Like [`Self::iter`], but yields rows in reverse order (presumably descending by key).
    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Self::RevIter<'_>>;

    /// Returns a reader over this store's data.
    /// NOTE(review): presumably covers only data already flushed — confirm.
    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader;

    /// Returns the table watermark recorded for `vnode`, if any.
    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;

    /// Writes `new_val` under `key`. `old_val` is the value being replaced, if any.
    /// NOTE(review): presumably validated according to the configured `OpConsistencyLevel` —
    /// confirm.
    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()>;

    /// Deletes `key`, whose current value is `old_val`.
    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

    /// Replaces the vnode bitmap with `vnodes`.
    /// NOTE(review): the returned bitmap is presumably the previous one — confirm.
    fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> impl StorageFuture<'_, Arc<Bitmap>>;
}
415
/// Epoch lifecycle shared by local writers: `init`, then write, `flush` and `seal_current_epoch`.
pub trait StateStoreWriteEpochControl: StaticSendSync {
    /// Flushes pending writes.
    /// NOTE(review): the returned `usize` is presumably the amount of data flushed — confirm.
    fn flush(&mut self) -> impl StorageFuture<'_, usize>;

    /// Possibly flushes pending writes.
    /// NOTE(review): presumably only when some threshold is exceeded — confirm.
    fn try_flush(&mut self) -> impl StorageFuture<'_, ()>;

    /// Initializes the writer at `opts.epoch`.
    fn init(&mut self, opts: InitOptions) -> impl StorageFuture<'_, ()>;

    /// Seals the current epoch and advances to `next_epoch`. `opts` may carry table watermarks
    /// and a new op-consistency level.
    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
}
434
/// Vector writer, with the same epoch control as other local writers.
pub trait StateStoreWriteVector: StateStoreWriteEpochControl + StaticSendSync {
    /// Inserts `vec` together with its associated `info` payload.
    fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()>;
}
438
/// Parameters of a nearest-neighbour vector search.
pub struct VectorNearestOptions {
    /// Number of nearest results requested (presumably an upper bound).
    pub top_n: usize,
    /// Distance measure used to rank candidates.
    pub measure: DistanceMeasurement,
    /// Search-breadth (`ef`) parameter, presumably used when searching an HNSW index.
    pub hnsw_ef_search: usize,
}
444
/// Callback (see [`OnNearestItem`]) that maps a nearest-search result to an `O`.
pub trait OnNearestItemFn<'a, O> = OnNearestItem<O> + Send + Sync + 'a;
446
/// Nearest-neighbour search over stored vectors.
pub trait StateStoreReadVector: StaticSendSync {
    /// Searches for the vectors nearest to `vec` as configured by `options`. Each result is
    /// mapped through `on_nearest_item_fn`, and the mapped results are returned.
    fn nearest<'a, O: Send + 'a>(
        &'a self,
        vec: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> impl StorageFuture<'a, Vec<O>>;
}
455
/// Prefetch hints for a read. The default disables prefetching entirely.
#[derive(Default, Clone, Copy)]
pub struct PrefetchOptions {
    /// Whether to prefetch at all.
    pub prefetch: bool,
    /// Whether the read is expected to be large; distinguishes large from small range scans.
    pub for_large_query: bool,
}

impl PrefetchOptions {
    /// Prefetch enabled, for a large range scan.
    pub fn prefetch_for_large_range_scan() -> Self {
        Self::new(true, true)
    }

    /// Prefetch enabled, for a small range scan.
    pub fn prefetch_for_small_range_scan() -> Self {
        Self::new(true, false)
    }

    /// Builds options from explicit flags.
    pub fn new(prefetch: bool, for_large_query: bool) -> Self {
        PrefetchOptions {
            for_large_query,
            prefetch,
        }
    }
}
488
489impl From<TracedPrefetchOptions> for PrefetchOptions {
490 fn from(value: TracedPrefetchOptions) -> Self {
491 Self {
492 prefetch: value.prefetch,
493 for_large_query: value.for_large_query,
494 }
495 }
496}
497
498impl From<PrefetchOptions> for TracedPrefetchOptions {
499 fn from(value: PrefetchOptions) -> Self {
500 Self {
501 prefetch: value.prefetch,
502 for_large_query: value.for_large_query,
503 }
504 }
505}
506
/// Per-read options, used by both point gets and range iterations.
#[derive(Default, Clone)]
pub struct ReadOptions {
    /// Optional key-prefix hint for the read.
    /// NOTE(review): presumably used to skip data that cannot match the prefix — confirm.
    pub prefix_hint: Option<Bytes>,
    /// Prefetch hints for range iterations.
    pub prefetch_options: PrefetchOptions,
    /// Cache policy for this read (see `CachePolicy`).
    pub cache_policy: CachePolicy,
}
516
517impl From<TracedReadOptions> for ReadOptions {
518 fn from(value: TracedReadOptions) -> Self {
519 Self {
520 prefix_hint: value.prefix_hint.map(|b| b.into()),
521 prefetch_options: value.prefetch_options.into(),
522 cache_policy: value.cache_policy.into(),
523 }
524 }
525}
526
527impl ReadOptions {
528 pub fn into_traced_read_options(
529 self,
530 table_id: TableId,
531 epoch: Option<HummockReadEpoch>,
532 table_option: TableOption,
533 ) -> TracedReadOptions {
534 let value = self;
535 let (read_version_from_backup, read_committed) = match epoch {
536 None | Some(HummockReadEpoch::NoWait(_)) => (false, false),
537 Some(HummockReadEpoch::Backup(_)) => (true, false),
538 Some(HummockReadEpoch::Committed(_))
539 | Some(HummockReadEpoch::BatchQueryCommitted(_, _))
540 | Some(HummockReadEpoch::TimeTravel(_)) => (false, true),
541 };
542 TracedReadOptions {
543 prefix_hint: value.prefix_hint.map(|b| b.into()),
544 prefetch_options: value.prefetch_options.into(),
545 cache_policy: value.cache_policy.into(),
546 retention_seconds: table_option.retention_seconds,
547 table_id: table_id.into(),
548 read_version_from_backup,
549 read_committed,
550 }
551 }
552}
553
554pub fn gen_min_epoch(base_epoch: u64, retention_seconds: Option<u32>) -> u64 {
555 match retention_seconds {
556 Some(retention_seconds_u32) => {
557 if base_epoch == MAX_EPOCH {
558 panic!("generate min epoch for MAX_EPOCH");
559 }
560 Epoch(base_epoch)
561 .subtract_ms(retention_seconds_u32 as u64 * 1000)
562 .0
563 }
564 None => 0,
565 }
566}
567
/// Predicate deciding whether two old values should be treated as equal.
pub trait CheckOldValueEquality = Fn(&Bytes, &Bytes) -> bool + Send + Sync;
569
/// Old-value checker that compares bytes for exact equality. Among other uses, it is the
/// checker installed when a `ConsistentOldValue` level is rebuilt from a trace.
pub static CHECK_BYTES_EQUAL: LazyLock<Arc<dyn CheckOldValueEquality>> =
    LazyLock::new(|| Arc::new(|first: &Bytes, second: &Bytes| first == second));
572
/// Controls whether, and how, the old values supplied with writes are checked. The checks
/// themselves are performed by store implementations.
#[derive(Default, Clone)]
pub enum OpConsistencyLevel {
    /// Old values are not checked (presumably). This is the default.
    #[default]
    Inconsistent,
    /// Old values are checked with `check_old_value`.
    ConsistentOldValue {
        /// Equality predicate for old values (e.g. `CHECK_BYTES_EQUAL`).
        check_old_value: Arc<dyn CheckOldValueEquality>,
        /// Whether the store using this level is a log store.
        is_log_store: bool,
    },
}
583
584impl Debug for OpConsistencyLevel {
585 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
586 match self {
587 OpConsistencyLevel::Inconsistent => f.write_str("OpConsistencyLevel::Inconsistent"),
588 OpConsistencyLevel::ConsistentOldValue { is_log_store, .. } => f
589 .debug_struct("OpConsistencyLevel::ConsistentOldValue")
590 .field("is_log_store", is_log_store)
591 .finish(),
592 }
593 }
594}
595
596impl PartialEq<Self> for OpConsistencyLevel {
597 fn eq(&self, other: &Self) -> bool {
598 matches!(
599 (self, other),
600 (
601 OpConsistencyLevel::Inconsistent,
602 OpConsistencyLevel::Inconsistent
603 ) | (
604 OpConsistencyLevel::ConsistentOldValue {
605 is_log_store: true,
606 ..
607 },
608 OpConsistencyLevel::ConsistentOldValue {
609 is_log_store: true,
610 ..
611 },
612 ) | (
613 OpConsistencyLevel::ConsistentOldValue {
614 is_log_store: false,
615 ..
616 },
617 OpConsistencyLevel::ConsistentOldValue {
618 is_log_store: false,
619 ..
620 },
621 )
622 )
623 }
624}
625
// `eq` only compares the variant and the `is_log_store` flag. That comparison is reflexive,
// symmetric and transitive, so `Eq` is sound.
impl Eq for OpConsistencyLevel {}
627
628impl OpConsistencyLevel {
629 pub fn update(&mut self, new_level: &OpConsistencyLevel) {
630 assert_ne!(self, new_level);
631 *self = new_level.clone()
632 }
633}
634
/// Options for [`StateStore::new_local`].
#[derive(Clone)]
pub struct NewLocalOptions {
    /// Table the local store is created for.
    pub table_id: TableId,
    /// Fragment the local store is created for.
    pub fragment_id: FragmentId,
    /// Initial old-value checking level. It can later be switched through
    /// `SealCurrentEpochOptions::switch_op_consistency_level`.
    pub op_consistency_level: OpConsistencyLevel,
    /// Table-level options (e.g. `retention_seconds`).
    pub table_option: TableOption,

    /// Whether this is a replicated local store (set by `new_replicated`).
    pub is_replicated: bool,

    /// Vnode bitmap of the local store; replaceable via `LocalStateStore::update_vnode_bitmap`.
    pub vnodes: Arc<Bitmap>,

    /// Presumably whether flushed data should be uploaded — confirm. `new_replicated` always
    /// sets it to `false`.
    pub upload_on_flush: bool,
}
658
659impl From<TracedNewLocalOptions> for NewLocalOptions {
660 fn from(value: TracedNewLocalOptions) -> Self {
661 Self {
662 table_id: value.table_id.into(),
663 fragment_id: value.fragment_id.into(),
664 op_consistency_level: match value.op_consistency_level {
665 TracedOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
666 TracedOpConsistencyLevel::ConsistentOldValue => {
667 OpConsistencyLevel::ConsistentOldValue {
668 check_old_value: CHECK_BYTES_EQUAL.clone(),
669 is_log_store: false,
671 }
672 }
673 },
674 table_option: value.table_option.into(),
675 is_replicated: value.is_replicated,
676 vnodes: Arc::new(value.vnodes.into()),
677 upload_on_flush: value.upload_on_flush,
678 }
679 }
680}
681
682impl From<NewLocalOptions> for TracedNewLocalOptions {
683 fn from(value: NewLocalOptions) -> Self {
684 Self {
685 table_id: value.table_id.into(),
686 fragment_id: value.fragment_id.into(),
687 op_consistency_level: match value.op_consistency_level {
688 OpConsistencyLevel::Inconsistent => TracedOpConsistencyLevel::Inconsistent,
689 OpConsistencyLevel::ConsistentOldValue { .. } => {
690 TracedOpConsistencyLevel::ConsistentOldValue
691 }
692 },
693 table_option: value.table_option.into(),
694 is_replicated: value.is_replicated,
695 vnodes: value.vnodes.as_ref().clone().into(),
696 upload_on_flush: value.upload_on_flush,
697 }
698 }
699}
700
701impl NewLocalOptions {
702 pub fn new(
703 table_id: TableId,
704 fragment_id: FragmentId,
705 op_consistency_level: OpConsistencyLevel,
706 table_option: TableOption,
707 vnodes: Arc<Bitmap>,
708 upload_on_flush: bool,
709 ) -> Self {
710 NewLocalOptions {
711 table_id,
712 fragment_id,
713 op_consistency_level,
714 table_option,
715 is_replicated: false,
716 vnodes,
717 upload_on_flush,
718 }
719 }
720
721 pub fn new_replicated(
722 table_id: TableId,
723 fragment_id: FragmentId,
724 op_consistency_level: OpConsistencyLevel,
725 table_option: TableOption,
726 vnodes: Arc<Bitmap>,
727 ) -> Self {
728 NewLocalOptions {
729 table_id,
730 fragment_id,
731 op_consistency_level,
732 table_option,
733 is_replicated: true,
734 vnodes,
735 upload_on_flush: false,
736 }
737 }
738
739 pub fn for_test(table_id: TableId) -> Self {
740 Self {
741 table_id,
742 fragment_id: FragmentId::default(),
743 op_consistency_level: OpConsistencyLevel::Inconsistent,
744 table_option: TableOption {
745 retention_seconds: None,
746 },
747 is_replicated: false,
748 vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
749 upload_on_flush: true,
750 }
751 }
752}
753
/// Options for [`StateStoreWriteEpochControl::init`].
#[derive(Clone)]
pub struct InitOptions {
    /// Epoch pair the writer starts at.
    pub epoch: EpochPair,
}
758
759impl InitOptions {
760 pub fn new(epoch: EpochPair) -> Self {
761 Self { epoch }
762 }
763}
764
765impl From<InitOptions> for TracedInitOptions {
766 fn from(value: InitOptions) -> Self {
767 TracedInitOptions {
768 epoch: value.epoch.into(),
769 }
770 }
771}
772
773impl From<TracedInitOptions> for InitOptions {
774 fn from(value: TracedInitOptions) -> Self {
775 InitOptions {
776 epoch: value.epoch.into(),
777 }
778 }
779}
780
/// Options for [`StateStoreWriteEpochControl::seal_current_epoch`].
#[derive(Clone, Debug)]
pub struct SealCurrentEpochOptions {
    /// Table watermarks to report when sealing: the direction, the per-vnode watermarks and
    /// their `WatermarkSerdeType`.
    pub table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
    /// When set, the op consistency level to switch to.
    pub switch_op_consistency_level: Option<OpConsistencyLevel>,
}
786
787impl From<SealCurrentEpochOptions> for TracedSealCurrentEpochOptions {
788 fn from(value: SealCurrentEpochOptions) -> Self {
789 TracedSealCurrentEpochOptions {
790 table_watermarks: value.table_watermarks.map(
791 |(direction, watermarks, watermark_type)| {
792 (
793 direction == WatermarkDirection::Ascending,
794 watermarks
795 .into_iter()
796 .map(|watermark| {
797 let pb_watermark = PbVnodeWatermark::from(watermark);
798 Message::encode_to_vec(&pb_watermark)
799 })
800 .collect(),
801 PbWatermarkSerdeType::from(watermark_type) as i32,
802 )
803 },
804 ),
805 switch_op_consistency_level: value
806 .switch_op_consistency_level
807 .map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue { .. })),
808 }
809 }
810}
811
812impl From<TracedSealCurrentEpochOptions> for SealCurrentEpochOptions {
813 fn from(value: TracedSealCurrentEpochOptions) -> SealCurrentEpochOptions {
814 SealCurrentEpochOptions {
815 table_watermarks: value.table_watermarks.map(
816 |(is_ascending, watermarks, watermark_serde_type)| {
817 (
818 if is_ascending {
819 WatermarkDirection::Ascending
820 } else {
821 WatermarkDirection::Descending
822 },
823 watermarks
824 .into_iter()
825 .map(|serialized_watermark| {
826 Message::decode(serialized_watermark.as_slice())
827 .map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb))
828 .expect("should not failed")
829 })
830 .collect(),
831 match PbWatermarkSerdeType::try_from(watermark_serde_type).unwrap() {
832 PbWatermarkSerdeType::TypeUnspecified => unreachable!(),
833 PbWatermarkSerdeType::PkPrefix => WatermarkSerdeType::PkPrefix,
834 PbWatermarkSerdeType::NonPkPrefix => WatermarkSerdeType::NonPkPrefix,
835 PbWatermarkSerdeType::Value => WatermarkSerdeType::Value,
836 },
837 )
838 },
839 ),
840 switch_op_consistency_level: value.switch_op_consistency_level.map(|enable| {
841 if enable {
842 OpConsistencyLevel::ConsistentOldValue {
843 check_old_value: CHECK_BYTES_EQUAL.clone(),
844 is_log_store: false,
845 }
846 } else {
847 OpConsistencyLevel::Inconsistent
848 }
849 }),
850 }
851 }
852}
853
854impl SealCurrentEpochOptions {
855 pub fn for_test() -> Self {
856 Self {
857 table_watermarks: None,
858 switch_op_consistency_level: None,
859 }
860 }
861}