risingwave_common/util/
row_id.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::time::SystemTime;
18
19use super::epoch::UNIX_RISINGWAVE_DATE_EPOCH;
20use crate::bitmap::Bitmap;
21use crate::hash::VirtualNode;
22
/// The number of bits occupied by the vnode part and the sequence part of a row id, combined.
///
/// The timestamp part of a row id is shifted left by this many bits.
const TIMESTAMP_SHIFT_BITS: u32 = 22;

/// The number of bits occupied by the vnode part of a row id in the previous version.
///
/// It is still used whenever the vnode count is at most `1 << COMPAT_VNODE_BITS`.
const COMPAT_VNODE_BITS: u32 = 10;
28
/// Common timestamp management for row id generators.
///
/// Tracks the last millisecond timestamp handed out, measured relative to `base`.
#[derive(Debug)]
struct TimestampManager {
    /// Base timestamp used for generating row ids. All timestamps are expressed as the number
    /// of milliseconds elapsed since this point.
    base: SystemTime,
    /// Last timestamp part of row id, in milliseconds elapsed since `base`.
    last_timestamp_ms: i64,
}
37
38impl TimestampManager {
39    fn new() -> Self {
40        let base = *UNIX_RISINGWAVE_DATE_EPOCH;
41        Self {
42            base,
43            last_timestamp_ms: base.elapsed().unwrap().as_millis() as i64,
44        }
45    }
46
47    fn get_current_timestamp_ms(&self) -> i64 {
48        self.base.elapsed().unwrap().as_millis() as i64
49    }
50
51    fn try_update_timestamp(&mut self, should_update: bool) -> bool {
52        let current_timestamp_ms = self.get_current_timestamp_ms();
53        let to_update = match current_timestamp_ms.cmp(&self.last_timestamp_ms) {
54            Ordering::Less => {
55                tracing::warn!(
56                    "Clock moved backwards: last={}, current={}",
57                    self.last_timestamp_ms,
58                    current_timestamp_ms,
59                );
60                true
61            }
62            Ordering::Equal => should_update,
63            Ordering::Greater => true,
64        };
65
66        if to_update {
67            // If the timestamp is not increased, spin loop here and wait for next millisecond.
68            let mut current_timestamp_ms = current_timestamp_ms;
69            loop {
70                if current_timestamp_ms > self.last_timestamp_ms {
71                    break;
72                }
73                current_timestamp_ms = self.get_current_timestamp_ms();
74
75                #[cfg(madsim)]
76                tokio::time::advance(std::time::Duration::from_micros(10));
77                #[cfg(not(madsim))]
78                std::hint::spin_loop();
79            }
80            self.last_timestamp_ms = current_timestamp_ms;
81            true
82        } else {
83            false
84        }
85    }
86
87    fn last_timestamp_ms(&self) -> i64 {
88        self.last_timestamp_ms
89    }
90}
91
/// `RowIdGenerator` generates unique row ids using the snowflake algorithm in the following
/// format:
///
/// | timestamp | vnode & sequence |
/// |-----------|------------------|
/// |  41 bits  |     22 bits      |
///
/// The vnode part can occupy 10..=15 bits, which is determined by the vnode count. Thus,
/// the sequence part will occupy 7..=12 bits. See [`bit_for_vnode`] for more details.
#[derive(Debug)]
pub struct RowIdGenerator {
    /// Timestamp manager, providing the timestamp part of generated row ids.
    timestamp_mgr: TimestampManager,

    /// The number of bits used for vnode.
    vnode_bit: u32,

    /// Virtual nodes used by this generator. They are used in a round-robin manner.
    vnodes: Vec<VirtualNode>,

    /// Index into `vnodes` of the vnode to use for the next row id.
    vnodes_index: u16,

    /// Sequence part to use for the next row id. It is incremented each time `vnodes_index`
    /// wraps around to 0, and reset to 0 when the timestamp is updated.
    sequence: u16,
}
117
/// A row id, represented as a signed 64-bit integer.
pub type RowId = i64;
119
120/// The number of bits occupied by the vnode part of a row id.
121///
122/// In previous versions, this was fixed to 10 bits even if the vnode count was fixed to 256.
123/// For backward compatibility, we still use 10 bits for vnode count less than or equal to 1024.
124/// For larger vnode counts, we use the smallest power of 2 that fits the vnode count.
125fn bit_for_vnode(vnode_count: usize) -> u32 {
126    debug_assert!(
127        vnode_count <= VirtualNode::MAX_COUNT,
128        "invalid vnode count {vnode_count}"
129    );
130
131    if vnode_count <= 1 << COMPAT_VNODE_BITS {
132        COMPAT_VNODE_BITS
133    } else {
134        vnode_count.next_power_of_two().ilog2()
135    }
136}
137
138/// Compute vnode from the given row id.
139///
140/// # `vnode_count`
141///
142/// The given `vnode_count` determines the valid range of the returned vnode. It does not have to
143/// be the same as the vnode count used when the row id was generated with [`RowIdGenerator`].
144///
145/// However, only if they are the same, the vnode retrieved here is guaranteed to be the same as
146/// when it was generated. Otherwise, the vnode can be different and skewed, but the row ids
147/// generated under the same vnode will still yield the same result.
148///
149/// This is okay because we rely on the reversibility only if the serial type (row id) is generated
150/// and persisted in the same fragment, where the vnode count is the same. In other cases, the
151/// serial type is more like a normal integer type, and the algorithm to hash or compute vnode from
152/// it does not matter.
153#[inline]
154pub fn compute_vnode_from_row_id(id: RowId, vnode_count: usize) -> VirtualNode {
155    let vnode_bit = bit_for_vnode(vnode_count);
156    let sequence_bit = TIMESTAMP_SHIFT_BITS - vnode_bit;
157
158    let vnode_part = ((id >> sequence_bit) & ((1 << vnode_bit) - 1)) as usize;
159
160    // If the given `vnode_count` is the same as the one used when the row id was generated, this
161    // is no-op. Otherwise, we clamp the vnode to fit in the given vnode count.
162    VirtualNode::from_index(vnode_part % vnode_count)
163}
164
165impl RowIdGenerator {
166    /// Create a new `RowIdGenerator` with given virtual nodes and vnode count.
167    pub fn new(vnodes: impl IntoIterator<Item = VirtualNode>, vnode_count: usize) -> Self {
168        let vnode_bit = bit_for_vnode(vnode_count);
169
170        Self {
171            timestamp_mgr: TimestampManager::new(),
172            vnode_bit,
173            vnodes: vnodes.into_iter().collect(),
174            vnodes_index: 0,
175            sequence: 0,
176        }
177    }
178
179    /// The upper bound of the sequence part, exclusive.
180    fn sequence_upper_bound(&self) -> u16 {
181        1 << (TIMESTAMP_SHIFT_BITS - self.vnode_bit)
182    }
183
184    /// Update the timestamp, so that the millisecond part of row id is **always** increased.
185    ///
186    /// This method will immediately return if the timestamp is increased or there's remaining
187    /// sequence for the current millisecond. Otherwise, it will spin loop until the timestamp is
188    /// increased.
189    fn try_update_timestamp(&mut self) {
190        let should_update = self.sequence == self.sequence_upper_bound();
191        if self.timestamp_mgr.try_update_timestamp(should_update) {
192            // Reset states. We do not reset the `vnode_index` to make all vnodes are evenly used.
193            self.sequence = 0;
194        }
195    }
196
197    /// Generate a new `RowId`. Returns `None` if the sequence reaches the upper bound of current
198    /// timestamp, and `try_update_timestamp` should be called to update the timestamp and reset the
199    /// sequence. After that, the next call of this method always returns `Some`.
200    fn next_row_id_in_current_timestamp(&mut self) -> Option<RowId> {
201        if self.sequence >= self.sequence_upper_bound() {
202            return None;
203        }
204
205        let vnode = self.vnodes[self.vnodes_index as usize].to_index();
206        let sequence = self.sequence;
207
208        self.vnodes_index = (self.vnodes_index + 1) % self.vnodes.len() as u16;
209        if self.vnodes_index == 0 {
210            self.sequence += 1;
211        }
212
213        Some(
214            self.timestamp_mgr.last_timestamp_ms() << TIMESTAMP_SHIFT_BITS
215                | (vnode << (TIMESTAMP_SHIFT_BITS - self.vnode_bit)) as i64
216                | sequence as i64,
217        )
218    }
219
220    /// Returns an infinite iterator that generates `RowId`s.
221    fn gen_iter(&mut self) -> impl Iterator<Item = RowId> + '_ {
222        std::iter::from_fn(move || {
223            if let Some(next) = self.next_row_id_in_current_timestamp() {
224                Some(next)
225            } else {
226                self.try_update_timestamp();
227                Some(
228                    self.next_row_id_in_current_timestamp()
229                        .expect("timestamp should be updated"),
230                )
231            }
232        })
233    }
234
235    /// Generate a sequence of `RowId`s. Compared to `next`, this method is more efficient as it
236    /// only checks the timestamp once before generating the first `RowId`, instead of doing that
237    /// every `RowId`.
238    ///
239    /// This may block for a while if too many IDs are generated in one millisecond.
240    pub fn next_batch(&mut self, length: usize) -> Vec<RowId> {
241        self.try_update_timestamp();
242
243        let mut ret = Vec::with_capacity(length);
244        ret.extend(self.gen_iter().take(length));
245        assert_eq!(ret.len(), length);
246        ret
247    }
248
249    /// Generate a new `RowId`.
250    ///
251    /// This may block for a while if too many IDs are generated in one millisecond.
252    #[allow(clippy::should_implement_trait)]
253    pub fn next(&mut self) -> RowId {
254        self.try_update_timestamp();
255
256        self.gen_iter().next().unwrap()
257    }
258}
259
/// `ChangelogRowIdGenerator` generates unique changelog row ids using the snowflake algorithm.
/// Unlike `RowIdGenerator`, it maintains a separate sequence for each vnode and generates
/// row ids based on the input vnode.
#[derive(Debug)]
pub struct ChangelogRowIdGenerator {
    /// Timestamp manager, providing the timestamp part of generated row ids.
    timestamp_mgr: TimestampManager,

    /// The number of bits used for vnode.
    vnode_bit: u32,

    /// Sequence for each vnode. Key is the vnode, value is its sequence. The map is cleared
    /// whenever the timestamp is updated.
    vnodes_sequence: HashMap<VirtualNode, u16>,

    /// Bitmap of the vnodes accepted by this generator. A row id is only generated for a vnode
    /// whose bit is set here (or for `VirtualNode::ZERO`); other vnodes cause a panic.
    vnodes: Bitmap,
}
276
277impl ChangelogRowIdGenerator {
278    /// Create a new `ChangelogRowIdGenerator` with given vnode count.
279    pub fn new(vnodes: Bitmap, vnode_count: usize) -> Self {
280        let vnode_bit = bit_for_vnode(vnode_count);
281        let mut generator = Self {
282            timestamp_mgr: TimestampManager::new(),
283            vnode_bit,
284            vnodes_sequence: HashMap::default(),
285            vnodes,
286        };
287        generator.try_update_timestamp();
288        generator
289    }
290
291    /// The upper bound of the sequence part for changelog, exclusive.
292    fn sequence_upper_bound(&self) -> u16 {
293        1 << (TIMESTAMP_SHIFT_BITS - self.vnode_bit)
294    }
295
296    fn try_update_timestamp(&mut self) {
297        if self.timestamp_mgr.try_update_timestamp(true) {
298            // Reset states: reset all vnode sequences to 0.
299            self.vnodes_sequence.clear();
300        }
301    }
302
303    fn next_changelog_row_id_in_current_timestamp(&mut self, vnode: &VirtualNode) -> Option<RowId> {
304        if !self.vnodes.is_set(vnode.to_index()) && *vnode != VirtualNode::ZERO {
305            panic!("vnode {:?} not in generator", vnode);
306        }
307        let current_sequence = *self.vnodes_sequence.get(vnode).unwrap_or(&1);
308
309        if current_sequence >= self.sequence_upper_bound() {
310            return None;
311        }
312
313        let sequence = current_sequence;
314        self.vnodes_sequence.insert(*vnode, current_sequence + 1);
315
316        Some(
317            self.timestamp_mgr.last_timestamp_ms() << TIMESTAMP_SHIFT_BITS
318                | (vnode.to_index() << (TIMESTAMP_SHIFT_BITS - self.vnode_bit)) as i64
319                | sequence as i64,
320        )
321    }
322
323    pub fn next(&mut self, vnode: &VirtualNode) -> RowId {
324        if let Some(row_id) = self.next_changelog_row_id_in_current_timestamp(vnode) {
325            row_id
326        } else {
327            self.try_update_timestamp();
328            self.next_changelog_row_id_in_current_timestamp(vnode)
329                .expect("timestamp should be updated")
330        }
331    }
332}
333
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use itertools::Itertools;

    use super::*;

    /// Checks a `RowIdGenerator` with a single vnode under the given `vnode_count`: row ids are
    /// strictly increasing, the timestamp part changes after waiting, and the sequence part
    /// wraps around once its upper bound is reached.
    #[allow(clippy::unused_async)] // `madsim::time::advance` requires to be in async context
    async fn test_generator_with_vnode_count(vnode_count: usize) {
        let mut generator = RowIdGenerator::new([VirtualNode::from_index(0)], vnode_count);
        let sequence_upper_bound = generator.sequence_upper_bound();

        // Row ids generated one by one must be strictly increasing.
        let mut last_row_id = generator.next();
        for _ in 0..100000 {
            let row_id = generator.next();
            assert!(row_id > last_row_id);
            last_row_id = row_id;
        }

        // Let some time pass, either simulated (madsim) or real.
        let dur = Duration::from_millis(10);
        #[cfg(madsim)]
        tokio::time::advance(dur);
        #[cfg(not(madsim))]
        std::thread::sleep(dur);

        // The next row id must carry a different timestamp part and a sequence part of 0.
        let row_id = generator.next();
        assert!(row_id > last_row_id);
        assert_ne!(
            row_id >> TIMESTAMP_SHIFT_BITS,
            last_row_id >> TIMESTAMP_SHIFT_BITS
        );
        assert_eq!(row_id & (sequence_upper_bound as i64 - 1), 0);

        // Requesting more ids than one timestamp can hold: the sequence part runs through
        // `0..sequence_upper_bound` and then starts over from 0.
        let mut generator = RowIdGenerator::new([VirtualNode::from_index(1)], vnode_count);
        let row_ids = generator.next_batch((sequence_upper_bound + 10) as usize);
        let mut expected = (0..sequence_upper_bound).collect_vec();
        expected.extend(0..10);
        assert_eq!(
            row_ids
                .into_iter()
                .map(|id| (id as u16) & (sequence_upper_bound - 1))
                .collect_vec(),
            expected
        );
    }

    /// Checks a `RowIdGenerator` with 20 vnodes (the first 10 and the last 10 of the vnode
    /// range) under the given `vnode_count`: vnodes are used in order, cyclically, and the
    /// timestamp only advances once all sequences of all vnodes are used up.
    #[allow(clippy::unused_async)] // `madsim::time::advance` requires to be in async context
    async fn test_generator_multiple_vnodes_with_vnode_count(vnode_count: usize) {
        assert!(vnode_count >= 20);

        let vnodes = || {
            (0..10)
                .chain((vnode_count - 10)..vnode_count)
                .map(VirtualNode::from_index)
        };
        let vnode_of = |row_id: RowId| compute_vnode_from_row_id(row_id, vnode_count);

        let mut generator = RowIdGenerator::new(vnodes(), vnode_count);
        let sequence_upper_bound = generator.sequence_upper_bound();

        // One id more than what a single timestamp can hold for 20 vnodes.
        let row_ids = generator.next_batch((sequence_upper_bound as usize) * 20 + 1);

        // Check timestamps.
        let timestamps = row_ids
            .iter()
            .map(|&r| r >> TIMESTAMP_SHIFT_BITS)
            .collect_vec();

        // All ids but the last one must share exactly one timestamp.
        let (last_timestamp, first_timestamps) = timestamps.split_last().unwrap();
        let first_timestamp = first_timestamps.iter().unique().exactly_one().unwrap();

        // Check vnodes.
        let expected_vnodes = vnodes().cycle();
        let actual_vnodes = row_ids.iter().map(|&r| vnode_of(r));

        #[expect(clippy::disallowed_methods)] // `expected_vnodes` is an endless cycle iterator
        for (expected, actual) in expected_vnodes.zip(actual_vnodes) {
            assert_eq!(expected, actual);
        }

        // The extra id must have been generated in a later timestamp.
        assert!(last_timestamp > first_timestamp);
    }

    /// Defines two `#[tokio::test]`s for the given vnode count: `$name` runs the single-vnode
    /// check and `$name_mul` runs the multiple-vnodes check.
    macro_rules! test {
        ($vnode_count:expr, $name:ident, $name_mul:ident) => {
            #[tokio::test]
            async fn $name() {
                test_generator_with_vnode_count($vnode_count).await;
            }

            #[tokio::test]
            async fn $name_mul() {
                test_generator_multiple_vnodes_with_vnode_count($vnode_count).await;
            }
        };
    }

    test!(64, test_64, test_64_mul); // less than default value
    test!(114, test_114, test_114_mul); // not a power of 2, less than default value
    test!(256, test_256, test_256_mul); // default value, backward compatibility
    test!(1 << COMPAT_VNODE_BITS, test_1024, test_1024_mul); // max value with 10 bits
    test!(2048, test_2048, test_2048_mul); // more than 10 bits
    test!(2333, test_2333, test_2333_mul); // not a power of 2, larger than default value
    test!(VirtualNode::MAX_COUNT, test_max, test_max_mul); // max supported
}