use std::borrow::Borrow;
use std::cmp::Ordering;
use std::fmt::Debug;
use std::iter::once;
use std::ops::Bound::*;
use std::ops::{Bound, Deref, DerefMut, RangeBounds};
use std::ptr;

use bytes::{Buf, BufMut, Bytes, BytesMut};
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common_estimate_size::EstimateSize;

use crate::{EpochWithGap, HummockEpoch};

pub const EPOCH_LEN: usize = std::mem::size_of::<HummockEpoch>();
pub const TABLE_PREFIX_LEN: usize = std::mem::size_of::<u32>();
pub const MAX_KEY_LEN: usize = u16::MAX as usize;

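// An encoded full key, as produced by `FullKey::encode` below, has the following
// layout (integers big-endian):
//
//   | table_id (4B) | vnode (VirtualNode::SIZE B) | key payload | epoch_with_gap (8B) |
//   |<----------------------- user key ------------------------>|
//                   |<------------- table key ----------------->|
//
// `TABLE_PREFIX_LEN` is the length of the table id prefix and `EPOCH_LEN` the length
// of the epoch suffix.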
pub type KeyPayloadType = Bytes;
pub type TableKeyRange = (
    Bound<TableKey<KeyPayloadType>>,
    Bound<TableKey<KeyPayloadType>>,
);
pub type UserKeyRange = (
    Bound<UserKey<KeyPayloadType>>,
    Bound<UserKey<KeyPayloadType>>,
);
pub type UserKeyRangeRef<'a> = (Bound<UserKey<&'a [u8]>>, Bound<UserKey<&'a [u8]>>);
pub type FullKeyRange = (
    Bound<FullKey<KeyPayloadType>>,
    Bound<FullKey<KeyPayloadType>>,
);

pub fn is_empty_key_range(key_range: &TableKeyRange) -> bool {
    match key_range {
        (Included(start), Excluded(end)) => start == end,
        _ => false,
    }
}

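/// Returns the vnode index range `[left, right)` covered by the given table key range.
///
/// Illustrative sketch of the expected behavior (not compiled as a doctest;
/// `gen_key_from_str` is the helper defined below):
///
/// ```ignore
/// let range = (
///     Included(gen_key_from_str(VirtualNode::from_index(3), "a")),
///     Excluded(gen_key_from_str(VirtualNode::from_index(5), "")),
/// );
/// assert_eq!(vnode_range(&range), (3, 5));
/// ```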
pub fn vnode_range(range: &TableKeyRange) -> (usize, usize) {
    let (left, right) = range;
    let left = match left {
        Included(key) | Excluded(key) => key.vnode_part().to_index(),
        Unbounded => 0,
    };
    let right = match right {
        Included(key) => key.vnode_part().to_index() + 1,
        Excluded(key) => {
            let (vnode, inner_key) = key.split_vnode();
            if inner_key.is_empty() {
                // An excluded end bound with an empty inner key stops exactly at the
                // start of this vnode, so the vnode itself is not covered.
                vnode.to_index()
            } else {
                vnode.to_index() + 1
            }
        }
        Unbounded => VirtualNode::MAX_REPRESENTABLE.to_index() + 1,
    };
    (left, right)
}

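/// Returns the single vnode covered by `range`.
///
/// Panics if the range does not cover exactly one vnode.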
pub fn vnode(range: &TableKeyRange) -> VirtualNode {
    let (l, r_exclusive) = vnode_range(range);
    assert_eq!(r_exclusive - l, 1);
    VirtualNode::from_index(l)
}

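/// Appends the big-endian encoded `epoch` to `user_key` in place and returns the
/// resulting full key.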
pub fn key_with_epoch(mut user_key: Vec<u8>, epoch: HummockEpoch) -> Vec<u8> {
    let res = epoch.to_be();
    user_key.reserve(EPOCH_LEN);
    let buf = user_key.chunk_mut();

    // SAFETY: `reserve` guarantees at least `EPOCH_LEN` bytes of spare capacity, so
    // the copy stays in bounds and `advance_mut` only exposes initialized bytes.
    unsafe {
        ptr::copy_nonoverlapping(
            &res as *const _ as *const u8,
            buf.as_mut_ptr() as *mut _,
            EPOCH_LEN,
        );
        user_key.advance_mut(EPOCH_LEN);
    }

    user_key
}

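/// Splits a full key into its user key part and epoch part.
///
/// Panics if the key is shorter than `EPOCH_LEN`.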
#[inline]
pub fn split_key_epoch(full_key: &[u8]) -> (&[u8], &[u8]) {
    let pos = full_key
        .len()
        .checked_sub(EPOCH_LEN)
        .unwrap_or_else(|| panic!("bad full key format: {:?}", full_key));
    full_key.split_at(pos)
}

pub fn user_key(full_key: &[u8]) -> &[u8] {
    split_key_epoch(full_key).0
}

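/// Strips the 4-byte table id prefix from a user key, leaving the table key.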
pub fn table_key(user_key: &[u8]) -> &[u8] {
    &user_key[TABLE_PREFIX_LEN..]
}

#[inline(always)]
pub fn get_user_key(full_key: &[u8]) -> Vec<u8> {
    if full_key.is_empty() {
        vec![]
    } else {
        user_key(full_key).to_vec()
    }
}

#[inline(always)]
pub fn get_table_id(full_key: &[u8]) -> u32 {
    let mut buf = full_key;
    buf.get_u32()
}

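/// Computes the next key in lexicographical byte order: the smallest key greater than
/// any key prefixed by the input. Returns an empty `Vec` when the input consists
/// entirely of `0xff` bytes (no such key exists).
///
/// Illustrative sketch of the expected behavior (not compiled as a doctest):
///
/// ```ignore
/// assert_eq!(next_key(b"123"), b"124");
/// assert_eq!(next_key(b"12\xff"), b"13");
/// assert_eq!(next_key(b"\xff\xff"), b"");
/// ```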
pub fn next_key(key: &[u8]) -> Vec<u8> {
    if let Some((s, e)) = next_key_no_alloc(key) {
        let mut res = Vec::with_capacity(s.len() + 1);
        res.extend_from_slice(s);
        res.push(e);
        res
    } else {
        Vec::new()
    }
}

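/// Computes the previous key of the given key in lexicographical byte order. If every
/// byte is `0x00`, the result is an all-`0xff` vector of the same length.
///
/// Illustrative sketch, mirroring `test_prev_key` below (not compiled as a doctest):
///
/// ```ignore
/// assert_eq!(prev_key(b"123"), b"122");
/// assert_eq!(prev_key(b"12\x00"), b"11\xff");
/// assert_eq!(prev_key(b"\x00\x00"), b"\xff\xff");
/// ```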
pub fn prev_key(key: &[u8]) -> Vec<u8> {
    let pos = key.iter().rposition(|b| *b != 0x00);
    match pos {
        Some(pos) => {
            let mut res = Vec::with_capacity(key.len());
            res.extend_from_slice(&key[0..pos]);
            res.push(key[pos] - 1);
            if pos + 1 < key.len() {
                res.push(0xff);
            }
            res
        }
        None => {
            vec![0xff; key.len()]
        }
    }
}

fn next_key_no_alloc(key: &[u8]) -> Option<(&[u8], u8)> {
    let pos = key.iter().rposition(|b| *b != 0xff)?;
    Some((&key[..pos], key[pos] + 1))
}

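/// Computes the next epoch of a fixed-width, big-endian encoded epoch, preserving the
/// slice length. An all-`0xff` input wraps around to all-`0x00`.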
pub fn next_epoch(epoch: &[u8]) -> Vec<u8> {
    let pos = epoch.iter().rposition(|b| *b != 0xff);
    match pos {
        Some(mut pos) => {
            let mut res = Vec::with_capacity(epoch.len());
            res.extend_from_slice(&epoch[0..pos]);
            res.push(epoch[pos] + 1);
            while pos + 1 < epoch.len() {
                res.push(0x00);
                pos += 1;
            }
            res
        }
        None => {
            vec![0x00; epoch.len()]
        }
    }
}

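/// Computes the previous epoch of a fixed-width, big-endian encoded epoch, preserving
/// the slice length. An all-`0x00` input wraps around to all-`0xff`.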
pub fn prev_epoch(epoch: &[u8]) -> Vec<u8> {
    let pos = epoch.iter().rposition(|b| *b != 0x00);
    match pos {
        Some(mut pos) => {
            let mut res = Vec::with_capacity(epoch.len());
            res.extend_from_slice(&epoch[0..pos]);
            res.push(epoch[pos] - 1);
            while pos + 1 < epoch.len() {
                res.push(0xff);
                pos += 1;
            }
            res
        }
        None => {
            vec![0xff; epoch.len()]
        }
    }
}

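/// Computes the next full key in full-key order. User keys sort ascending while epochs
/// sort descending within a user key, so the successor is the same user key with the
/// previous epoch, rolling over to the next user key at the maximum epoch once epoch 0
/// is passed. Returns an empty `Vec` when no successor exists.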
pub fn next_full_key(full_key: &[u8]) -> Vec<u8> {
    let (user_key, epoch) = split_key_epoch(full_key);
    let prev_epoch = prev_epoch(epoch);
    let mut res = Vec::with_capacity(full_key.len());
    if prev_epoch.iter().all(|&b| b == 0xff) {
        // The epoch wrapped around, so move on to the next user key.
        let next_user_key = next_key(user_key);
        if next_user_key.is_empty() {
            return Vec::new();
        }
        res.extend_from_slice(next_user_key.as_slice());
        res.extend_from_slice(prev_epoch.as_slice());
        res
    } else {
        res.extend_from_slice(user_key);
        res.extend_from_slice(prev_epoch.as_slice());
        res
    }
}

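/// Computes the previous full key in full-key order: the same user key with the next
/// epoch, rolling back to the previous user key at epoch 0 once the maximum epoch is
/// passed. Returns an empty `Vec` when no predecessor exists.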
pub fn prev_full_key(full_key: &[u8]) -> Vec<u8> {
    let (user_key, epoch) = split_key_epoch(full_key);
    let next_epoch = next_epoch(epoch);
    let mut res = Vec::with_capacity(full_key.len());
    if next_epoch.iter().all(|&b| b == 0x00) {
        // The epoch wrapped around, so move back to the previous user key.
        let prev_user_key = prev_key(user_key);
        if prev_user_key.iter().all(|&b| b == 0xff) {
            return Vec::new();
        }
        res.extend_from_slice(prev_user_key.as_slice());
        res.extend_from_slice(next_epoch.as_slice());
        res
    } else {
        res.extend_from_slice(user_key);
        res.extend_from_slice(next_epoch.as_slice());
        res
    }
}

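/// Returns the end bound of the key space of the given vnode: exclusive at the next
/// vnode's prefix, or `Unbounded` for the maximum representable vnode.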
pub fn end_bound_of_vnode(vnode: VirtualNode) -> Bound<Bytes> {
    if vnode == VirtualNode::MAX_REPRESENTABLE {
        Unbounded
    } else {
        let end_bound_index = vnode.to_index() + 1;
        Excluded(Bytes::copy_from_slice(
            &VirtualNode::from_index(end_bound_index).to_be_bytes(),
        ))
    }
}

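/// Returns the exclusive end bound of all keys starting with `prefix`, or `Unbounded`
/// if the prefix is all `0xff` bytes.
///
/// Illustrative sketch (not compiled as a doctest):
///
/// ```ignore
/// assert_eq!(end_bound_of_prefix(b"ab"), Excluded(Bytes::from_static(b"ac")));
/// assert_eq!(end_bound_of_prefix(b"\xff\xff"), Unbounded);
/// ```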
pub fn end_bound_of_prefix(prefix: &[u8]) -> Bound<Bytes> {
    if let Some((s, e)) = next_key_no_alloc(prefix) {
        let mut buf = BytesMut::with_capacity(s.len() + 1);
        buf.extend_from_slice(s);
        buf.put_u8(e);
        Excluded(buf.freeze())
    } else {
        Unbounded
    }
}

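/// Returns the inclusive start bound of all keys strictly after every key starting
/// with `prefix`. Panics if the prefix is all `0xff` bytes (it has no successor).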
pub fn start_bound_of_excluded_prefix(prefix: &[u8]) -> Bound<Bytes> {
    if let Some((s, e)) = next_key_no_alloc(prefix) {
        let mut buf = BytesMut::with_capacity(s.len() + 1);
        buf.extend_from_slice(s);
        buf.put_u8(e);
        Included(buf.freeze())
    } else {
        panic!("the prefix is the maximum value")
    }
}

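/// Returns the key range covering exactly the keys that start with `prefix`.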
pub fn range_of_prefix(prefix: &[u8]) -> (Bound<Bytes>, Bound<Bytes>) {
    if prefix.is_empty() {
        (Unbounded, Unbounded)
    } else {
        (
            Included(Bytes::copy_from_slice(prefix)),
            end_bound_of_prefix(prefix),
        )
    }
}

pub fn prefix_slice_with_vnode(vnode: VirtualNode, slice: &[u8]) -> Bytes {
    let prefix = vnode.to_be_bytes();
    let mut buf = BytesMut::with_capacity(prefix.len() + slice.len());
    buf.extend_from_slice(&prefix);
    buf.extend_from_slice(slice);
    buf.freeze()
}

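/// Prepends the vnode to both bounds of `range`, producing a `TableKeyRange` confined
/// to that vnode. Unbounded ends are clamped to the vnode's own key space.
///
/// Illustrative sketch, mirroring `test_prefixed_range_with_vnode` below (not compiled
/// as a doctest):
///
/// ```ignore
/// let vnode = VirtualNode::from_index(233);
/// let range = prefixed_range_with_vnode(
///     (Included(Bytes::from("1")), Excluded(Bytes::from("2"))),
///     vnode,
/// );
/// // Both bounds are now prefixed with vnode 233's big-endian bytes.
/// ```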
pub fn prefixed_range_with_vnode<B: AsRef<[u8]>>(
    range: impl RangeBounds<B>,
    vnode: VirtualNode,
) -> TableKeyRange {
    let prefixed = |b: &B| -> Bytes { prefix_slice_with_vnode(vnode, b.as_ref()) };

    let start: Bound<Bytes> = match range.start_bound() {
        Included(b) => Included(prefixed(b)),
        Excluded(b) => {
            assert!(!b.as_ref().is_empty());
            Excluded(prefixed(b))
        }
        Unbounded => Included(Bytes::copy_from_slice(&vnode.to_be_bytes())),
    };

    let end = match range.end_bound() {
        Included(b) => Included(prefixed(b)),
        Excluded(b) => {
            assert!(!b.as_ref().is_empty());
            Excluded(prefixed(b))
        }
        Unbounded => end_bound_of_vnode(vnode),
    };

    map_table_key_range((start, end))
}

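/// Overwrites `self` with the content of the given slice-like value, reusing the
/// existing allocation where the implementation allows.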
pub trait SetSlice<S: AsRef<[u8]> + ?Sized> {
    fn set(&mut self, value: &S);
}

impl<S: AsRef<[u8]> + ?Sized> SetSlice<S> for Vec<u8> {
    fn set(&mut self, value: &S) {
        self.clear();
        self.extend_from_slice(value.as_ref());
    }
}

impl SetSlice<Bytes> for Bytes {
    fn set(&mut self, value: &Bytes) {
        *self = value.clone()
    }
}

pub trait CopyFromSlice: Send + 'static {
    fn copy_from_slice(slice: &[u8]) -> Self;
}

impl CopyFromSlice for Vec<u8> {
    fn copy_from_slice(slice: &[u8]) -> Self {
        Vec::from(slice)
    }
}

impl CopyFromSlice for Bytes {
    fn copy_from_slice(slice: &[u8]) -> Self {
        Bytes::copy_from_slice(slice)
    }
}

impl CopyFromSlice for () {
    fn copy_from_slice(_: &[u8]) -> Self {}
}

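/// The key as seen within a table: the vnode prefix followed by the user-defined key
/// payload, without the table id or epoch.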
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct TableKey<T: AsRef<[u8]>>(pub T);

impl<T: AsRef<[u8]>> Debug for TableKey<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "TableKey {{ {} }}", hex::encode(self.0.as_ref()))
    }
}

impl<T: AsRef<[u8]>> Deref for TableKey<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T: AsRef<[u8]>> DerefMut for TableKey<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl<T: AsRef<[u8]>> AsRef<[u8]> for TableKey<T> {
    fn as_ref(&self) -> &[u8] {
        self.0.as_ref()
    }
}

impl TableKey<Bytes> {
    pub fn split_vnode_bytes(&self) -> (VirtualNode, Bytes) {
        debug_assert!(
            self.0.len() >= VirtualNode::SIZE,
            "too short table key: {:?}",
            self.0.as_ref()
        );
        let (vnode, _) = self.0.split_first_chunk::<{ VirtualNode::SIZE }>().unwrap();
        (
            VirtualNode::from_be_bytes(*vnode),
            self.0.slice(VirtualNode::SIZE..),
        )
    }
}

impl<T: AsRef<[u8]>> TableKey<T> {
    pub fn split_vnode(&self) -> (VirtualNode, &[u8]) {
        debug_assert!(
            self.0.as_ref().len() >= VirtualNode::SIZE,
            "too short table key: {:?}",
            self.0.as_ref()
        );
        let (vnode, inner_key) = self
            .0
            .as_ref()
            .split_first_chunk::<{ VirtualNode::SIZE }>()
            .unwrap();
        (VirtualNode::from_be_bytes(*vnode), inner_key)
    }

    pub fn vnode_part(&self) -> VirtualNode {
        self.split_vnode().0
    }

    pub fn key_part(&self) -> &[u8] {
        self.split_vnode().1
    }

    pub fn to_ref(&self) -> TableKey<&[u8]> {
        TableKey(self.0.as_ref())
    }
}

impl<T: AsRef<[u8]>> Borrow<[u8]> for TableKey<T> {
    fn borrow(&self) -> &[u8] {
        self.0.as_ref()
    }
}

impl EstimateSize for TableKey<Bytes> {
    fn estimated_heap_size(&self) -> usize {
        self.0.estimated_heap_size()
    }
}

impl TableKey<&[u8]> {
    pub fn copy_into<T: CopyFromSlice + AsRef<[u8]>>(&self) -> TableKey<T> {
        TableKey(T::copy_from_slice(self.as_ref()))
    }
}

#[inline]
pub fn map_table_key_range(range: (Bound<KeyPayloadType>, Bound<KeyPayloadType>)) -> TableKeyRange {
    (range.0.map(TableKey), range.1.map(TableKey))
}

pub fn gen_key_from_bytes(vnode: VirtualNode, payload: &[u8]) -> TableKey<Bytes> {
    TableKey(Bytes::from(
        [vnode.to_be_bytes().as_slice(), payload].concat(),
    ))
}

pub fn gen_key_from_str(vnode: VirtualNode, payload: &str) -> TableKey<Bytes> {
    gen_key_from_bytes(vnode, payload.as_bytes())
}

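/// A table key prefixed with its table id. Encoded layout:
/// `| table_id (4B BE) | table_key |`.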
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct UserKey<T: AsRef<[u8]>> {
    pub table_id: TableId,
    pub table_key: TableKey<T>,
}

impl<T: AsRef<[u8]>> Debug for UserKey<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "UserKey {{ {}, {:?} }}", self.table_id, self.table_key)
    }
}

impl<T: AsRef<[u8]>> UserKey<T> {
    pub fn new(table_id: TableId, table_key: TableKey<T>) -> Self {
        Self {
            table_id,
            table_key,
        }
    }

    pub fn for_test(table_id: TableId, table_key: T) -> Self {
        Self {
            table_id,
            table_key: TableKey(table_key),
        }
    }

    pub fn encode_into(&self, buf: &mut impl BufMut) {
        buf.put_u32(self.table_id.as_raw_id());
        buf.put_slice(self.table_key.as_ref());
    }

    pub fn encode_table_key_into(&self, buf: &mut impl BufMut) {
        buf.put_slice(self.table_key.as_ref());
    }

    pub fn encode(&self) -> Vec<u8> {
        let mut ret = Vec::with_capacity(TABLE_PREFIX_LEN + self.table_key.as_ref().len());
        self.encode_into(&mut ret);
        ret
    }

    pub fn is_empty(&self) -> bool {
        self.table_key.as_ref().is_empty()
    }

    pub fn encoded_len(&self) -> usize {
        self.table_key.as_ref().len() + TABLE_PREFIX_LEN
    }

    pub fn get_vnode_id(&self) -> usize {
        self.table_key.vnode_part().to_index()
    }
}

impl<'a> UserKey<&'a [u8]> {
    pub fn decode(slice: &'a [u8]) -> Self {
        let table_id: u32 = (&slice[..]).get_u32();

        Self {
            table_id: TableId::new(table_id),
            table_key: TableKey(&slice[TABLE_PREFIX_LEN..]),
        }
    }

    pub fn to_vec(self) -> UserKey<Vec<u8>> {
        self.copy_into()
    }

    pub fn copy_into<T: CopyFromSlice + AsRef<[u8]>>(self) -> UserKey<T> {
        UserKey {
            table_id: self.table_id,
            table_key: TableKey(T::copy_from_slice(self.table_key.0)),
        }
    }
}

impl<T: AsRef<[u8]> + Clone> UserKey<&T> {
    pub fn cloned(self) -> UserKey<T> {
        UserKey {
            table_id: self.table_id,
            table_key: TableKey(self.table_key.0.clone()),
        }
    }
}

impl<T: AsRef<[u8]>> UserKey<T> {
    pub fn as_ref(&self) -> UserKey<&[u8]> {
        UserKey::new(self.table_id, TableKey(self.table_key.as_ref()))
    }
}

impl<T: AsRef<[u8]>> UserKey<T> {
    /// Overwrites `self` with `other`, reusing the existing allocation where
    /// `T: SetSlice<F>` allows it.
    pub fn set<F>(&mut self, other: UserKey<F>)
    where
        T: SetSlice<F>,
        F: AsRef<[u8]>,
    {
        self.table_id = other.table_id;
        self.table_key.0.set(&other.table_key.0);
    }
}

impl UserKey<Vec<u8>> {
    pub fn into_bytes(self) -> UserKey<Bytes> {
        UserKey {
            table_id: self.table_id,
            table_key: TableKey(Bytes::from(self.table_key.0)),
        }
    }
}

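/// A user key together with its version. Encoded layout:
/// `| user_key | epoch_with_gap (8B BE) |` (see `EpochWithGap` for how the spill
/// offset is folded into the epoch).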
#[derive(Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct FullKey<T: AsRef<[u8]>> {
    pub user_key: UserKey<T>,
    pub epoch_with_gap: EpochWithGap,
}

impl<T: AsRef<[u8]>> Debug for FullKey<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "FullKey {{ {:?}, epoch: {}, epoch_with_gap: {}, spill_offset: {}}}",
            self.user_key,
            self.epoch_with_gap.pure_epoch(),
            self.epoch_with_gap.as_u64(),
            self.epoch_with_gap.as_u64() - self.epoch_with_gap.pure_epoch(),
        )
    }
}

impl<T: AsRef<[u8]>> FullKey<T> {
    pub fn new(table_id: TableId, table_key: TableKey<T>, epoch: HummockEpoch) -> Self {
        Self {
            user_key: UserKey::new(table_id, table_key),
            epoch_with_gap: EpochWithGap::new(epoch, 0),
        }
    }

    pub fn new_with_gap_epoch(
        table_id: TableId,
        table_key: TableKey<T>,
        epoch_with_gap: EpochWithGap,
    ) -> Self {
        Self {
            user_key: UserKey::new(table_id, table_key),
            epoch_with_gap,
        }
    }

    pub fn from_user_key(user_key: UserKey<T>, epoch: HummockEpoch) -> Self {
        Self {
            user_key,
            epoch_with_gap: EpochWithGap::new_from_epoch(epoch),
        }
    }

    pub fn for_test(table_id: TableId, table_key: T, epoch: HummockEpoch) -> Self {
        Self {
            user_key: UserKey::for_test(table_id, table_key),
            epoch_with_gap: EpochWithGap::new(epoch, 0),
        }
    }

    pub fn encode_into(&self, buf: &mut impl BufMut) {
        self.user_key.encode_into(buf);
        buf.put_u64(self.epoch_with_gap.as_u64());
    }

    pub fn encode(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(
            TABLE_PREFIX_LEN + self.user_key.table_key.as_ref().len() + EPOCH_LEN,
        );
        self.encode_into(&mut buf);
        buf
    }

    pub fn encode_into_without_table_id(&self, buf: &mut impl BufMut) {
        self.user_key.encode_table_key_into(buf);
        buf.put_u64(self.epoch_with_gap.as_u64());
    }

    pub fn encode_reverse_epoch(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(
            TABLE_PREFIX_LEN + self.user_key.table_key.as_ref().len() + EPOCH_LEN,
        );
        self.user_key.encode_into(&mut buf);
        buf.put_u64(u64::MAX - self.epoch_with_gap.as_u64());
        buf
    }

    pub fn is_empty(&self) -> bool {
        self.user_key.is_empty()
    }

    pub fn encoded_len(&self) -> usize {
        self.user_key.encoded_len() + EPOCH_LEN
    }
}

impl<'a> FullKey<&'a [u8]> {
    pub fn decode(slice: &'a [u8]) -> Self {
        let epoch_pos = slice.len() - EPOCH_LEN;
        let epoch = (&slice[epoch_pos..]).get_u64();

        Self {
            user_key: UserKey::decode(&slice[..epoch_pos]),
            epoch_with_gap: EpochWithGap::from_u64(epoch),
        }
    }

    pub fn from_slice_without_table_id(
        table_id: TableId,
        slice_without_table_id: &'a [u8],
    ) -> Self {
        let epoch_pos = slice_without_table_id.len() - EPOCH_LEN;
        let epoch = (&slice_without_table_id[epoch_pos..]).get_u64();

        Self {
            user_key: UserKey::new(table_id, TableKey(&slice_without_table_id[..epoch_pos])),
            epoch_with_gap: EpochWithGap::from_u64(epoch),
        }
    }

    pub fn decode_reverse_epoch(slice: &'a [u8]) -> Self {
        let epoch_pos = slice.len() - EPOCH_LEN;
        let epoch = (&slice[epoch_pos..]).get_u64();

        Self {
            user_key: UserKey::decode(&slice[..epoch_pos]),
            epoch_with_gap: EpochWithGap::from_u64(u64::MAX - epoch),
        }
    }

    pub fn to_vec(self) -> FullKey<Vec<u8>> {
        self.copy_into()
    }

    pub fn copy_into<T: CopyFromSlice + AsRef<[u8]>>(self) -> FullKey<T> {
        FullKey {
            user_key: self.user_key.copy_into(),
            epoch_with_gap: self.epoch_with_gap,
        }
    }
}

impl FullKey<Vec<u8>> {
    pub fn into_bytes(self) -> FullKey<Bytes> {
        FullKey {
            epoch_with_gap: self.epoch_with_gap,
            user_key: self.user_key.into_bytes(),
        }
    }
}

impl<T: AsRef<[u8]>> FullKey<T> {
    pub fn to_ref(&self) -> FullKey<&[u8]> {
        FullKey {
            user_key: self.user_key.as_ref(),
            epoch_with_gap: self.epoch_with_gap,
        }
    }
}

impl<T: AsRef<[u8]>> FullKey<T> {
    pub fn set<F>(&mut self, other: FullKey<F>)
    where
        T: SetSlice<F>,
        F: AsRef<[u8]>,
    {
        self.user_key.set(other.user_key);
        self.epoch_with_gap = other.epoch_with_gap;
    }
}

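// User keys compare ascending, but epochs compare descending, so that for the same
// user key the latest version sorts first.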
impl<T: AsRef<[u8]> + Ord + Eq> Ord for FullKey<T> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.user_key
            .cmp(&other.user_key)
            .then_with(|| other.epoch_with_gap.cmp(&self.epoch_with_gap))
    }
}

impl<T: AsRef<[u8]> + Ord + Eq> PartialOrd for FullKey<T> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

pub mod range_delete_backward_compatibility_serde_struct {
    use bytes::{Buf, BufMut};
    use risingwave_common::catalog::TableId;
    use serde::{Deserialize, Serialize};

    #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
    pub struct TableKey(Vec<u8>);

    #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
    #[serde(from = "UserKeySerde", into = "UserKeySerde")]
    pub struct UserKey {
        pub table_id: TableId,
        pub table_key: TableKey,
    }

    #[derive(Deserialize, Serialize)]
    pub struct TableIdSerde {
        table_id: u32,
    }

    #[derive(Deserialize, Serialize)]
    struct UserKeySerde {
        table_id: TableIdSerde,
        table_key: TableKey,
    }

    impl From<UserKeySerde> for UserKey {
        fn from(value: UserKeySerde) -> Self {
            Self {
                table_id: TableId::new(value.table_id.table_id),
                table_key: value.table_key,
            }
        }
    }

    impl From<UserKey> for UserKeySerde {
        fn from(value: UserKey) -> Self {
            Self {
                table_id: TableIdSerde {
                    table_id: value.table_id.as_raw_id(),
                },
                table_key: value.table_key,
            }
        }
    }

    impl UserKey {
        pub fn decode_length_prefixed(buf: &mut &[u8]) -> Self {
            let table_id = buf.get_u32();
            let len = buf.get_u32() as usize;
            let data = buf[..len].to_vec();
            buf.advance(len);
            UserKey {
                table_id: TableId::new(table_id),
                table_key: TableKey(data),
            }
        }

        pub fn encode_length_prefixed(&self, mut buf: impl BufMut) {
            buf.put_u32(self.table_id.as_raw_id());
            buf.put_u32(self.table_key.0.as_slice().len() as u32);
            buf.put_slice(self.table_key.0.as_slice());
        }
    }

    #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
    pub struct PointRange {
        pub left_user_key: UserKey,
        pub is_exclude_left_key: bool,
    }
}

pub trait EmptySliceRef {
    fn empty_slice_ref<'a>() -> &'a Self;
}

static EMPTY_BYTES: Bytes = Bytes::new();
impl EmptySliceRef for Bytes {
    fn empty_slice_ref<'a>() -> &'a Self {
        &EMPTY_BYTES
    }
}

static EMPTY_VEC: Vec<u8> = Vec::new();
impl EmptySliceRef for Vec<u8> {
    fn empty_slice_ref<'a>() -> &'a Self {
        &EMPTY_VEC
    }
}

const EMPTY_SLICE: &[u8] = b"";
impl EmptySliceRef for &[u8] {
    fn empty_slice_ref<'b>() -> &'b Self {
        &EMPTY_SLICE
    }
}

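/// Converts a table key range into a user key range by attaching `table_id` to both
/// bounds. An unbounded end becomes an exclusive bound at the start of the next table
/// id, or stays `Unbounded` if `table_id` is `u32::MAX`.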
pub fn bound_table_key_range<T: AsRef<[u8]> + EmptySliceRef>(
    table_id: TableId,
    table_key_range: &impl RangeBounds<TableKey<T>>,
) -> (Bound<UserKey<&T>>, Bound<UserKey<&T>>) {
    let start = match table_key_range.start_bound() {
        Included(b) => Included(UserKey::new(table_id, TableKey(&b.0))),
        Excluded(b) => Excluded(UserKey::new(table_id, TableKey(&b.0))),
        Unbounded => Included(UserKey::new(table_id, TableKey(T::empty_slice_ref()))),
    };

    let end = match table_key_range.end_bound() {
        Included(b) => Included(UserKey::new(table_id, TableKey(&b.0))),
        Excluded(b) => Excluded(UserKey::new(table_id, TableKey(&b.0))),
        Unbounded => {
            if let Some(next_table_id) = table_id.as_raw_id().checked_add(1) {
                Excluded(UserKey::new(
                    next_table_id.into(),
                    TableKey(T::empty_slice_ref()),
                ))
            } else {
                Unbounded
            }
        }
    };

    (start, end)
}

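/// Tracks the latest (i.e. largest in full-key order) full key observed so far and
/// validates that keys are fed in ascending order.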
pub struct FullKeyTracker<T: AsRef<[u8]> + Ord + Eq, const SKIP_DEDUP: bool = false> {
    pub latest_full_key: FullKey<T>,
    last_observed_epoch_with_gap: EpochWithGap,
}

impl<T: AsRef<[u8]> + Ord + Eq, const SKIP_DEDUP: bool> FullKeyTracker<T, SKIP_DEDUP> {
    pub fn new(init_full_key: FullKey<T>) -> Self {
        let epoch_with_gap = init_full_key.epoch_with_gap;
        Self {
            latest_full_key: init_full_key,
            last_observed_epoch_with_gap: epoch_with_gap,
        }
    }

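    /// Feeds a new full key to the tracker.
    ///
    /// Returns `true` if `key` has a user key different from the latest observed one
    /// (i.e. a new user key begins), and `false` otherwise. Panics if the keys are not
    /// observed in ascending full-key order, or, unless `SKIP_DEDUP`, if the same full
    /// key is observed twice.
    ///
    /// Illustrative sketch (not compiled as a doctest):
    ///
    /// ```ignore
    /// let mut tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::for_test(
    ///     TableId::new(1),
    ///     b"a".to_vec(),
    ///     test_epoch(2),
    /// ));
    /// // Same user key, smaller epoch: not a new user key.
    /// assert!(!tracker.observe(FullKey::for_test(TableId::new(1), b"a".to_vec(), test_epoch(1))));
    /// // Larger user key: a new user key begins.
    /// assert!(tracker.observe(FullKey::for_test(TableId::new(1), b"b".to_vec(), test_epoch(1))));
    /// ```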
    pub fn observe<F>(&mut self, key: FullKey<F>) -> bool
    where
        T: SetSlice<F>,
        F: AsRef<[u8]>,
    {
        self.observe_multi_version(key.user_key, once(key.epoch_with_gap))
    }

    /// Observes multiple versions of a user key at once. `epochs` must be non-empty
    /// and sorted in descending order.
    pub fn observe_multi_version<F>(
        &mut self,
        user_key: UserKey<F>,
        mut epochs: impl Iterator<Item = EpochWithGap>,
    ) -> bool
    where
        T: SetSlice<F>,
        F: AsRef<[u8]>,
    {
        let max_epoch_with_gap = epochs.next().expect("non-empty");
        let min_epoch_with_gap = epochs.fold(
            max_epoch_with_gap,
            |prev_epoch_with_gap, curr_epoch_with_gap| {
                assert!(
                    prev_epoch_with_gap > curr_epoch_with_gap,
                    "epoch list not sorted. prev: {:?}, curr: {:?}, user_key: {:?}",
                    prev_epoch_with_gap,
                    curr_epoch_with_gap,
                    user_key
                );
                curr_epoch_with_gap
            },
        );
        match self
            .latest_full_key
            .user_key
            .as_ref()
            .cmp(&user_key.as_ref())
        {
            Ordering::Less => {
                // A new user key is observed.
                self.last_observed_epoch_with_gap = min_epoch_with_gap;

                self.latest_full_key.set(FullKey {
                    user_key,
                    epoch_with_gap: min_epoch_with_gap,
                });
                true
            }
            Ordering::Equal => {
                if max_epoch_with_gap > self.last_observed_epoch_with_gap
                    || (!SKIP_DEDUP && max_epoch_with_gap == self.last_observed_epoch_with_gap)
                {
                    // Epochs within the same user key must be observed in descending order.
                    panic!(
                        "key {:?} epoch {:?} >= prev epoch {:?}",
                        user_key, max_epoch_with_gap, self.last_observed_epoch_with_gap
                    );
                }
                self.last_observed_epoch_with_gap = min_epoch_with_gap;
                false
            }
            Ordering::Greater => {
                // User keys must be observed in ascending order.
                panic!(
                    "key {:?} <= prev key {:?}",
                    user_key,
                    FullKey {
                        user_key: self.latest_full_key.user_key.as_ref(),
                        epoch_with_gap: self.last_observed_epoch_with_gap
                    }
                );
            }
        }
    }

    pub fn latest_user_key(&self) -> &UserKey<T> {
        &self.latest_full_key.user_key
    }
}

#[cfg(test)]
mod tests {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;

    #[test]
    fn test_encode_decode() {
        let epoch = test_epoch(1);
        let table_key = b"abc".to_vec();
        let key = FullKey::for_test(TableId::new(0), &table_key[..], 0);
        let buf = key.encode();
        assert_eq!(FullKey::decode(&buf), key);
        let key = FullKey::for_test(TableId::new(1), &table_key[..], epoch);
        let buf = key.encode();
        assert_eq!(FullKey::decode(&buf), key);
        let mut table_key = vec![1];
        let a = FullKey::for_test(TableId::new(1), table_key.clone(), epoch);
        table_key[0] = 2;
        let b = FullKey::for_test(TableId::new(1), table_key.clone(), epoch);
        table_key[0] = 129;
        let c = FullKey::for_test(TableId::new(1), table_key, epoch);
        assert!(a.lt(&b));
        assert!(b.lt(&c));
    }

    #[test]
    fn test_key_cmp() {
        let epoch = test_epoch(1);
        let epoch2 = test_epoch(2);
        let key1 = FullKey::for_test(TableId::new(0), b"0".to_vec(), epoch);
        let key2 = FullKey::for_test(TableId::new(1), b"0".to_vec(), epoch);
        let key3 = FullKey::for_test(TableId::new(1), b"1".to_vec(), epoch2);
        let key4 = FullKey::for_test(TableId::new(1), b"1".to_vec(), epoch);

        assert_eq!(key1.cmp(&key1), Ordering::Equal);
        assert_eq!(key1.cmp(&key2), Ordering::Less);
        assert_eq!(key2.cmp(&key3), Ordering::Less);
        assert_eq!(key3.cmp(&key4), Ordering::Less);
    }

    #[test]
    fn test_prev_key() {
        assert_eq!(prev_key(b"123"), b"122");
        assert_eq!(prev_key(b"12\x00"), b"11\xff");
        assert_eq!(prev_key(b"\x00\x00"), b"\xff\xff");
        assert_eq!(prev_key(b"\x00\x01"), b"\x00\x00");
        assert_eq!(prev_key(b"T"), b"S");
        assert_eq!(prev_key(b""), b"");
    }

    #[test]
    fn test_bound_table_key_range() {
        assert_eq!(
            bound_table_key_range(
                TableId::default(),
                &(
                    Included(TableKey(b"a".to_vec())),
                    Included(TableKey(b"b".to_vec()))
                )
            ),
            (
                Included(UserKey::for_test(TableId::default(), &b"a".to_vec())),
                Included(UserKey::for_test(TableId::default(), &b"b".to_vec()))
            )
        );
        assert_eq!(
            bound_table_key_range(
                TableId::from(1),
                &(Included(TableKey(b"a".to_vec())), Unbounded)
            ),
            (
                Included(UserKey::for_test(TableId::from(1), &b"a".to_vec())),
                Excluded(UserKey::for_test(TableId::from(2), &b"".to_vec()))
            )
        );
        assert_eq!(
            bound_table_key_range(
                TableId::from(u32::MAX),
                &(Included(TableKey(b"a".to_vec())), Unbounded)
            ),
            (
                Included(UserKey::for_test(TableId::from(u32::MAX), &b"a".to_vec())),
                Unbounded,
            )
        );
    }

    #[test]
    fn test_next_full_key() {
        let user_key = b"aaa".to_vec();
        let epoch: HummockEpoch = 3;
        let mut full_key = key_with_epoch(user_key, epoch);
        full_key = next_full_key(full_key.as_slice());
        assert_eq!(full_key, key_with_epoch(b"aaa".to_vec(), 2));
        full_key = next_full_key(full_key.as_slice());
        assert_eq!(full_key, key_with_epoch(b"aaa".to_vec(), 1));
        full_key = next_full_key(full_key.as_slice());
        assert_eq!(full_key, key_with_epoch(b"aaa".to_vec(), 0));
        full_key = next_full_key(full_key.as_slice());
        assert_eq!(
            full_key,
            key_with_epoch("aab".as_bytes().to_vec(), HummockEpoch::MAX)
        );
        assert_eq!(
            next_full_key(&key_with_epoch(b"\xff".to_vec(), 0)),
            Vec::<u8>::new()
        );
    }

    #[test]
    fn test_prev_full_key() {
        let user_key = b"aab";
        let epoch: HummockEpoch = HummockEpoch::MAX - 3;
        let mut full_key = key_with_epoch(user_key.to_vec(), epoch);
        full_key = prev_full_key(full_key.as_slice());
        assert_eq!(
            full_key,
            key_with_epoch(b"aab".to_vec(), HummockEpoch::MAX - 2)
        );
        full_key = prev_full_key(full_key.as_slice());
        assert_eq!(
            full_key,
            key_with_epoch(b"aab".to_vec(), HummockEpoch::MAX - 1)
        );
        full_key = prev_full_key(full_key.as_slice());
        assert_eq!(full_key, key_with_epoch(b"aab".to_vec(), HummockEpoch::MAX));
        full_key = prev_full_key(full_key.as_slice());
        assert_eq!(full_key, key_with_epoch(b"aaa".to_vec(), 0));

        assert_eq!(
            prev_full_key(&key_with_epoch(b"\x00".to_vec(), HummockEpoch::MAX)),
            Vec::<u8>::new()
        );
    }

    #[test]
    fn test_user_key_order() {
        let a = UserKey::new(TableId::new(1), TableKey(b"aaa".to_vec()));
        let b = UserKey::new(TableId::new(2), TableKey(b"aaa".to_vec()));
        let c = UserKey::new(TableId::new(2), TableKey(b"bbb".to_vec()));
        assert!(a.lt(&b));
        assert!(b.lt(&c));
        let a = a.encode();
        let b = b.encode();
        let c = c.encode();
        assert!(a.lt(&b));
        assert!(b.lt(&c));
    }

    #[test]
    fn test_prefixed_range_with_vnode() {
        let concat = |vnode: usize, b: &[u8]| -> Bytes {
            prefix_slice_with_vnode(VirtualNode::from_index(vnode), b)
        };
        assert_eq!(
            prefixed_range_with_vnode(
                (Included(Bytes::from("1")), Included(Bytes::from("2"))),
                VirtualNode::from_index(233),
            ),
            (
                Included(TableKey(concat(233, b"1"))),
                Included(TableKey(concat(233, b"2")))
            )
        );
        assert_eq!(
            prefixed_range_with_vnode(
                (Excluded(Bytes::from("1")), Excluded(Bytes::from("2"))),
                VirtualNode::from_index(233),
            ),
            (
                Excluded(TableKey(concat(233, b"1"))),
                Excluded(TableKey(concat(233, b"2")))
            )
        );
        assert_eq!(
            prefixed_range_with_vnode(
                (Bound::<Bytes>::Unbounded, Bound::<Bytes>::Unbounded),
                VirtualNode::from_index(233),
            ),
            (
                Included(TableKey(concat(233, b""))),
                Excluded(TableKey(concat(234, b"")))
            )
        );
        let max_vnode = VirtualNode::MAX_REPRESENTABLE.to_index();
        assert_eq!(
            prefixed_range_with_vnode(
                (Bound::<Bytes>::Unbounded, Bound::<Bytes>::Unbounded),
                VirtualNode::from_index(max_vnode),
            ),
            (Included(TableKey(concat(max_vnode, b""))), Unbounded)
        );
        let second_max_vnode = max_vnode - 1;
        assert_eq!(
            prefixed_range_with_vnode(
                (Bound::<Bytes>::Unbounded, Bound::<Bytes>::Unbounded),
                VirtualNode::from_index(second_max_vnode),
            ),
            (
                Included(TableKey(concat(second_max_vnode, b""))),
                Excluded(TableKey(concat(max_vnode, b"")))
            )
        );
    }

    #[test]
    fn test_single_vnode_range() {
        let left_bound = vec![
            Included(b"0".as_slice()),
            Excluded(b"0".as_slice()),
            Unbounded,
        ];
        let right_bound = vec![
            Included(b"1".as_slice()),
            Excluded(b"1".as_slice()),
            Unbounded,
        ];
        for vnode in 0..VirtualNode::MAX_COUNT {
            for left in &left_bound {
                for right in &right_bound {
                    assert_eq!(
                        (vnode, vnode + 1),
                        vnode_range(&prefixed_range_with_vnode::<&[u8]>(
                            (*left, *right),
                            VirtualNode::from_index(vnode)
                        ))
                    )
                }
            }
        }
    }
}