risingwave_common/util/
row_id.rs1use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use super::epoch::UNIX_RISINGWAVE_DATE_EPOCH;
20use crate::bitmap::Bitmap;
21use crate::hash::VirtualNode;
22
/// Width, in bits, of the low part of a row id that holds the vnode and the sequence number.
///
/// The millisecond timestamp is stored above these bits, i.e. shifted left by this amount.
const TIMESTAMP_SHIFT_BITS: u32 = 22;

/// Minimum width, in bits, of the vnode part of a row id.
///
/// Every vnode count up to `1 << COMPAT_VNODE_BITS` is encoded with exactly this many bits;
/// only larger vnode counts use a wider vnode part (see `bit_for_vnode`).
const COMPAT_VNODE_BITS: u32 = 10;
29#[derive(Debug)]
31struct TimestampManager {
32 base: SystemTime,
34 last_timestamp_ms: i64,
36}
37
38impl TimestampManager {
39 fn new() -> Self {
40 let base = *UNIX_RISINGWAVE_DATE_EPOCH;
41 Self {
42 base,
43 last_timestamp_ms: base.elapsed().unwrap().as_millis() as i64,
44 }
45 }
46
47 fn get_current_timestamp_ms(&self) -> i64 {
48 self.base.elapsed().unwrap().as_millis() as i64
49 }
50
51 fn try_update_timestamp(&mut self, should_update: bool) -> bool {
52 let current_timestamp_ms = self.get_current_timestamp_ms();
53 let to_update = match current_timestamp_ms.cmp(&self.last_timestamp_ms) {
54 Ordering::Less => {
55 tracing::warn!(
56 "Clock moved backwards: last={}, current={}",
57 self.last_timestamp_ms,
58 current_timestamp_ms,
59 );
60 true
61 }
62 Ordering::Equal => should_update,
63 Ordering::Greater => true,
64 };
65
66 if to_update {
67 let mut current_timestamp_ms = current_timestamp_ms;
69 loop {
70 if current_timestamp_ms > self.last_timestamp_ms {
71 break;
72 }
73 current_timestamp_ms = self.get_current_timestamp_ms();
74
75 #[cfg(madsim)]
76 tokio::time::advance(std::time::Duration::from_micros(10));
77 #[cfg(not(madsim))]
78 std::hint::spin_loop();
79 }
80 self.last_timestamp_ms = current_timestamp_ms;
81 true
82 } else {
83 false
84 }
85 }
86
87 fn last_timestamp_ms(&self) -> i64 {
88 self.last_timestamp_ms
89 }
90}
91
/// Generates row ids for a fixed list of vnodes.
///
/// Each id packs, from high to low bits:
/// - the millisecond timestamp from `timestamp_mgr`, shifted left by `TIMESTAMP_SHIFT_BITS`;
/// - the vnode index, in `vnode_bit` bits;
/// - a sequence number in the remaining `TIMESTAMP_SHIFT_BITS - vnode_bit` low bits.
#[derive(Debug)]
pub struct RowIdGenerator {
    /// Source of the timestamp part of generated ids.
    timestamp_mgr: TimestampManager,

    /// Number of bits used for the vnode part, as computed by `bit_for_vnode`.
    vnode_bit: u32,

    /// Vnodes that ids are generated for; they are used in round-robin order.
    vnodes: Vec<VirtualNode>,

    /// Position in `vnodes` of the vnode to use for the next id.
    vnodes_index: u16,

    /// Sequence part for the next id. It is bumped after each full pass over `vnodes` and
    /// reset to 0 whenever the timestamp advances.
    sequence: u16,
}
117
118pub type RowId = i64;
119
120pub fn row_id_to_unix_millis(row_id: RowId) -> Option<i64> {
122 let timestamp_ms = row_id >> TIMESTAMP_SHIFT_BITS;
123 let rw_epoch_unix_ms = UNIX_RISINGWAVE_DATE_EPOCH
124 .duration_since(UNIX_EPOCH)
125 .expect("risingwave epoch is after unix epoch")
126 .as_millis() as i64;
127 timestamp_ms.checked_add(rw_epoch_unix_ms)
128}
129
130fn bit_for_vnode(vnode_count: usize) -> u32 {
136 debug_assert!(
137 vnode_count <= VirtualNode::MAX_COUNT,
138 "invalid vnode count {vnode_count}"
139 );
140
141 if vnode_count <= 1 << COMPAT_VNODE_BITS {
142 COMPAT_VNODE_BITS
143 } else {
144 vnode_count.next_power_of_two().ilog2()
145 }
146}
147
148#[inline]
164pub fn compute_vnode_from_row_id(id: RowId, vnode_count: usize) -> VirtualNode {
165 let vnode_bit = bit_for_vnode(vnode_count);
166 let sequence_bit = TIMESTAMP_SHIFT_BITS - vnode_bit;
167
168 let vnode_part = ((id >> sequence_bit) & ((1 << vnode_bit) - 1)) as usize;
169
170 VirtualNode::from_index(vnode_part % vnode_count)
173}
174
175impl RowIdGenerator {
176 pub fn new(vnodes: impl IntoIterator<Item = VirtualNode>, vnode_count: usize) -> Self {
178 let vnode_bit = bit_for_vnode(vnode_count);
179
180 Self {
181 timestamp_mgr: TimestampManager::new(),
182 vnode_bit,
183 vnodes: vnodes.into_iter().collect(),
184 vnodes_index: 0,
185 sequence: 0,
186 }
187 }
188
189 fn sequence_upper_bound(&self) -> u16 {
191 1 << (TIMESTAMP_SHIFT_BITS - self.vnode_bit)
192 }
193
194 fn try_update_timestamp(&mut self) {
200 let should_update = self.sequence == self.sequence_upper_bound();
201 if self.timestamp_mgr.try_update_timestamp(should_update) {
202 self.sequence = 0;
204 }
205 }
206
207 fn next_row_id_in_current_timestamp(&mut self) -> Option<RowId> {
211 if self.sequence >= self.sequence_upper_bound() {
212 return None;
213 }
214
215 let vnode = self.vnodes[self.vnodes_index as usize].to_index();
216 let sequence = self.sequence;
217
218 self.vnodes_index = (self.vnodes_index + 1) % self.vnodes.len() as u16;
219 if self.vnodes_index == 0 {
220 self.sequence += 1;
221 }
222
223 Some(
224 self.timestamp_mgr.last_timestamp_ms() << TIMESTAMP_SHIFT_BITS
225 | (vnode << (TIMESTAMP_SHIFT_BITS - self.vnode_bit)) as i64
226 | sequence as i64,
227 )
228 }
229
230 fn gen_iter(&mut self) -> impl Iterator<Item = RowId> + '_ {
232 std::iter::from_fn(move || {
233 if let Some(next) = self.next_row_id_in_current_timestamp() {
234 Some(next)
235 } else {
236 self.try_update_timestamp();
237 Some(
238 self.next_row_id_in_current_timestamp()
239 .expect("timestamp should be updated"),
240 )
241 }
242 })
243 }
244
245 pub fn next_batch(&mut self, length: usize) -> Vec<RowId> {
251 self.try_update_timestamp();
252
253 let mut ret = Vec::with_capacity(length);
254 ret.extend(self.gen_iter().take(length));
255 assert_eq!(ret.len(), length);
256 ret
257 }
258
259 #[allow(clippy::should_implement_trait)]
263 pub fn next(&mut self) -> RowId {
264 self.try_update_timestamp();
265
266 self.gen_iter().next().unwrap()
267 }
268}
269
270#[derive(Debug)]
274pub struct ChangelogRowIdGenerator {
275 timestamp_mgr: TimestampManager,
277
278 vnode_bit: u32,
280
281 vnodes_sequence: HashMap<VirtualNode, u16>,
283
284 vnodes: Bitmap,
285}
286
287impl ChangelogRowIdGenerator {
288 pub fn new(vnodes: Bitmap, vnode_count: usize) -> Self {
290 let vnode_bit = bit_for_vnode(vnode_count);
291 let mut generator = Self {
292 timestamp_mgr: TimestampManager::new(),
293 vnode_bit,
294 vnodes_sequence: HashMap::default(),
295 vnodes,
296 };
297 generator.try_update_timestamp();
298 generator
299 }
300
301 fn sequence_upper_bound(&self) -> u16 {
303 1 << (TIMESTAMP_SHIFT_BITS - self.vnode_bit)
304 }
305
306 fn try_update_timestamp(&mut self) {
307 if self.timestamp_mgr.try_update_timestamp(true) {
308 self.vnodes_sequence.clear();
310 }
311 }
312
313 fn next_changelog_row_id_in_current_timestamp(&mut self, vnode: &VirtualNode) -> Option<RowId> {
314 if !self.vnodes.is_set(vnode.to_index()) && *vnode != VirtualNode::ZERO {
315 panic!("vnode {:?} not in generator", vnode);
316 }
317 let current_sequence = *self.vnodes_sequence.get(vnode).unwrap_or(&1);
318
319 if current_sequence >= self.sequence_upper_bound() {
320 return None;
321 }
322
323 let sequence = current_sequence;
324 self.vnodes_sequence.insert(*vnode, current_sequence + 1);
325
326 Some(
327 self.timestamp_mgr.last_timestamp_ms() << TIMESTAMP_SHIFT_BITS
328 | (vnode.to_index() << (TIMESTAMP_SHIFT_BITS - self.vnode_bit)) as i64
329 | sequence as i64,
330 )
331 }
332
333 pub fn next(&mut self, vnode: &VirtualNode) -> RowId {
334 if let Some(row_id) = self.next_changelog_row_id_in_current_timestamp(vnode) {
335 row_id
336 } else {
337 self.try_update_timestamp();
338 self.next_changelog_row_id_in_current_timestamp(vnode)
339 .expect("timestamp should be updated")
340 }
341 }
342}
343
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use itertools::Itertools;

    use super::*;

    /// Single-vnode checks for `vnode_count`:
    /// - consecutive ids are strictly increasing;
    /// - after a 10ms pause the timestamp part changes and the sequence part restarts at 0;
    /// - a batch longer than one timestamp's capacity wraps the sequence back to 0.
    #[allow(clippy::unused_async)]
    async fn test_generator_with_vnode_count(vnode_count: usize) {
        let mut generator = RowIdGenerator::new([VirtualNode::from_index(0)], vnode_count);
        let sequence_upper_bound = generator.sequence_upper_bound();

        // Consecutive ids must be strictly increasing.
        let mut last_row_id = generator.next();
        for _ in 0..100000 {
            let row_id = generator.next();
            assert!(row_id > last_row_id);
            last_row_id = row_id;
        }

        // Let (simulated or real) time move on so that the next id gets a new timestamp.
        let dur = Duration::from_millis(10);
        #[cfg(madsim)]
        tokio::time::advance(dur);
        #[cfg(not(madsim))]
        std::thread::sleep(dur);

        let row_id = generator.next();
        assert!(row_id > last_row_id);
        assert_ne!(
            row_id >> TIMESTAMP_SHIFT_BITS,
            last_row_id >> TIMESTAMP_SHIFT_BITS
        );
        // A new timestamp restarts the sequence (the lowest bits) at 0.
        assert_eq!(row_id & (sequence_upper_bound as i64 - 1), 0);

        // Ask for more ids than one timestamp can hold. The sequence part runs through
        // 0..sequence_upper_bound, then restarts at 0 under the next timestamp.
        let mut generator = RowIdGenerator::new([VirtualNode::from_index(1)], vnode_count);
        let row_ids = generator.next_batch((sequence_upper_bound + 10) as usize);
        let mut expected = (0..sequence_upper_bound).collect_vec();
        expected.extend(0..10);
        assert_eq!(
            row_ids
                .into_iter()
                .map(|id| (id as u16) & (sequence_upper_bound - 1))
                .collect_vec(),
            expected
        );
    }

    /// Multi-vnode checks for `vnode_count`, which must be at least 20:
    /// - vnodes are used in round-robin order;
    /// - using up every sequence value of one timestamp moves on to a later timestamp.
    #[allow(clippy::unused_async)]
    async fn test_generator_multiple_vnodes_with_vnode_count(vnode_count: usize) {
        assert!(vnode_count >= 20);

        // 20 vnodes: the first 10 and the last 10 of the vnode range.
        let vnodes = || {
            (0..10)
                .chain((vnode_count - 10)..vnode_count)
                .map(VirtualNode::from_index)
        };
        let vnode_of = |row_id: RowId| compute_vnode_from_row_id(row_id, vnode_count);

        let mut generator = RowIdGenerator::new(vnodes(), vnode_count);
        let sequence_upper_bound = generator.sequence_upper_bound();

        // One id more than a single timestamp can hold across all 20 vnodes.
        let row_ids = generator.next_batch((sequence_upper_bound as usize) * 20 + 1);

        let timestamps = row_ids
            .iter()
            .map(|&r| r >> TIMESTAMP_SHIFT_BITS)
            .collect_vec();

        // Every id but the last shares one timestamp; the last id spills into a later one.
        let (last_timestamp, first_timestamps) = timestamps.split_last().unwrap();
        let first_timestamp = first_timestamps.iter().unique().exactly_one().unwrap();

        // Vnodes follow the configured order cyclically, continuing across the timestamp change.
        let expected_vnodes = vnodes().cycle();
        let actual_vnodes = row_ids.iter().map(|&r| vnode_of(r));

        #[expect(clippy::disallowed_methods)]
        for (expected, actual) in expected_vnodes.zip(actual_vnodes) {
            assert_eq!(expected, actual);
        }

        assert!(last_timestamp > first_timestamp);
    }

    // Expands to two `#[tokio::test]` functions running both checks above for one vnode count.
    macro_rules! test {
        ($vnode_count:expr, $name:ident, $name_mul:ident) => {
            #[tokio::test]
            async fn $name() {
                test_generator_with_vnode_count($vnode_count).await;
            }

            #[tokio::test]
            async fn $name_mul() {
                test_generator_multiple_vnodes_with_vnode_count($vnode_count).await;
            }
        };
    }

    // Vnode counts below, at, and above `1 << COMPAT_VNODE_BITS`, including non-powers of two
    // and the maximum.
    test!(64, test_64, test_64_mul);
    test!(114, test_114, test_114_mul);
    test!(256, test_256, test_256_mul);
    test!(1 << COMPAT_VNODE_BITS, test_1024, test_1024_mul);
    test!(2048, test_2048, test_2048_mul);
    test!(2333, test_2333, test_2333_mul);
    test!(VirtualNode::MAX_COUNT, test_max, test_max_mul);
}