risingwave_common/util/
row_id.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use super::epoch::UNIX_RISINGWAVE_DATE_EPOCH;
20use crate::bitmap::Bitmap;
21use crate::hash::VirtualNode;
22
/// The number of bits occupied by the vnode part and the sequence part of a row id.
///
/// The timestamp part of a row id sits above these bits: it is shifted left by this amount when
/// a row id is assembled and shifted right by it when the timestamp is extracted.
const TIMESTAMP_SHIFT_BITS: u32 = 22;

/// The number of bits occupied by the vnode part of a row id in the previous version.
///
/// Still used as the vnode width for every vnode count up to `1 << COMPAT_VNODE_BITS`.
const COMPAT_VNODE_BITS: u32 = 10;
28
/// Common timestamp management for row id generators.
///
/// Tracks the millisecond timestamp that is embedded in generated row ids. The timestamp is
/// measured relative to `base` rather than to the UNIX epoch.
#[derive(Debug)]
struct TimestampManager {
    /// Specific base timestamp used for generating row ids.
    base: SystemTime,
    /// Last timestamp part of row id, in milliseconds elapsed since `base`.
    last_timestamp_ms: i64,
}
37
38impl TimestampManager {
39    fn new() -> Self {
40        let base = *UNIX_RISINGWAVE_DATE_EPOCH;
41        Self {
42            base,
43            last_timestamp_ms: base.elapsed().unwrap().as_millis() as i64,
44        }
45    }
46
47    fn get_current_timestamp_ms(&self) -> i64 {
48        self.base.elapsed().unwrap().as_millis() as i64
49    }
50
51    fn try_update_timestamp(&mut self, should_update: bool) -> bool {
52        let current_timestamp_ms = self.get_current_timestamp_ms();
53        let to_update = match current_timestamp_ms.cmp(&self.last_timestamp_ms) {
54            Ordering::Less => {
55                tracing::warn!(
56                    "Clock moved backwards: last={}, current={}",
57                    self.last_timestamp_ms,
58                    current_timestamp_ms,
59                );
60                true
61            }
62            Ordering::Equal => should_update,
63            Ordering::Greater => true,
64        };
65
66        if to_update {
67            // If the timestamp is not increased, spin loop here and wait for next millisecond.
68            let mut current_timestamp_ms = current_timestamp_ms;
69            loop {
70                if current_timestamp_ms > self.last_timestamp_ms {
71                    break;
72                }
73                current_timestamp_ms = self.get_current_timestamp_ms();
74
75                #[cfg(madsim)]
76                tokio::time::advance(std::time::Duration::from_micros(10));
77                #[cfg(not(madsim))]
78                std::hint::spin_loop();
79            }
80            self.last_timestamp_ms = current_timestamp_ms;
81            true
82        } else {
83            false
84        }
85    }
86
87    fn last_timestamp_ms(&self) -> i64 {
88        self.last_timestamp_ms
89    }
90}
91
/// `RowIdGenerator` generates unique row ids using snowflake algorithm as following format:
///
/// | timestamp | vnode & sequence |
/// |-----------|------------------|
/// |  41 bits  |     22 bits      |
///
/// The vnode part can occupy 10..=15 bits, which is determined by the vnode count. Thus,
/// the sequence part will occupy 7..=12 bits. See [`bit_for_vnode`] for more details.
///
/// Consecutive row ids cycle through the generator's vnodes; the sequence only increases once
/// every vnode has been used with the current sequence.
#[derive(Debug)]
pub struct RowIdGenerator {
    /// Timestamp manager, providing the timestamp part of generated row ids.
    timestamp_mgr: TimestampManager,

    /// The number of bits used for vnode.
    vnode_bit: u32,

    /// Virtual nodes used by this generator.
    vnodes: Vec<VirtualNode>,

    /// Index into `vnodes` of the vnode that the next row id will use.
    vnodes_index: u16,

    /// Sequence part that the next row id will use. Incremented each time `vnodes_index` wraps
    /// around to 0, and reset to 0 whenever the timestamp is updated.
    sequence: u16,
}
117
/// A row id: the timestamp part in the high bits, followed by `TIMESTAMP_SHIFT_BITS` bits that
/// hold the vnode part and the sequence part.
pub type RowId = i64;
119
120/// Extracts UNIX epoch milliseconds from a row id.
121pub fn row_id_to_unix_millis(row_id: RowId) -> Option<i64> {
122    let timestamp_ms = row_id >> TIMESTAMP_SHIFT_BITS;
123    let rw_epoch_unix_ms = UNIX_RISINGWAVE_DATE_EPOCH
124        .duration_since(UNIX_EPOCH)
125        .expect("risingwave epoch is after unix epoch")
126        .as_millis() as i64;
127    timestamp_ms.checked_add(rw_epoch_unix_ms)
128}
129
130/// The number of bits occupied by the vnode part of a row id.
131///
132/// In previous versions, this was fixed to 10 bits even if the vnode count was fixed to 256.
133/// For backward compatibility, we still use 10 bits for vnode count less than or equal to 1024.
134/// For larger vnode counts, we use the smallest power of 2 that fits the vnode count.
135fn bit_for_vnode(vnode_count: usize) -> u32 {
136    debug_assert!(
137        vnode_count <= VirtualNode::MAX_COUNT,
138        "invalid vnode count {vnode_count}"
139    );
140
141    if vnode_count <= 1 << COMPAT_VNODE_BITS {
142        COMPAT_VNODE_BITS
143    } else {
144        vnode_count.next_power_of_two().ilog2()
145    }
146}
147
148/// Compute vnode from the given row id.
149///
150/// # `vnode_count`
151///
152/// The given `vnode_count` determines the valid range of the returned vnode. It does not have to
153/// be the same as the vnode count used when the row id was generated with [`RowIdGenerator`].
154///
155/// However, only if they are the same, the vnode retrieved here is guaranteed to be the same as
156/// when it was generated. Otherwise, the vnode can be different and skewed, but the row ids
157/// generated under the same vnode will still yield the same result.
158///
159/// This is okay because we rely on the reversibility only if the serial type (row id) is generated
160/// and persisted in the same fragment, where the vnode count is the same. In other cases, the
161/// serial type is more like a normal integer type, and the algorithm to hash or compute vnode from
162/// it does not matter.
163#[inline]
164pub fn compute_vnode_from_row_id(id: RowId, vnode_count: usize) -> VirtualNode {
165    let vnode_bit = bit_for_vnode(vnode_count);
166    let sequence_bit = TIMESTAMP_SHIFT_BITS - vnode_bit;
167
168    let vnode_part = ((id >> sequence_bit) & ((1 << vnode_bit) - 1)) as usize;
169
170    // If the given `vnode_count` is the same as the one used when the row id was generated, this
171    // is no-op. Otherwise, we clamp the vnode to fit in the given vnode count.
172    VirtualNode::from_index(vnode_part % vnode_count)
173}
174
175impl RowIdGenerator {
176    /// Create a new `RowIdGenerator` with given virtual nodes and vnode count.
177    pub fn new(vnodes: impl IntoIterator<Item = VirtualNode>, vnode_count: usize) -> Self {
178        let vnode_bit = bit_for_vnode(vnode_count);
179
180        Self {
181            timestamp_mgr: TimestampManager::new(),
182            vnode_bit,
183            vnodes: vnodes.into_iter().collect(),
184            vnodes_index: 0,
185            sequence: 0,
186        }
187    }
188
189    /// The upper bound of the sequence part, exclusive.
190    fn sequence_upper_bound(&self) -> u16 {
191        1 << (TIMESTAMP_SHIFT_BITS - self.vnode_bit)
192    }
193
194    /// Update the timestamp, so that the millisecond part of row id is **always** increased.
195    ///
196    /// This method will immediately return if the timestamp is increased or there's remaining
197    /// sequence for the current millisecond. Otherwise, it will spin loop until the timestamp is
198    /// increased.
199    fn try_update_timestamp(&mut self) {
200        let should_update = self.sequence == self.sequence_upper_bound();
201        if self.timestamp_mgr.try_update_timestamp(should_update) {
202            // Reset states. We do not reset the `vnode_index` to make all vnodes are evenly used.
203            self.sequence = 0;
204        }
205    }
206
207    /// Generate a new `RowId`. Returns `None` if the sequence reaches the upper bound of current
208    /// timestamp, and `try_update_timestamp` should be called to update the timestamp and reset the
209    /// sequence. After that, the next call of this method always returns `Some`.
210    fn next_row_id_in_current_timestamp(&mut self) -> Option<RowId> {
211        if self.sequence >= self.sequence_upper_bound() {
212            return None;
213        }
214
215        let vnode = self.vnodes[self.vnodes_index as usize].to_index();
216        let sequence = self.sequence;
217
218        self.vnodes_index = (self.vnodes_index + 1) % self.vnodes.len() as u16;
219        if self.vnodes_index == 0 {
220            self.sequence += 1;
221        }
222
223        Some(
224            self.timestamp_mgr.last_timestamp_ms() << TIMESTAMP_SHIFT_BITS
225                | (vnode << (TIMESTAMP_SHIFT_BITS - self.vnode_bit)) as i64
226                | sequence as i64,
227        )
228    }
229
230    /// Returns an infinite iterator that generates `RowId`s.
231    fn gen_iter(&mut self) -> impl Iterator<Item = RowId> + '_ {
232        std::iter::from_fn(move || {
233            if let Some(next) = self.next_row_id_in_current_timestamp() {
234                Some(next)
235            } else {
236                self.try_update_timestamp();
237                Some(
238                    self.next_row_id_in_current_timestamp()
239                        .expect("timestamp should be updated"),
240                )
241            }
242        })
243    }
244
245    /// Generate a sequence of `RowId`s. Compared to `next`, this method is more efficient as it
246    /// only checks the timestamp once before generating the first `RowId`, instead of doing that
247    /// every `RowId`.
248    ///
249    /// This may block for a while if too many IDs are generated in one millisecond.
250    pub fn next_batch(&mut self, length: usize) -> Vec<RowId> {
251        self.try_update_timestamp();
252
253        let mut ret = Vec::with_capacity(length);
254        ret.extend(self.gen_iter().take(length));
255        assert_eq!(ret.len(), length);
256        ret
257    }
258
259    /// Generate a new `RowId`.
260    ///
261    /// This may block for a while if too many IDs are generated in one millisecond.
262    #[allow(clippy::should_implement_trait)]
263    pub fn next(&mut self) -> RowId {
264        self.try_update_timestamp();
265
266        self.gen_iter().next().unwrap()
267    }
268}
269
/// `ChangelogRowIdGenerator` generates unique changelog row ids using snowflake algorithm.
/// Unlike `RowIdGenerator`, it maintains a separate sequence for each vnode and generates
/// row ids based on the input vnode.
#[derive(Debug)]
pub struct ChangelogRowIdGenerator {
    /// Timestamp manager, providing the timestamp part of generated row ids.
    timestamp_mgr: TimestampManager,

    /// The number of bits used for vnode.
    vnode_bit: u32,

    /// Sequence for each vnode. Key is the vnode, value is the sequence to use for that vnode's
    /// next row id within the current timestamp. Cleared whenever the timestamp is updated.
    vnodes_sequence: HashMap<VirtualNode, u16>,

    /// Vnodes accepted by this generator. Requesting a row id for a vnode that is not set here
    /// panics, except for `VirtualNode::ZERO` which is always accepted.
    vnodes: Bitmap,
}
286
287impl ChangelogRowIdGenerator {
288    /// Create a new `ChangelogRowIdGenerator` with given vnode count.
289    pub fn new(vnodes: Bitmap, vnode_count: usize) -> Self {
290        let vnode_bit = bit_for_vnode(vnode_count);
291        let mut generator = Self {
292            timestamp_mgr: TimestampManager::new(),
293            vnode_bit,
294            vnodes_sequence: HashMap::default(),
295            vnodes,
296        };
297        generator.try_update_timestamp();
298        generator
299    }
300
301    /// The upper bound of the sequence part for changelog, exclusive.
302    fn sequence_upper_bound(&self) -> u16 {
303        1 << (TIMESTAMP_SHIFT_BITS - self.vnode_bit)
304    }
305
306    fn try_update_timestamp(&mut self) {
307        if self.timestamp_mgr.try_update_timestamp(true) {
308            // Reset states: reset all vnode sequences to 0.
309            self.vnodes_sequence.clear();
310        }
311    }
312
313    fn next_changelog_row_id_in_current_timestamp(&mut self, vnode: &VirtualNode) -> Option<RowId> {
314        if !self.vnodes.is_set(vnode.to_index()) && *vnode != VirtualNode::ZERO {
315            panic!("vnode {:?} not in generator", vnode);
316        }
317        let current_sequence = *self.vnodes_sequence.get(vnode).unwrap_or(&1);
318
319        if current_sequence >= self.sequence_upper_bound() {
320            return None;
321        }
322
323        let sequence = current_sequence;
324        self.vnodes_sequence.insert(*vnode, current_sequence + 1);
325
326        Some(
327            self.timestamp_mgr.last_timestamp_ms() << TIMESTAMP_SHIFT_BITS
328                | (vnode.to_index() << (TIMESTAMP_SHIFT_BITS - self.vnode_bit)) as i64
329                | sequence as i64,
330        )
331    }
332
333    pub fn next(&mut self, vnode: &VirtualNode) -> RowId {
334        if let Some(row_id) = self.next_changelog_row_id_in_current_timestamp(vnode) {
335            row_id
336        } else {
337            self.try_update_timestamp();
338            self.next_changelog_row_id_in_current_timestamp(vnode)
339                .expect("timestamp should be updated")
340        }
341    }
342}
343
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use itertools::Itertools;

    use super::*;

    /// Checks a single-vnode generator: row ids are strictly increasing, the timestamp part
    /// changes and the sequence restarts after time has passed, and a batch larger than one
    /// millisecond's worth of sequences wraps the sequence back to 0.
    #[allow(clippy::unused_async)] // `madsim::time::advance` requires to be in async context
    async fn test_generator_with_vnode_count(vnode_count: usize) {
        let mut generator = RowIdGenerator::new([VirtualNode::from_index(0)], vnode_count);
        let sequence_upper_bound = generator.sequence_upper_bound();
        let sequence_mask = sequence_upper_bound - 1;

        // Row ids generated one at a time must be strictly increasing.
        let first_row_id = generator.next();
        let last_row_id = (0..100000).fold(first_row_id, |last_row_id, _| {
            let row_id = generator.next();
            assert!(row_id > last_row_id);
            row_id
        });

        // Let some time pass so that the next row id falls into a later millisecond.
        let pause = Duration::from_millis(10);
        #[cfg(madsim)]
        tokio::time::advance(pause);
        #[cfg(not(madsim))]
        std::thread::sleep(pause);

        let row_id = generator.next();
        assert!(row_id > last_row_id);
        assert_ne!(
            row_id >> TIMESTAMP_SHIFT_BITS,
            last_row_id >> TIMESTAMP_SHIFT_BITS
        );
        assert_eq!(row_id & i64::from(sequence_mask), 0);

        // A batch that exceeds the sequence space uses every sequence once, then starts over.
        let mut generator = RowIdGenerator::new([VirtualNode::from_index(1)], vnode_count);
        let batch_len = usize::from(sequence_upper_bound) + 10;
        let actual_sequences = generator
            .next_batch(batch_len)
            .into_iter()
            .map(|id| (id as u16) & sequence_mask)
            .collect_vec();
        let expected_sequences = (0..sequence_upper_bound).chain(0..10).collect_vec();
        assert_eq!(actual_sequences, expected_sequences);
    }

    /// Checks a generator over 20 vnodes (the first 10 and the last 10 of the vnode space): the
    /// vnodes are used round-robin, and only the very last row id of a batch that is one larger
    /// than the per-millisecond capacity lands in a later timestamp.
    #[allow(clippy::unused_async)] // `madsim::time::advance` requires to be in async context
    async fn test_generator_multiple_vnodes_with_vnode_count(vnode_count: usize) {
        assert!(vnode_count >= 20);

        let vnodes = || {
            let head = 0..10;
            let tail = (vnode_count - 10)..vnode_count;
            head.chain(tail).map(VirtualNode::from_index)
        };
        let vnode_of = |row_id: RowId| compute_vnode_from_row_id(row_id, vnode_count);

        let mut generator = RowIdGenerator::new(vnodes(), vnode_count);
        let sequence_upper_bound = generator.sequence_upper_bound();

        let batch_len = usize::from(sequence_upper_bound) * 20 + 1;
        let row_ids = generator.next_batch(batch_len);

        // Check timestamps: all but the last row id must share a single timestamp.
        let timestamps = row_ids
            .iter()
            .map(|&row_id| row_id >> TIMESTAMP_SHIFT_BITS)
            .collect_vec();
        let (last_timestamp, first_timestamps) = timestamps.split_last().unwrap();
        let first_timestamp = first_timestamps.iter().unique().exactly_one().unwrap();

        // Check vnodes: they must follow the generator's vnodes in a repeating cycle.
        let actual_vnodes = row_ids.iter().copied().map(vnode_of);
        let expected_vnodes = vnodes().cycle();

        #[expect(clippy::disallowed_methods)] // `expected_vnodes` is an endless cycle iterator
        for (expected, actual) in expected_vnodes.zip(actual_vnodes) {
            assert_eq!(expected, actual);
        }

        assert!(last_timestamp > first_timestamp);
    }

    /// Declares the single-vnode test and the multi-vnode test for one vnode count.
    macro_rules! test {
        ($vnode_count:expr, $name:ident, $name_mul:ident) => {
            #[tokio::test]
            async fn $name() {
                test_generator_with_vnode_count($vnode_count).await;
            }

            #[tokio::test]
            async fn $name_mul() {
                test_generator_multiple_vnodes_with_vnode_count($vnode_count).await;
            }
        };
    }

    test!(64, test_64, test_64_mul); // less than default value
    test!(114, test_114, test_114_mul); // not a power of 2, less than default value
    test!(256, test_256, test_256_mul); // default value, backward compatibility
    test!(1 << COMPAT_VNODE_BITS, test_1024, test_1024_mul); // max value with 10 bits
    test!(2048, test_2048, test_2048_mul); // more than 10 bits
    test!(2333, test_2333, test_2333_mul); // not a power of 2, larger than default value
    test!(VirtualNode::MAX_COUNT, test_max, test_max_mul); // max supported
}