risingwave_hummock_sdk/
key.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Borrow;
use std::cmp::Ordering;
use std::fmt::Debug;
use std::iter::once;
use std::ops::Bound::*;
use std::ops::{Bound, Deref, DerefMut, RangeBounds};
use std::ptr;

use bytes::{Buf, BufMut, Bytes, BytesMut};
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common_estimate_size::EstimateSize;

use crate::{EpochWithGap, HummockEpoch};

pub const EPOCH_LEN: usize = std::mem::size_of::<HummockEpoch>();
pub const TABLE_PREFIX_LEN: usize = std::mem::size_of::<u32>();
// Max length for key overlap and diff length. See `KeyPrefix::encode`.
pub const MAX_KEY_LEN: usize = u16::MAX as usize;

pub type KeyPayloadType = Bytes;
pub type TableKeyRange = (
    Bound<TableKey<KeyPayloadType>>,
    Bound<TableKey<KeyPayloadType>>,
);
pub type UserKeyRange = (
    Bound<UserKey<KeyPayloadType>>,
    Bound<UserKey<KeyPayloadType>>,
);
pub type UserKeyRangeRef<'a> = (Bound<UserKey<&'a [u8]>>, Bound<UserKey<&'a [u8]>>);
pub type FullKeyRange = (
    Bound<FullKey<KeyPayloadType>>,
    Bound<FullKey<KeyPayloadType>>,
);

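/// Returns `true` iff the range is a degenerate `[k, k)` range (i.e. `Included(k)` to
/// `Excluded(k)`), which contains no key.
///
/// An illustrative sketch of the behavior (mirroring the match arms below):
///
/// ```rust
/// use std::ops::Bound::{Excluded, Included, Unbounded};
/// use bytes::Bytes;
/// use risingwave_hummock_sdk::key::{TableKey, is_empty_key_range};
///
/// let k = TableKey(Bytes::from("a"));
/// assert!(is_empty_key_range(&(Included(k.clone()), Excluded(k.clone()))));
/// assert!(!is_empty_key_range(&(Included(k), Unbounded)));
/// ```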
pub fn is_empty_key_range(key_range: &TableKeyRange) -> bool {
    match key_range {
        (Included(start), Excluded(end)) => start == end,
        _ => false,
    }
}

/// Returns the left-inclusive and right-exclusive vnode index range of the given key range.
///
/// # Vnode count unawareness
///
/// Note that this function is not aware of the vnode count that is actually used in this table.
/// For example, if the total vnode count is 256, `Unbounded` can be a correct end bound for vnode 255,
/// but this function will still return `Excluded(256)`.
///
/// See also [`vnode`] and [`end_bound_of_vnode`], which share the same property.
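///
/// # Examples
///
/// A minimal sketch, building the range with [`prefixed_range_with_vnode`]:
///
/// ```rust
/// use std::ops::Bound;
/// use risingwave_common::hash::VirtualNode;
/// use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, vnode_range};
///
/// let range = prefixed_range_with_vnode(
///     (Bound::<&[u8]>::Unbounded, Bound::<&[u8]>::Unbounded),
///     VirtualNode::from_index(5),
/// );
/// // An unbounded range within vnode 5 covers exactly the index range [5, 6).
/// assert_eq!(vnode_range(&range), (5, 6));
/// ```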
pub fn vnode_range(range: &TableKeyRange) -> (usize, usize) {
    let (left, right) = range;
    let left = match left {
        Included(key) | Excluded(key) => key.vnode_part().to_index(),
        Unbounded => 0,
    };
    let right = match right {
        Included(key) => key.vnode_part().to_index() + 1,
        Excluded(key) => {
            let (vnode, inner_key) = key.split_vnode();
            if inner_key.is_empty() {
                // When the exclusive end key range contains only a vnode,
                // the whole vnode is excluded.
                vnode.to_index()
            } else {
                vnode.to_index() + 1
            }
        }
        Unbounded => VirtualNode::MAX_REPRESENTABLE.to_index() + 1,
    };
    (left, right)
}

/// Ensure that only one vnode is involved in the table key range and return that vnode.
///
/// # Vnode count unawareness
///
/// Note that this function is not aware of the vnode count that is actually used in this table.
/// For example, if the total vnode count is 256, `Unbounded` can be a correct end bound for vnode 255,
/// but this function will still require `Excluded(256)`.
///
/// See also [`vnode_range`] and [`end_bound_of_vnode`], which share the same property.
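///
/// A short illustrative sketch:
///
/// ```rust
/// use std::ops::Bound;
/// use risingwave_common::hash::VirtualNode;
/// use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, vnode};
///
/// let range = prefixed_range_with_vnode(
///     (Bound::<&[u8]>::Unbounded, Bound::<&[u8]>::Unbounded),
///     VirtualNode::from_index(5),
/// );
/// assert_eq!(vnode(&range), VirtualNode::from_index(5));
/// ```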
pub fn vnode(range: &TableKeyRange) -> VirtualNode {
    let (l, r_exclusive) = vnode_range(range);
    assert_eq!(r_exclusive - l, 1);
    VirtualNode::from_index(l)
}

/// Converts a user key to a full key by appending `epoch` (big-endian) to the user key.
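///
/// A small sketch of the encoding (the epoch is appended as 8 big-endian bytes):
///
/// ```rust
/// use risingwave_hummock_sdk::key::key_with_epoch;
///
/// assert_eq!(
///     key_with_epoch(b"key".to_vec(), 1),
///     b"key\x00\x00\x00\x00\x00\x00\x00\x01"
/// );
/// ```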
pub fn key_with_epoch(mut user_key: Vec<u8>, epoch: HummockEpoch) -> Vec<u8> {
    let res = epoch.to_be();
    user_key.reserve(EPOCH_LEN);
    let buf = user_key.chunk_mut();

    // TODO: check whether this hack improves performance
    unsafe {
        ptr::copy_nonoverlapping(
            &res as *const _ as *const u8,
            buf.as_mut_ptr() as *mut _,
            EPOCH_LEN,
        );
        user_key.advance_mut(EPOCH_LEN);
    }

    user_key
}

/// Splits a full key into its user key part and epoch part.
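///
/// For example (a sketch, using [`key_with_epoch`] to build the input):
///
/// ```rust
/// use risingwave_hummock_sdk::key::{key_with_epoch, split_key_epoch};
///
/// let full_key = key_with_epoch(b"key".to_vec(), 1);
/// let (user_key, epoch) = split_key_epoch(&full_key);
/// assert_eq!(user_key, b"key");
/// assert_eq!(epoch, &[0, 0, 0, 0, 0, 0, 0, 1]);
/// ```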
#[inline]
pub fn split_key_epoch(full_key: &[u8]) -> (&[u8], &[u8]) {
    let pos = full_key
        .len()
        .checked_sub(EPOCH_LEN)
        .unwrap_or_else(|| panic!("bad full key format: {:?}", full_key));
    full_key.split_at(pos)
}

/// Extract the encoded [`UserKey`] from an encoded [`FullKey`], dropping the epoch part.
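///
/// For example, round-tripping through [`key_with_epoch`]:
///
/// ```rust
/// use risingwave_hummock_sdk::key::{key_with_epoch, user_key};
///
/// let full_key = key_with_epoch(b"\x00\x00\x00\x01abc".to_vec(), 1);
/// // Dropping the 8-byte epoch suffix yields the encoded `UserKey`.
/// assert_eq!(user_key(&full_key), b"\x00\x00\x00\x01abc");
/// ```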
pub fn user_key(full_key: &[u8]) -> &[u8] {
    split_key_epoch(full_key).0
}

/// Extract the table key from an encoded [`UserKey`], dropping the table id part.
pub fn table_key(user_key: &[u8]) -> &[u8] {
    &user_key[TABLE_PREFIX_LEN..]
}

/// Extract the encoded [`UserKey`] from an encoded [`FullKey`], allowing an empty slice as input.
#[inline(always)]
pub fn get_user_key(full_key: &[u8]) -> Vec<u8> {
    if full_key.is_empty() {
        vec![]
    } else {
        user_key(full_key).to_vec()
    }
}

/// Extract the table id from an encoded [`FullKey`].
#[inline(always)]
pub fn get_table_id(full_key: &[u8]) -> u32 {
    let mut buf = full_key;
    buf.get_u32()
}

// Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.

/// Computes the next key of the given key.
///
/// If the key has no successor key (e.g. the input is "\xff\xff"), the result
/// would be an empty vector.
///
/// # Examples
///
/// ```rust
/// use risingwave_hummock_sdk::key::next_key;
/// assert_eq!(next_key(b"123"), b"124");
/// assert_eq!(next_key(b"12\xff"), b"13");
/// assert_eq!(next_key(b"\xff\xff"), b"");
/// assert_eq!(next_key(b"\xff\xfe"), b"\xff\xff");
/// assert_eq!(next_key(b"T"), b"U");
/// assert_eq!(next_key(b""), b"");
/// ```
pub fn next_key(key: &[u8]) -> Vec<u8> {
    if let Some((s, e)) = next_key_no_alloc(key) {
        let mut res = Vec::with_capacity(s.len() + 1);
        res.extend_from_slice(s);
        res.push(e);
        res
    } else {
        Vec::new()
    }
}

/// Computes the previous key of the given key.
///
/// If the key has no predecessor key (e.g. the input is "\x00\x00"), the result
/// would be a "\xff\xff" vector.
///
/// # Examples
///
/// ```rust
/// use risingwave_hummock_sdk::key::prev_key;
/// assert_eq!(prev_key(b"123"), b"122");
/// assert_eq!(prev_key(b"12\x00"), b"11\xff");
/// assert_eq!(prev_key(b"\x00\x00"), b"\xff\xff");
/// assert_eq!(prev_key(b"\x00\x01"), b"\x00\x00");
/// assert_eq!(prev_key(b"T"), b"S");
/// assert_eq!(prev_key(b""), b"");
/// ```
pub fn prev_key(key: &[u8]) -> Vec<u8> {
    let pos = key.iter().rposition(|b| *b != 0x00);
    match pos {
        Some(pos) => {
            let mut res = Vec::with_capacity(key.len());
            res.extend_from_slice(&key[0..pos]);
            res.push(key[pos] - 1);
            if pos + 1 < key.len() {
                res.push(0xff);
            }
            res
        }
        None => {
            vec![0xff; key.len()]
        }
    }
}

fn next_key_no_alloc(key: &[u8]) -> Option<(&[u8], u8)> {
    let pos = key.iter().rposition(|b| *b != 0xff)?;
    Some((&key[..pos], key[pos] + 1))
}

// End Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.

/// Computes the next epoch without modifying the input slice.
///
/// Unlike [`next_key`], every trailing `0xff` byte is replaced with `0x00` (so the result
/// keeps the same length as the input), and an all-`0xff` input wraps around to all zeros.
///
/// # Examples
///
/// ```rust
/// use risingwave_hummock_sdk::key::next_epoch;
/// assert_eq!(next_epoch(b"123"), b"124");
/// assert_eq!(next_epoch(b"\xff\x00\xff"), b"\xff\x01\x00");
/// assert_eq!(next_epoch(b"\xff\xff"), b"\x00\x00");
/// assert_eq!(next_epoch(b"\x00\x00"), b"\x00\x01");
/// assert_eq!(next_epoch(b"S"), b"T");
/// assert_eq!(next_epoch(b""), b"");
/// ```
pub fn next_epoch(epoch: &[u8]) -> Vec<u8> {
    let pos = epoch.iter().rposition(|b| *b != 0xff);
    match pos {
        Some(mut pos) => {
            let mut res = Vec::with_capacity(epoch.len());
            res.extend_from_slice(&epoch[0..pos]);
            res.push(epoch[pos] + 1);
            while pos + 1 < epoch.len() {
                res.push(0x00);
                pos += 1;
            }
            res
        }
        None => {
            vec![0x00; epoch.len()]
        }
    }
}

/// Computes the previous epoch without modifying the input slice.
///
/// Unlike [`prev_key`], every trailing zero byte is replaced with `0xff`, so the result
/// always keeps the same length as the input.
///
/// # Examples
///
/// ```rust
/// use risingwave_hummock_sdk::key::prev_epoch;
/// assert_eq!(prev_epoch(b"124"), b"123");
/// assert_eq!(prev_epoch(b"\xff\x01\x00"), b"\xff\x00\xff");
/// assert_eq!(prev_epoch(b"\x00\x00"), b"\xff\xff");
/// assert_eq!(prev_epoch(b"\x00\x01"), b"\x00\x00");
/// assert_eq!(prev_epoch(b"T"), b"S");
/// assert_eq!(prev_epoch(b""), b"");
/// ```
pub fn prev_epoch(epoch: &[u8]) -> Vec<u8> {
    let pos = epoch.iter().rposition(|b| *b != 0x00);
    match pos {
        Some(mut pos) => {
            let mut res = Vec::with_capacity(epoch.len());
            res.extend_from_slice(&epoch[0..pos]);
            res.push(epoch[pos] - 1);
            while pos + 1 < epoch.len() {
                res.push(0xff);
                pos += 1;
            }
            res
        }
        None => {
            vec![0xff; epoch.len()]
        }
    }
}

/// Computes the next full key of the given full key.
///
/// If the `user_key` has no successor key, the result will be an empty `Vec`.
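///
/// # Examples
///
/// A sketch mirroring the unit test at the bottom of this file:
///
/// ```rust
/// use risingwave_hummock_sdk::key::{key_with_epoch, next_full_key};
///
/// // For the same user key, the next full key carries the next smaller epoch.
/// assert_eq!(
///     next_full_key(&key_with_epoch(b"aaa".to_vec(), 3)),
///     key_with_epoch(b"aaa".to_vec(), 2)
/// );
/// ```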
pub fn next_full_key(full_key: &[u8]) -> Vec<u8> {
    let (user_key, epoch) = split_key_epoch(full_key);
    let prev_epoch = prev_epoch(epoch);
    let mut res = Vec::with_capacity(full_key.len());
    if prev_epoch.iter().all(|&b| b == 0xff) {
        // The epoch part was already the minimum and wrapped around, so advance to the
        // next user key, paired with the maximum epoch.
        let next_user_key = next_key(user_key);
        if next_user_key.is_empty() {
            return Vec::new();
        }
        res.extend_from_slice(next_user_key.as_slice());
        res.extend_from_slice(prev_epoch.as_slice());
        res
    } else {
        res.extend_from_slice(user_key);
        res.extend_from_slice(prev_epoch.as_slice());
        res
    }
}

/// Computes the previous full key of the given full key.
///
/// If the `user_key` has no predecessor key, the result will be an empty `Vec`.
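///
/// # Examples
///
/// A sketch mirroring the unit test at the bottom of this file:
///
/// ```rust
/// use risingwave_hummock_sdk::HummockEpoch;
/// use risingwave_hummock_sdk::key::{key_with_epoch, prev_full_key};
///
/// // For the same user key, the previous full key carries the next greater epoch.
/// assert_eq!(
///     prev_full_key(&key_with_epoch(b"aab".to_vec(), HummockEpoch::MAX - 3)),
///     key_with_epoch(b"aab".to_vec(), HummockEpoch::MAX - 2)
/// );
/// ```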
pub fn prev_full_key(full_key: &[u8]) -> Vec<u8> {
    let (user_key, epoch) = split_key_epoch(full_key);
    let next_epoch = next_epoch(epoch);
    let mut res = Vec::with_capacity(full_key.len());
    if next_epoch.iter().all(|&b| b == 0x00) {
        // The epoch part was already the maximum and wrapped around, so step back to the
        // previous user key, paired with the minimum epoch.
        let prev_user_key = prev_key(user_key);
        if prev_user_key.iter().all(|&b| b == 0xff) {
            return Vec::new();
        }
        res.extend_from_slice(prev_user_key.as_slice());
        res.extend_from_slice(next_epoch.as_slice());
        res
    } else {
        res.extend_from_slice(user_key);
        res.extend_from_slice(next_epoch.as_slice());
        res
    }
}

/// Returns [`Unbounded`] if the vnode is the maximum representable value (i.e.
/// [`VirtualNode::MAX_REPRESENTABLE`]), otherwise [`Excluded`] at the next vnode.
///
/// Note that this function is not aware of the vnode count that is actually used in this table.
/// For example, if the total vnode count is 256, `Unbounded` can be a correct end bound for vnode 255,
/// but this function will still return `Excluded(256)`. See also [`vnode`] and [`vnode_range`],
/// which rely on this property.
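///
/// For example:
///
/// ```rust
/// use std::ops::Bound::{Excluded, Unbounded};
/// use bytes::Bytes;
/// use risingwave_common::hash::VirtualNode;
/// use risingwave_hummock_sdk::key::end_bound_of_vnode;
///
/// assert_eq!(
///     end_bound_of_vnode(VirtualNode::from_index(0)),
///     Excluded(Bytes::copy_from_slice(
///         &VirtualNode::from_index(1).to_be_bytes()
///     ))
/// );
/// assert_eq!(end_bound_of_vnode(VirtualNode::MAX_REPRESENTABLE), Unbounded);
/// ```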
pub fn end_bound_of_vnode(vnode: VirtualNode) -> Bound<Bytes> {
    if vnode == VirtualNode::MAX_REPRESENTABLE {
        Unbounded
    } else {
        let end_bound_index = vnode.to_index() + 1;
        Excluded(Bytes::copy_from_slice(
            &VirtualNode::from_index(end_bound_index).to_be_bytes(),
        ))
    }
}

/// Get the end bound of the given `prefix` when transforming it to a key range.
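///
/// For example (a quick sketch; an all-`0xff` prefix has no upper bound):
///
/// ```rust
/// use std::ops::Bound::{Excluded, Unbounded};
/// use bytes::Bytes;
/// use risingwave_hummock_sdk::key::end_bound_of_prefix;
///
/// assert_eq!(end_bound_of_prefix(b"ab"), Excluded(Bytes::from_static(b"ac")));
/// assert_eq!(end_bound_of_prefix(b"\xff\xff"), Unbounded);
/// ```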
pub fn end_bound_of_prefix(prefix: &[u8]) -> Bound<Bytes> {
    if let Some((s, e)) = next_key_no_alloc(prefix) {
        let mut buf = BytesMut::with_capacity(s.len() + 1);
        buf.extend_from_slice(s);
        buf.put_u8(e);
        Excluded(buf.freeze())
    } else {
        Unbounded
    }
}

/// Get the start bound of the given `prefix` when it is excluded from the range.
pub fn start_bound_of_excluded_prefix(prefix: &[u8]) -> Bound<Bytes> {
    if let Some((s, e)) = next_key_no_alloc(prefix) {
        let mut buf = BytesMut::with_capacity(s.len() + 1);
        buf.extend_from_slice(s);
        buf.put_u8(e);
        Included(buf.freeze())
    } else {
        panic!("the prefix is the maximum value")
    }
}

/// Transform the given `prefix` to a key range.
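///
/// For example:
///
/// ```rust
/// use std::ops::Bound::{Excluded, Included, Unbounded};
/// use bytes::Bytes;
/// use risingwave_hummock_sdk::key::range_of_prefix;
///
/// assert_eq!(
///     range_of_prefix(b"ab"),
///     (
///         Included(Bytes::from_static(b"ab")),
///         Excluded(Bytes::from_static(b"ac"))
///     )
/// );
/// // An empty prefix covers the whole key space.
/// assert_eq!(range_of_prefix(b""), (Unbounded, Unbounded));
/// ```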
pub fn range_of_prefix(prefix: &[u8]) -> (Bound<Bytes>, Bound<Bytes>) {
    if prefix.is_empty() {
        (Unbounded, Unbounded)
    } else {
        (
            Included(Bytes::copy_from_slice(prefix)),
            end_bound_of_prefix(prefix),
        )
    }
}

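/// Prepend the big-endian representation of `vnode` to `slice`, returning a new [`Bytes`].
///
/// A small sketch of the resulting layout:
///
/// ```rust
/// use risingwave_common::hash::VirtualNode;
/// use risingwave_hummock_sdk::key::prefix_slice_with_vnode;
///
/// let key = prefix_slice_with_vnode(VirtualNode::from_index(1), b"abc");
/// assert_eq!(key[..VirtualNode::SIZE], VirtualNode::from_index(1).to_be_bytes());
/// assert_eq!(&key[VirtualNode::SIZE..], b"abc");
/// ```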
pub fn prefix_slice_with_vnode(vnode: VirtualNode, slice: &[u8]) -> Bytes {
    let prefix = vnode.to_be_bytes();
    let mut buf = BytesMut::with_capacity(prefix.len() + slice.len());
    buf.extend_from_slice(&prefix);
    buf.extend_from_slice(slice);
    buf.freeze()
}

/// Prepend the `vnode` prefix to both bounds of the given key `range`.
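///
/// A sketch mirroring `test_prefixed_range_with_vnode` below:
///
/// ```rust
/// use std::ops::Bound::Included;
/// use bytes::Bytes;
/// use risingwave_common::hash::VirtualNode;
/// use risingwave_hummock_sdk::key::{
///     TableKey, prefix_slice_with_vnode, prefixed_range_with_vnode,
/// };
///
/// let vnode = VirtualNode::from_index(233);
/// let (start, end) = prefixed_range_with_vnode(
///     (Included(Bytes::from("1")), Included(Bytes::from("2"))),
///     vnode,
/// );
/// assert_eq!(start, Included(TableKey(prefix_slice_with_vnode(vnode, b"1"))));
/// assert_eq!(end, Included(TableKey(prefix_slice_with_vnode(vnode, b"2"))));
/// ```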
pub fn prefixed_range_with_vnode<B: AsRef<[u8]>>(
    range: impl RangeBounds<B>,
    vnode: VirtualNode,
) -> TableKeyRange {
    let prefixed = |b: &B| -> Bytes { prefix_slice_with_vnode(vnode, b.as_ref()) };

    let start: Bound<Bytes> = match range.start_bound() {
        Included(b) => Included(prefixed(b)),
        Excluded(b) => {
            assert!(!b.as_ref().is_empty());
            Excluded(prefixed(b))
        }
        Unbounded => Included(Bytes::copy_from_slice(&vnode.to_be_bytes())),
    };

    let end = match range.end_bound() {
        Included(b) => Included(prefixed(b)),
        Excluded(b) => {
            assert!(!b.as_ref().is_empty());
            Excluded(prefixed(b))
        }
        Unbounded => end_bound_of_vnode(vnode),
    };

    map_table_key_range((start, end))
}

pub trait SetSlice<S: AsRef<[u8]> + ?Sized> {
    fn set(&mut self, value: &S);
}

impl<S: AsRef<[u8]> + ?Sized> SetSlice<S> for Vec<u8> {
    fn set(&mut self, value: &S) {
        self.clear();
        self.extend_from_slice(value.as_ref());
    }
}

impl SetSlice<Bytes> for Bytes {
    fn set(&mut self, value: &Bytes) {
        *self = value.clone()
    }
}

pub trait CopyFromSlice: Send + 'static {
    fn copy_from_slice(slice: &[u8]) -> Self;
}

impl CopyFromSlice for Vec<u8> {
    fn copy_from_slice(slice: &[u8]) -> Self {
        Vec::from(slice)
    }
}

impl CopyFromSlice for Bytes {
    fn copy_from_slice(slice: &[u8]) -> Self {
        Bytes::copy_from_slice(slice)
    }
}

impl CopyFromSlice for () {
    fn copy_from_slice(_: &[u8]) -> Self {}
}

/// [`TableKey`] is an internal concept in storage. It's a wrapper around the key directly from the
/// user, to make the code clearer and avoid confusion with encoded [`UserKey`] and [`FullKey`].
///
/// Its name comes from the assumption that Hummock is always accessed by a table-like structure
/// identified by a [`TableId`].
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct TableKey<T: AsRef<[u8]>>(pub T);

impl<T: AsRef<[u8]>> Debug for TableKey<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "TableKey {{ {} }}", hex::encode(self.0.as_ref()))
    }
}

impl<T: AsRef<[u8]>> Deref for TableKey<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T: AsRef<[u8]>> DerefMut for TableKey<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl<T: AsRef<[u8]>> AsRef<[u8]> for TableKey<T> {
    fn as_ref(&self) -> &[u8] {
        self.0.as_ref()
    }
}

impl TableKey<Bytes> {
    pub fn split_vnode_bytes(&self) -> (VirtualNode, Bytes) {
        debug_assert!(
            self.0.len() >= VirtualNode::SIZE,
            "too short table key: {:?}",
            self.0.as_ref()
        );
        let (vnode, _) = self.0.split_first_chunk::<{ VirtualNode::SIZE }>().unwrap();
        (
            VirtualNode::from_be_bytes(*vnode),
            self.0.slice(VirtualNode::SIZE..),
        )
    }
}

impl<T: AsRef<[u8]>> TableKey<T> {
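    /// Split the table key into its vnode prefix and the remaining inner key.
    ///
    /// A short sketch, building the key with [`gen_key_from_str`]:
    ///
    /// ```rust
    /// use risingwave_common::hash::VirtualNode;
    /// use risingwave_hummock_sdk::key::gen_key_from_str;
    ///
    /// let key = gen_key_from_str(VirtualNode::from_index(7), "abc");
    /// let (vnode, inner_key) = key.split_vnode();
    /// assert_eq!(vnode, VirtualNode::from_index(7));
    /// assert_eq!(inner_key, b"abc");
    /// ```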
    pub fn split_vnode(&self) -> (VirtualNode, &[u8]) {
        debug_assert!(
            self.0.as_ref().len() >= VirtualNode::SIZE,
            "too short table key: {:?}",
            self.0.as_ref()
        );
        let (vnode, inner_key) = self
            .0
            .as_ref()
            .split_first_chunk::<{ VirtualNode::SIZE }>()
            .unwrap();
        (VirtualNode::from_be_bytes(*vnode), inner_key)
    }

    pub fn vnode_part(&self) -> VirtualNode {
        self.split_vnode().0
    }

    pub fn key_part(&self) -> &[u8] {
        self.split_vnode().1
    }

    pub fn to_ref(&self) -> TableKey<&[u8]> {
        TableKey(self.0.as_ref())
    }
}

impl<T: AsRef<[u8]>> Borrow<[u8]> for TableKey<T> {
    fn borrow(&self) -> &[u8] {
        self.0.as_ref()
    }
}

impl EstimateSize for TableKey<Bytes> {
    fn estimated_heap_size(&self) -> usize {
        self.0.estimated_heap_size()
    }
}

impl TableKey<&[u8]> {
    pub fn copy_into<T: CopyFromSlice + AsRef<[u8]>>(&self) -> TableKey<T> {
        TableKey(T::copy_from_slice(self.as_ref()))
    }
}

#[inline]
pub fn map_table_key_range(range: (Bound<KeyPayloadType>, Bound<KeyPayloadType>)) -> TableKeyRange {
    (range.0.map(TableKey), range.1.map(TableKey))
}

pub fn gen_key_from_bytes(vnode: VirtualNode, payload: &[u8]) -> TableKey<Bytes> {
    TableKey(Bytes::from(
        [vnode.to_be_bytes().as_slice(), payload].concat(),
    ))
}

pub fn gen_key_from_str(vnode: VirtualNode, payload: &str) -> TableKey<Bytes> {
    gen_key_from_bytes(vnode, payload.as_bytes())
}

/// [`UserKey`] is an internal concept in storage. In the storage interface, the user specifies
/// `table_key` and `table_id` (in `ReadOptions` or `WriteOptions`) as the input. The storage
/// will group these two values into one struct for convenient filtering.
///
/// The encoded format is | `table_id` | `table_key` |.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct UserKey<T: AsRef<[u8]>> {
    // When comparing `UserKey`, we first compare `table_id`, then `table_key`. So the order of
    // declaration matters.
    pub table_id: TableId,
    pub table_key: TableKey<T>,
}

impl<T: AsRef<[u8]>> Debug for UserKey<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "UserKey {{ {}, {:?} }}",
            self.table_id.as_raw_id(),
            self.table_key
        )
    }
}

impl<T: AsRef<[u8]>> UserKey<T> {
    pub fn new(table_id: TableId, table_key: TableKey<T>) -> Self {
        Self {
            table_id,
            table_key,
        }
    }

    /// Pass the inner type of `table_key` to make the code less verbose.
    pub fn for_test(table_id: TableId, table_key: T) -> Self {
        Self {
            table_id,
            table_key: TableKey(table_key),
        }
    }

    /// Encode into a buffer.
    pub fn encode_into(&self, buf: &mut impl BufMut) {
        buf.put_u32(self.table_id.as_raw_id());
        buf.put_slice(self.table_key.as_ref());
    }

    pub fn encode_table_key_into(&self, buf: &mut impl BufMut) {
        buf.put_slice(self.table_key.as_ref());
    }

    pub fn encode(&self) -> Vec<u8> {
        let mut ret = Vec::with_capacity(TABLE_PREFIX_LEN + self.table_key.as_ref().len());
        self.encode_into(&mut ret);
        ret
    }

    pub fn is_empty(&self) -> bool {
        self.table_key.as_ref().is_empty()
    }

    /// Get the length of the encoded format.
    pub fn encoded_len(&self) -> usize {
        self.table_key.as_ref().len() + TABLE_PREFIX_LEN
    }

    pub fn get_vnode_id(&self) -> usize {
        self.table_key.vnode_part().to_index()
    }
}

impl<'a> UserKey<&'a [u8]> {
    /// Construct a [`UserKey`] from a byte slice. Its `table_key` will be a part of the input
    /// `slice`.
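    ///
    /// A round-trip sketch with [`UserKey::encode`]:
    ///
    /// ```rust
    /// use risingwave_common::catalog::TableId;
    /// use risingwave_hummock_sdk::key::{TableKey, UserKey};
    ///
    /// let user_key = UserKey::new(TableId::new(1), TableKey(&b"abc"[..]));
    /// let encoded = user_key.encode();
    /// assert_eq!(UserKey::decode(&encoded), user_key);
    /// ```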
    pub fn decode(slice: &'a [u8]) -> Self {
        let table_id: u32 = (&slice[..]).get_u32();

        Self {
            table_id: TableId::new(table_id),
            table_key: TableKey(&slice[TABLE_PREFIX_LEN..]),
        }
    }

    pub fn to_vec(self) -> UserKey<Vec<u8>> {
        self.copy_into()
    }

    pub fn copy_into<T: CopyFromSlice + AsRef<[u8]>>(self) -> UserKey<T> {
        UserKey {
            table_id: self.table_id,
            table_key: TableKey(T::copy_from_slice(self.table_key.0)),
        }
    }
}

impl<T: AsRef<[u8]> + Clone> UserKey<&T> {
    pub fn cloned(self) -> UserKey<T> {
        UserKey {
            table_id: self.table_id,
            table_key: TableKey(self.table_key.0.clone()),
        }
    }
}

impl<T: AsRef<[u8]>> UserKey<T> {
    pub fn as_ref(&self) -> UserKey<&[u8]> {
        UserKey::new(self.table_id, TableKey(self.table_key.as_ref()))
    }
}

impl<T: AsRef<[u8]>> UserKey<T> {
    /// Use this method to override an old `UserKey<Vec<u8>>` with a `UserKey<&[u8]>` to own the
    /// table key without reallocating a new `UserKey` object.
    pub fn set<F>(&mut self, other: UserKey<F>)
    where
        T: SetSlice<F>,
        F: AsRef<[u8]>,
    {
        self.table_id = other.table_id;
        self.table_key.0.set(&other.table_key.0);
    }
}

impl UserKey<Vec<u8>> {
    pub fn into_bytes(self) -> UserKey<Bytes> {
        UserKey {
            table_id: self.table_id,
            table_key: TableKey(Bytes::from(self.table_key.0)),
        }
    }
}

/// [`FullKey`] is an internal concept in storage. It associates a [`UserKey`] with an epoch.
///
/// The encoded format is | `user_key` | `epoch` |.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct FullKey<T: AsRef<[u8]>> {
    pub user_key: UserKey<T>,
    pub epoch_with_gap: EpochWithGap,
}

impl<T: AsRef<[u8]>> Debug for FullKey<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "FullKey {{ {:?}, epoch: {}, epoch_with_gap: {}, spill_offset: {}}}",
            self.user_key,
            self.epoch_with_gap.pure_epoch(),
            self.epoch_with_gap.as_u64(),
            self.epoch_with_gap.as_u64() - self.epoch_with_gap.pure_epoch(),
        )
    }
}

impl<T: AsRef<[u8]>> FullKey<T> {
    pub fn new(table_id: TableId, table_key: TableKey<T>, epoch: HummockEpoch) -> Self {
        Self {
            user_key: UserKey::new(table_id, table_key),
            epoch_with_gap: EpochWithGap::new(epoch, 0),
        }
    }

    pub fn new_with_gap_epoch(
        table_id: TableId,
        table_key: TableKey<T>,
        epoch_with_gap: EpochWithGap,
    ) -> Self {
        Self {
            user_key: UserKey::new(table_id, table_key),
            epoch_with_gap,
        }
    }

    pub fn from_user_key(user_key: UserKey<T>, epoch: HummockEpoch) -> Self {
        Self {
            user_key,
            epoch_with_gap: EpochWithGap::new_from_epoch(epoch),
        }
    }

    /// Pass the inner type of `table_key` to make the code less verbose.
    pub fn for_test(table_id: TableId, table_key: T, epoch: HummockEpoch) -> Self {
        Self {
            user_key: UserKey::for_test(table_id, table_key),
            epoch_with_gap: EpochWithGap::new(epoch, 0),
        }
    }

    /// Encode into a buffer.
    pub fn encode_into(&self, buf: &mut impl BufMut) {
        self.user_key.encode_into(buf);
        buf.put_u64(self.epoch_with_gap.as_u64());
    }

    pub fn encode(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(
            TABLE_PREFIX_LEN + self.user_key.table_key.as_ref().len() + EPOCH_LEN,
        );
        self.encode_into(&mut buf);
        buf
    }

    /// Encode into a buffer, skipping the table id part.
    pub fn encode_into_without_table_id(&self, buf: &mut impl BufMut) {
        self.user_key.encode_table_key_into(buf);
        buf.put_u64(self.epoch_with_gap.as_u64());
    }

    pub fn encode_reverse_epoch(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(
            TABLE_PREFIX_LEN + self.user_key.table_key.as_ref().len() + EPOCH_LEN,
        );
        self.user_key.encode_into(&mut buf);
        buf.put_u64(u64::MAX - self.epoch_with_gap.as_u64());
        buf
    }

    pub fn is_empty(&self) -> bool {
        self.user_key.is_empty()
    }

    /// Get the length of the encoded format.
    pub fn encoded_len(&self) -> usize {
        self.user_key.encoded_len() + EPOCH_LEN
    }
}

impl<'a> FullKey<&'a [u8]> {
    /// Construct a [`FullKey`] from a byte slice.
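    ///
    /// A round-trip sketch with [`FullKey::encode`]:
    ///
    /// ```rust
    /// use risingwave_common::catalog::TableId;
    /// use risingwave_hummock_sdk::key::FullKey;
    ///
    /// let full_key = FullKey::for_test(TableId::new(1), &b"abc"[..], 0);
    /// let encoded = full_key.encode();
    /// assert_eq!(FullKey::decode(&encoded), full_key);
    /// ```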
    pub fn decode(slice: &'a [u8]) -> Self {
        let epoch_pos = slice.len() - EPOCH_LEN;
        let epoch = (&slice[epoch_pos..]).get_u64();

        Self {
            user_key: UserKey::decode(&slice[..epoch_pos]),
            epoch_with_gap: EpochWithGap::from_u64(epoch),
        }
    }

    /// Construct a [`FullKey`] from a byte slice without `table_id` encoded.
    pub fn from_slice_without_table_id(
        table_id: TableId,
        slice_without_table_id: &'a [u8],
    ) -> Self {
        let epoch_pos = slice_without_table_id.len() - EPOCH_LEN;
        let epoch = (&slice_without_table_id[epoch_pos..]).get_u64();

        Self {
            user_key: UserKey::new(table_id, TableKey(&slice_without_table_id[..epoch_pos])),
            epoch_with_gap: EpochWithGap::from_u64(epoch),
        }
    }

    /// Construct a [`FullKey`] from a byte slice encoded by [`FullKey::encode_reverse_epoch`].
    pub fn decode_reverse_epoch(slice: &'a [u8]) -> Self {
        let epoch_pos = slice.len() - EPOCH_LEN;
        let epoch = (&slice[epoch_pos..]).get_u64();

        Self {
            user_key: UserKey::decode(&slice[..epoch_pos]),
            epoch_with_gap: EpochWithGap::from_u64(u64::MAX - epoch),
        }
    }

    pub fn to_vec(self) -> FullKey<Vec<u8>> {
        self.copy_into()
    }

    pub fn copy_into<T: CopyFromSlice + AsRef<[u8]>>(self) -> FullKey<T> {
        FullKey {
            user_key: self.user_key.copy_into(),
            epoch_with_gap: self.epoch_with_gap,
        }
    }
}

impl FullKey<Vec<u8>> {
    /// Calling this method may cause a memory allocation when converting `Vec` into
    /// `Bytes`.
    pub fn into_bytes(self) -> FullKey<Bytes> {
        FullKey {
            epoch_with_gap: self.epoch_with_gap,
            user_key: self.user_key.into_bytes(),
        }
    }
}

impl<T: AsRef<[u8]>> FullKey<T> {
    pub fn to_ref(&self) -> FullKey<&[u8]> {
        FullKey {
            user_key: self.user_key.as_ref(),
            epoch_with_gap: self.epoch_with_gap,
        }
    }
}

impl<T: AsRef<[u8]>> FullKey<T> {
    /// Use this method to override an old `FullKey<Vec<u8>>` with a `FullKey<&[u8]>` to own the
    /// table key without reallocating a new `FullKey` object.
    pub fn set<F>(&mut self, other: FullKey<F>)
    where
        T: SetSlice<F>,
        F: AsRef<[u8]>,
    {
        self.user_key.set(other.user_key);
        self.epoch_with_gap = other.epoch_with_gap;
    }
}

impl<T: AsRef<[u8]> + Ord + Eq> Ord for FullKey<T> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // When `user_key` is the same, the greater epoch comes first.
        self.user_key
            .cmp(&other.user_key)
            .then_with(|| other.epoch_with_gap.cmp(&self.epoch_with_gap))
    }
}

impl<T: AsRef<[u8]> + Ord + Eq> PartialOrd for FullKey<T> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

pub mod range_delete_backward_compatibility_serde_struct {
    use bytes::{Buf, BufMut};
    use risingwave_common::catalog::TableId;
    use serde::{Deserialize, Serialize};

    #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
    pub struct TableKey(Vec<u8>);

    #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
    #[serde(from = "UserKeySerde", into = "UserKeySerde")]
    pub struct UserKey {
        // When comparing `UserKey`, we first compare `table_id`, then `table_key`. So the order of
        // declaration matters.
        pub table_id: TableId,
        pub table_key: TableKey,
    }

    #[derive(Deserialize, Serialize)]
    pub struct TableIdSerde {
        table_id: u32,
    }

    #[derive(Deserialize, Serialize)]
    struct UserKeySerde {
        table_id: TableIdSerde,
        table_key: TableKey,
    }

    impl From<UserKeySerde> for UserKey {
        fn from(value: UserKeySerde) -> Self {
            Self {
                table_id: TableId::new(value.table_id.table_id),
                table_key: value.table_key,
            }
        }
    }

    impl From<UserKey> for UserKeySerde {
        fn from(value: UserKey) -> Self {
            Self {
                table_id: TableIdSerde {
                    table_id: value.table_id.as_raw_id(),
                },
                table_key: value.table_key,
            }
        }
    }

    impl UserKey {
        pub fn decode_length_prefixed(buf: &mut &[u8]) -> Self {
            let table_id = buf.get_u32();
            let len = buf.get_u32() as usize;
            let data = buf[..len].to_vec();
            buf.advance(len);
            UserKey {
                table_id: TableId::new(table_id),
                table_key: TableKey(data),
            }
        }

        pub fn encode_length_prefixed(&self, mut buf: impl BufMut) {
            buf.put_u32(self.table_id.as_raw_id());
            buf.put_u32(self.table_key.0.as_slice().len() as u32);
            buf.put_slice(self.table_key.0.as_slice());
        }
    }

    #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
    pub struct PointRange {
        // When comparing `PointRange`, we first compare `left_user_key`, then
        // `is_exclude_left_key`. Therefore the order of declaration matters.
        pub left_user_key: UserKey,
        /// `PointRange` represents the left user key itself if `is_exclude_left_key == false`,
        /// while it represents the right δ-neighborhood of the left user key if
        /// `is_exclude_left_key == true`.
        pub is_exclude_left_key: bool,
    }
}

pub trait EmptySliceRef {
    fn empty_slice_ref<'a>() -> &'a Self;
}

static EMPTY_BYTES: Bytes = Bytes::new();
impl EmptySliceRef for Bytes {
    fn empty_slice_ref<'a>() -> &'a Self {
        &EMPTY_BYTES
    }
}

static EMPTY_VEC: Vec<u8> = Vec::new();
impl EmptySliceRef for Vec<u8> {
    fn empty_slice_ref<'a>() -> &'a Self {
        &EMPTY_VEC
    }
}

const EMPTY_SLICE: &[u8] = b"";
impl EmptySliceRef for &[u8] {
    fn empty_slice_ref<'b>() -> &'b Self {
        &EMPTY_SLICE
    }
}

/// Bound the table key range with a table id to generate a new user key range.
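///
/// A sketch mirroring `test_bound_table_key_range` below: an unbounded end becomes an
/// exclusive bound at the next table id.
///
/// ```rust
/// use std::ops::Bound::{Excluded, Included, Unbounded};
/// use risingwave_common::catalog::TableId;
/// use risingwave_hummock_sdk::key::{TableKey, UserKey, bound_table_key_range};
///
/// let (start, end) = bound_table_key_range(
///     TableId::new(1),
///     &(Included(TableKey(b"a".to_vec())), Unbounded),
/// );
/// assert_eq!(start, Included(UserKey::for_test(TableId::new(1), &b"a".to_vec())));
/// assert_eq!(end, Excluded(UserKey::for_test(TableId::new(2), &b"".to_vec())));
/// ```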
pub fn bound_table_key_range<T: AsRef<[u8]> + EmptySliceRef>(
    table_id: TableId,
    table_key_range: &impl RangeBounds<TableKey<T>>,
) -> (Bound<UserKey<&T>>, Bound<UserKey<&T>>) {
    let start = match table_key_range.start_bound() {
        Included(b) => Included(UserKey::new(table_id, TableKey(&b.0))),
        Excluded(b) => Excluded(UserKey::new(table_id, TableKey(&b.0))),
        Unbounded => Included(UserKey::new(table_id, TableKey(T::empty_slice_ref()))),
    };

    let end = match table_key_range.end_bound() {
        Included(b) => Included(UserKey::new(table_id, TableKey(&b.0))),
        Excluded(b) => Excluded(UserKey::new(table_id, TableKey(&b.0))),
        Unbounded => {
            if let Some(next_table_id) = table_id.as_raw_id().checked_add(1) {
                Excluded(UserKey::new(
                    next_table_id.into(),
                    TableKey(T::empty_slice_ref()),
                ))
            } else {
                Unbounded
            }
        }
    };

    (start, end)
}

/// TODO: Temporarily bypass the full key check. Remove this after #15099 is resolved.
pub struct FullKeyTracker<T: AsRef<[u8]> + Ord + Eq, const SKIP_DEDUP: bool = false> {
    pub latest_full_key: FullKey<T>,
    last_observed_epoch_with_gap: EpochWithGap,
}

impl<T: AsRef<[u8]> + Ord + Eq, const SKIP_DEDUP: bool> FullKeyTracker<T, SKIP_DEDUP> {
    pub fn new(init_full_key: FullKey<T>) -> Self {
        let epoch_with_gap = init_full_key.epoch_with_gap;
        Self {
            latest_full_key: init_full_key,
            last_observed_epoch_with_gap: epoch_with_gap,
        }
    }

    /// Check and observe a new full key during iteration.
    ///
    /// # Examples:
    /// ```
    /// use bytes::Bytes;
    /// use risingwave_common::catalog::TableId;
    /// use risingwave_common::util::epoch::EPOCH_AVAILABLE_BITS;
    /// use risingwave_hummock_sdk::EpochWithGap;
    /// use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, TableKey};
    ///
    /// let table_id = TableId::new(1);
    /// let full_key1 = FullKey::new(table_id, TableKey(Bytes::from("c")), 5 << EPOCH_AVAILABLE_BITS);
    /// let mut a: FullKeyTracker<_> = FullKeyTracker::<Bytes>::new(full_key1.clone());
    ///
    /// // Panics on a non-decreasing epoch observed for the same user key.
    /// // let full_key_with_larger_epoch = FullKey::new(table_id, TableKey(Bytes::from("c")), 6 << EPOCH_AVAILABLE_BITS);
    /// // a.observe(full_key_with_larger_epoch);
    ///
    /// // Panics on a non-increasing user key observed.
    /// // let full_key_with_smaller_user_key = FullKey::new(table_id, TableKey(Bytes::from("b")), 3 << EPOCH_AVAILABLE_BITS);
    /// // a.observe(full_key_with_smaller_user_key);
    ///
    /// let full_key2 = FullKey::new(table_id, TableKey(Bytes::from("c")), 3 << EPOCH_AVAILABLE_BITS);
    /// assert_eq!(a.observe(full_key2.clone()), false);
    /// assert_eq!(a.latest_user_key(), &full_key2.user_key);
    ///
    /// let full_key3 = FullKey::new(table_id, TableKey(Bytes::from("f")), 4 << EPOCH_AVAILABLE_BITS);
    /// assert_eq!(a.observe(full_key3.clone()), true);
    /// assert_eq!(a.latest_user_key(), &full_key3.user_key);
    /// ```
    ///
    /// Returns:
    /// - `true` if the provided `key` contains a new user key.
    /// - `false` otherwise.
    pub fn observe<F>(&mut self, key: FullKey<F>) -> bool
    where
        T: SetSlice<F>,
        F: AsRef<[u8]>,
    {
        self.observe_multi_version(key.user_key, once(key.epoch_with_gap))
    }

    /// `epochs` must be given in descending order (from greater to smaller).
    pub fn observe_multi_version<F>(
        &mut self,
        user_key: UserKey<F>,
        mut epochs: impl Iterator<Item = EpochWithGap>,
    ) -> bool
    where
        T: SetSlice<F>,
        F: AsRef<[u8]>,
    {
        let max_epoch_with_gap = epochs.next().expect("non-empty");
        let min_epoch_with_gap = epochs.fold(
            max_epoch_with_gap,
            |prev_epoch_with_gap, curr_epoch_with_gap| {
                assert!(
                    prev_epoch_with_gap > curr_epoch_with_gap,
                    "epoch list not sorted. prev: {:?}, curr: {:?}, user_key: {:?}",
                    prev_epoch_with_gap,
                    curr_epoch_with_gap,
                    user_key
                );
                curr_epoch_with_gap
            },
        );
        match self
            .latest_full_key
            .user_key
            .as_ref()
            .cmp(&user_key.as_ref())
        {
            Ordering::Less => {
                // Observe a new user key

                // Reset epochs
                self.last_observed_epoch_with_gap = min_epoch_with_gap;

                // Take the previous key and set latest key
                self.latest_full_key.set(FullKey {
                    user_key,
                    epoch_with_gap: min_epoch_with_gap,
                });
                true
            }
            Ordering::Equal => {
                if max_epoch_with_gap > self.last_observed_epoch_with_gap
                    || (!SKIP_DEDUP && max_epoch_with_gap == self.last_observed_epoch_with_gap)
                {
                    // Epochs from the same user key must be monotonically decreasing
                    panic!(
                        "key {:?} epoch {:?} >= prev epoch {:?}",
                        user_key, max_epoch_with_gap, self.last_observed_epoch_with_gap
                    );
                }
                self.last_observed_epoch_with_gap = min_epoch_with_gap;
                false
            }
            Ordering::Greater => {
                // User keys must be monotonically increasing
                panic!(
                    "key {:?} <= prev key {:?}",
                    user_key,
                    FullKey {
                        user_key: self.latest_full_key.user_key.as_ref(),
                        epoch_with_gap: self.last_observed_epoch_with_gap
                    }
                );
            }
        }
    }

    pub fn latest_user_key(&self) -> &UserKey<T> {
        &self.latest_full_key.user_key
    }
}

#[cfg(test)]
mod tests {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;

    #[test]
    fn test_encode_decode() {
        let epoch = test_epoch(1);
        let table_key = b"abc".to_vec();
        let key = FullKey::for_test(TableId::new(0), &table_key[..], 0);
        let buf = key.encode();
        assert_eq!(FullKey::decode(&buf), key);
        let key = FullKey::for_test(TableId::new(1), &table_key[..], epoch);
        let buf = key.encode();
        assert_eq!(FullKey::decode(&buf), key);
        let mut table_key = vec![1];
        let a = FullKey::for_test(TableId::new(1), table_key.clone(), epoch);
        table_key[0] = 2;
        let b = FullKey::for_test(TableId::new(1), table_key.clone(), epoch);
        table_key[0] = 129;
        let c = FullKey::for_test(TableId::new(1), table_key, epoch);
        assert!(a.lt(&b));
        assert!(b.lt(&c));
    }

    #[test]
    fn test_key_cmp() {
        let epoch = test_epoch(1);
        let epoch2 = test_epoch(2);
        // 1 compared with 256 under little-endian encoding would yield a wrong result.
        let key1 = FullKey::for_test(TableId::new(0), b"0".to_vec(), epoch);
        let key2 = FullKey::for_test(TableId::new(1), b"0".to_vec(), epoch);
        let key3 = FullKey::for_test(TableId::new(1), b"1".to_vec(), epoch2);
        let key4 = FullKey::for_test(TableId::new(1), b"1".to_vec(), epoch);

        assert_eq!(key1.cmp(&key1), Ordering::Equal);
        assert_eq!(key1.cmp(&key2), Ordering::Less);
        assert_eq!(key2.cmp(&key3), Ordering::Less);
        assert_eq!(key3.cmp(&key4), Ordering::Less);
    }

    #[test]
    fn test_prev_key() {
        assert_eq!(prev_key(b"123"), b"122");
        assert_eq!(prev_key(b"12\x00"), b"11\xff");
        assert_eq!(prev_key(b"\x00\x00"), b"\xff\xff");
        assert_eq!(prev_key(b"\x00\x01"), b"\x00\x00");
        assert_eq!(prev_key(b"T"), b"S");
        assert_eq!(prev_key(b""), b"");
    }

    #[test]
    fn test_bound_table_key_range() {
        assert_eq!(
            bound_table_key_range(
                TableId::default(),
                &(
                    Included(TableKey(b"a".to_vec())),
                    Included(TableKey(b"b".to_vec()))
                )
            ),
            (
                Included(UserKey::for_test(TableId::default(), &b"a".to_vec())),
                Included(UserKey::for_test(TableId::default(), &b"b".to_vec()))
            )
        );
        assert_eq!(
            bound_table_key_range(
                TableId::from(1),
                &(Included(TableKey(b"a".to_vec())), Unbounded)
            ),
            (
                Included(UserKey::for_test(TableId::from(1), &b"a".to_vec())),
                Excluded(UserKey::for_test(TableId::from(2), &b"".to_vec()))
            )
        );
        assert_eq!(
            bound_table_key_range(
                TableId::from(u32::MAX),
                &(Included(TableKey(b"a".to_vec())), Unbounded)
            ),
            (
                Included(UserKey::for_test(TableId::from(u32::MAX), &b"a".to_vec())),
                Unbounded,
            )
        );
    }

    #[test]
    fn test_next_full_key() {
        let user_key = b"aaa".to_vec();
        let epoch: HummockEpoch = 3;
        let mut full_key = key_with_epoch(user_key, epoch);
        full_key = next_full_key(full_key.as_slice());
        assert_eq!(full_key, key_with_epoch(b"aaa".to_vec(), 2));
        full_key = next_full_key(full_key.as_slice());
        assert_eq!(full_key, key_with_epoch(b"aaa".to_vec(), 1));
        full_key = next_full_key(full_key.as_slice());
        assert_eq!(full_key, key_with_epoch(b"aaa".to_vec(), 0));
        full_key = next_full_key(full_key.as_slice());
        assert_eq!(
            full_key,
            key_with_epoch("aab".as_bytes().to_vec(), HummockEpoch::MAX)
        );
        assert_eq!(
            next_full_key(&key_with_epoch(b"\xff".to_vec(), 0)),
            Vec::<u8>::new()
        );
    }

    #[test]
    fn test_prev_full_key() {
        let user_key = b"aab";
        let epoch: HummockEpoch = HummockEpoch::MAX - 3;
        let mut full_key = key_with_epoch(user_key.to_vec(), epoch);
        full_key = prev_full_key(full_key.as_slice());
        assert_eq!(
            full_key,
            key_with_epoch(b"aab".to_vec(), HummockEpoch::MAX - 2)
        );
        full_key = prev_full_key(full_key.as_slice());
        assert_eq!(
            full_key,
            key_with_epoch(b"aab".to_vec(), HummockEpoch::MAX - 1)
        );
        full_key = prev_full_key(full_key.as_slice());
        assert_eq!(full_key, key_with_epoch(b"aab".to_vec(), HummockEpoch::MAX));
        full_key = prev_full_key(full_key.as_slice());
        assert_eq!(full_key, key_with_epoch(b"aaa".to_vec(), 0));

        assert_eq!(
            prev_full_key(&key_with_epoch(b"\x00".to_vec(), HummockEpoch::MAX)),
            Vec::<u8>::new()
        );
    }

    #[test]
    fn test_user_key_order() {
        let a = UserKey::new(TableId::new(1), TableKey(b"aaa".to_vec()));
        let b = UserKey::new(TableId::new(2), TableKey(b"aaa".to_vec()));
        let c = UserKey::new(TableId::new(2), TableKey(b"bbb".to_vec()));
        assert!(a.lt(&b));
        assert!(b.lt(&c));
        let a = a.encode();
        let b = b.encode();
        let c = c.encode();
        assert!(a.lt(&b));
        assert!(b.lt(&c));
    }

    #[test]
    fn test_prefixed_range_with_vnode() {
        let concat = |vnode: usize, b: &[u8]| -> Bytes {
            prefix_slice_with_vnode(VirtualNode::from_index(vnode), b)
        };
        assert_eq!(
            prefixed_range_with_vnode(
                (Included(Bytes::from("1")), Included(Bytes::from("2"))),
                VirtualNode::from_index(233),
            ),
            (
                Included(TableKey(concat(233, b"1"))),
                Included(TableKey(concat(233, b"2")))
            )
        );
        assert_eq!(
            prefixed_range_with_vnode(
                (Excluded(Bytes::from("1")), Excluded(Bytes::from("2"))),
                VirtualNode::from_index(233),
            ),
            (
                Excluded(TableKey(concat(233, b"1"))),
                Excluded(TableKey(concat(233, b"2")))
            )
        );
        assert_eq!(
            prefixed_range_with_vnode(
                (Bound::<Bytes>::Unbounded, Bound::<Bytes>::Unbounded),
                VirtualNode::from_index(233),
            ),
            (
                Included(TableKey(concat(233, b""))),
                Excluded(TableKey(concat(234, b"")))
            )
        );
        let max_vnode = VirtualNode::MAX_REPRESENTABLE.to_index();
        assert_eq!(
            prefixed_range_with_vnode(
                (Bound::<Bytes>::Unbounded, Bound::<Bytes>::Unbounded),
                VirtualNode::from_index(max_vnode),
            ),
            (Included(TableKey(concat(max_vnode, b""))), Unbounded)
        );
        let second_max_vnode = max_vnode - 1;
        assert_eq!(
            prefixed_range_with_vnode(
                (Bound::<Bytes>::Unbounded, Bound::<Bytes>::Unbounded),
                VirtualNode::from_index(second_max_vnode),
            ),
            (
                Included(TableKey(concat(second_max_vnode, b""))),
                Excluded(TableKey(concat(max_vnode, b"")))
            )
        );
    }

    #[test]
    fn test_single_vnode_range() {
        let left_bound = vec![
            Included(b"0".as_slice()),
            Excluded(b"0".as_slice()),
            Unbounded,
        ];
        let right_bound = vec![
            Included(b"1".as_slice()),
            Excluded(b"1".as_slice()),
            Unbounded,
        ];
        for vnode in 0..VirtualNode::MAX_COUNT {
            for left in &left_bound {
                for right in &right_bound {
                    assert_eq!(
                        (vnode, vnode + 1),
                        vnode_range(&prefixed_range_with_vnode::<&[u8]>(
                            (*left, *right),
                            VirtualNode::from_index(vnode)
                        ))
                    )
                }
            }
        }
    }
}