risingwave_common/util/
row_id.rs1use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::time::SystemTime;
18
19use super::epoch::UNIX_RISINGWAVE_DATE_EPOCH;
20use crate::bitmap::Bitmap;
21use crate::hash::VirtualNode;
22
/// Number of low bits of a row id that sit below the millisecond timestamp; they hold the vnode
/// followed by the sequence number.
const TIMESTAMP_SHIFT_BITS: u32 = 22;

/// Vnode bit width used whenever the vnode count is at most `1 << COMPAT_VNODE_BITS`
/// (see `bit_for_vnode`).
const COMPAT_VNODE_BITS: u32 = 10;
28
/// Tracks the millisecond component of generated row ids, measured from a fixed base time.
#[derive(Debug)]
struct TimestampManager {
    /// Reference point for all timestamps; the constructor sets it to `UNIX_RISINGWAVE_DATE_EPOCH`.
    base: SystemTime,
    /// Milliseconds elapsed since `base` for the timestamp currently stamped into new row ids.
    last_timestamp_ms: i64,
}
37
38impl TimestampManager {
39 fn new() -> Self {
40 let base = *UNIX_RISINGWAVE_DATE_EPOCH;
41 Self {
42 base,
43 last_timestamp_ms: base.elapsed().unwrap().as_millis() as i64,
44 }
45 }
46
47 fn get_current_timestamp_ms(&self) -> i64 {
48 self.base.elapsed().unwrap().as_millis() as i64
49 }
50
51 fn try_update_timestamp(&mut self, should_update: bool) -> bool {
52 let current_timestamp_ms = self.get_current_timestamp_ms();
53 let to_update = match current_timestamp_ms.cmp(&self.last_timestamp_ms) {
54 Ordering::Less => {
55 tracing::warn!(
56 "Clock moved backwards: last={}, current={}",
57 self.last_timestamp_ms,
58 current_timestamp_ms,
59 );
60 true
61 }
62 Ordering::Equal => should_update,
63 Ordering::Greater => true,
64 };
65
66 if to_update {
67 let mut current_timestamp_ms = current_timestamp_ms;
69 loop {
70 if current_timestamp_ms > self.last_timestamp_ms {
71 break;
72 }
73 current_timestamp_ms = self.get_current_timestamp_ms();
74
75 #[cfg(madsim)]
76 tokio::time::advance(std::time::Duration::from_micros(10));
77 #[cfg(not(madsim))]
78 std::hint::spin_loop();
79 }
80 self.last_timestamp_ms = current_timestamp_ms;
81 true
82 } else {
83 false
84 }
85 }
86
87 fn last_timestamp_ms(&self) -> i64 {
88 self.last_timestamp_ms
89 }
90}
91
/// Generates row ids laid out as
/// `timestamp_ms << TIMESTAMP_SHIFT_BITS | vnode << (TIMESTAMP_SHIFT_BITS - vnode_bit) | sequence`,
/// visiting a fixed list of vnodes in round-robin order.
#[derive(Debug)]
pub struct RowIdGenerator {
    /// Source of the millisecond component (the bits above `TIMESTAMP_SHIFT_BITS`).
    timestamp_mgr: TimestampManager,

    /// Number of bits used to encode the vnode, as computed by `bit_for_vnode`.
    vnode_bit: u32,

    /// Vnodes that receive row ids, visited in order.
    vnodes: Vec<VirtualNode>,

    /// Index into `vnodes` of the vnode that receives the next row id.
    vnodes_index: u16,

    /// Sequence number for the current pass over `vnodes`; incremented each time `vnodes_index`
    /// wraps back to 0, and reset to 0 when the timestamp advances.
    sequence: u16,
}
117
/// A generated row id: the millisecond timestamp in the high bits, then the vnode, then the
/// sequence number in the lowest bits.
pub type RowId = i64;
119
120fn bit_for_vnode(vnode_count: usize) -> u32 {
126 debug_assert!(
127 vnode_count <= VirtualNode::MAX_COUNT,
128 "invalid vnode count {vnode_count}"
129 );
130
131 if vnode_count <= 1 << COMPAT_VNODE_BITS {
132 COMPAT_VNODE_BITS
133 } else {
134 vnode_count.next_power_of_two().ilog2()
135 }
136}
137
138#[inline]
154pub fn compute_vnode_from_row_id(id: RowId, vnode_count: usize) -> VirtualNode {
155 let vnode_bit = bit_for_vnode(vnode_count);
156 let sequence_bit = TIMESTAMP_SHIFT_BITS - vnode_bit;
157
158 let vnode_part = ((id >> sequence_bit) & ((1 << vnode_bit) - 1)) as usize;
159
160 VirtualNode::from_index(vnode_part % vnode_count)
163}
164
165impl RowIdGenerator {
166 pub fn new(vnodes: impl IntoIterator<Item = VirtualNode>, vnode_count: usize) -> Self {
168 let vnode_bit = bit_for_vnode(vnode_count);
169
170 Self {
171 timestamp_mgr: TimestampManager::new(),
172 vnode_bit,
173 vnodes: vnodes.into_iter().collect(),
174 vnodes_index: 0,
175 sequence: 0,
176 }
177 }
178
179 fn sequence_upper_bound(&self) -> u16 {
181 1 << (TIMESTAMP_SHIFT_BITS - self.vnode_bit)
182 }
183
184 fn try_update_timestamp(&mut self) {
190 let should_update = self.sequence == self.sequence_upper_bound();
191 if self.timestamp_mgr.try_update_timestamp(should_update) {
192 self.sequence = 0;
194 }
195 }
196
197 fn next_row_id_in_current_timestamp(&mut self) -> Option<RowId> {
201 if self.sequence >= self.sequence_upper_bound() {
202 return None;
203 }
204
205 let vnode = self.vnodes[self.vnodes_index as usize].to_index();
206 let sequence = self.sequence;
207
208 self.vnodes_index = (self.vnodes_index + 1) % self.vnodes.len() as u16;
209 if self.vnodes_index == 0 {
210 self.sequence += 1;
211 }
212
213 Some(
214 self.timestamp_mgr.last_timestamp_ms() << TIMESTAMP_SHIFT_BITS
215 | (vnode << (TIMESTAMP_SHIFT_BITS - self.vnode_bit)) as i64
216 | sequence as i64,
217 )
218 }
219
220 fn gen_iter(&mut self) -> impl Iterator<Item = RowId> + '_ {
222 std::iter::from_fn(move || {
223 if let Some(next) = self.next_row_id_in_current_timestamp() {
224 Some(next)
225 } else {
226 self.try_update_timestamp();
227 Some(
228 self.next_row_id_in_current_timestamp()
229 .expect("timestamp should be updated"),
230 )
231 }
232 })
233 }
234
235 pub fn next_batch(&mut self, length: usize) -> Vec<RowId> {
241 self.try_update_timestamp();
242
243 let mut ret = Vec::with_capacity(length);
244 ret.extend(self.gen_iter().take(length));
245 assert_eq!(ret.len(), length);
246 ret
247 }
248
249 #[allow(clippy::should_implement_trait)]
253 pub fn next(&mut self) -> RowId {
254 self.try_update_timestamp();
255
256 self.gen_iter().next().unwrap()
257 }
258}
259
/// Generates row ids with the same bit layout as `RowIdGenerator`. The caller picks the vnode on
/// every call, and the generator keeps an independent sequence counter per vnode.
#[derive(Debug)]
pub struct ChangelogRowIdGenerator {
    /// Source of the millisecond component of generated row ids.
    timestamp_mgr: TimestampManager,

    /// Number of bits used to encode the vnode, as computed by `bit_for_vnode`.
    vnode_bit: u32,

    /// Next sequence number per vnode within the current millisecond; cleared whenever the
    /// timestamp advances. A vnode absent from the map has not issued an id in this millisecond.
    vnodes_sequence: HashMap<VirtualNode, u16>,

    /// Vnodes this generator may issue ids for (`VirtualNode::ZERO` is accepted as well).
    vnodes: Bitmap,
}
276
277impl ChangelogRowIdGenerator {
278 pub fn new(vnodes: Bitmap, vnode_count: usize) -> Self {
280 let vnode_bit = bit_for_vnode(vnode_count);
281 let mut generator = Self {
282 timestamp_mgr: TimestampManager::new(),
283 vnode_bit,
284 vnodes_sequence: HashMap::default(),
285 vnodes,
286 };
287 generator.try_update_timestamp();
288 generator
289 }
290
291 fn sequence_upper_bound(&self) -> u16 {
293 1 << (TIMESTAMP_SHIFT_BITS - self.vnode_bit)
294 }
295
296 fn try_update_timestamp(&mut self) {
297 if self.timestamp_mgr.try_update_timestamp(true) {
298 self.vnodes_sequence.clear();
300 }
301 }
302
303 fn next_changelog_row_id_in_current_timestamp(&mut self, vnode: &VirtualNode) -> Option<RowId> {
304 if !self.vnodes.is_set(vnode.to_index()) && *vnode != VirtualNode::ZERO {
305 panic!("vnode {:?} not in generator", vnode);
306 }
307 let current_sequence = *self.vnodes_sequence.get(vnode).unwrap_or(&1);
308
309 if current_sequence >= self.sequence_upper_bound() {
310 return None;
311 }
312
313 let sequence = current_sequence;
314 self.vnodes_sequence.insert(*vnode, current_sequence + 1);
315
316 Some(
317 self.timestamp_mgr.last_timestamp_ms() << TIMESTAMP_SHIFT_BITS
318 | (vnode.to_index() << (TIMESTAMP_SHIFT_BITS - self.vnode_bit)) as i64
319 | sequence as i64,
320 )
321 }
322
323 pub fn next(&mut self, vnode: &VirtualNode) -> RowId {
324 if let Some(row_id) = self.next_changelog_row_id_in_current_timestamp(vnode) {
325 row_id
326 } else {
327 self.try_update_timestamp();
328 self.next_changelog_row_id_in_current_timestamp(vnode)
329 .expect("timestamp should be updated")
330 }
331 }
332}
333
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use itertools::Itertools;

    use super::*;

    /// Checks a single-vnode `RowIdGenerator`:
    /// - ids strictly increase;
    /// - a new millisecond restarts the sequence at 0;
    /// - a batch larger than the sequence space wraps the sequence around.
    #[allow(clippy::unused_async)]
    async fn test_generator_with_vnode_count(vnode_count: usize) {
        let mut generator = RowIdGenerator::new([VirtualNode::from_index(0)], vnode_count);
        let sequence_upper_bound = generator.sequence_upper_bound();

        let mut last_row_id = generator.next();
        for _ in 0..100000 {
            let row_id = generator.next();
            assert!(row_id > last_row_id);
            last_row_id = row_id;
        }

        let dur = Duration::from_millis(10);
        #[cfg(madsim)]
        tokio::time::advance(dur);
        #[cfg(not(madsim))]
        std::thread::sleep(dur);

        let row_id = generator.next();
        assert!(row_id > last_row_id);
        assert_ne!(
            row_id >> TIMESTAMP_SHIFT_BITS,
            last_row_id >> TIMESTAMP_SHIFT_BITS
        );
        assert_eq!(row_id & (sequence_upper_bound as i64 - 1), 0);

        let mut generator = RowIdGenerator::new([VirtualNode::from_index(1)], vnode_count);
        let row_ids = generator.next_batch((sequence_upper_bound + 10) as usize);
        let mut expected = (0..sequence_upper_bound).collect_vec();
        expected.extend(0..10);
        assert_eq!(
            row_ids
                .into_iter()
                .map(|id| (id as u16) & (sequence_upper_bound - 1))
                .collect_vec(),
            expected
        );
    }

    /// Checks a multi-vnode `RowIdGenerator`. All ids stay in one millisecond until the sequence
    /// space is exhausted, and vnodes are assigned round-robin.
    #[allow(clippy::unused_async)]
    async fn test_generator_multiple_vnodes_with_vnode_count(vnode_count: usize) {
        assert!(vnode_count >= 20);

        let vnodes = || {
            (0..10)
                .chain((vnode_count - 10)..vnode_count)
                .map(VirtualNode::from_index)
        };
        let vnode_of = |row_id: RowId| compute_vnode_from_row_id(row_id, vnode_count);

        let mut generator = RowIdGenerator::new(vnodes(), vnode_count);
        let sequence_upper_bound = generator.sequence_upper_bound();

        let row_ids = generator.next_batch((sequence_upper_bound as usize) * 20 + 1);

        let timestamps = row_ids
            .iter()
            .map(|&r| r >> TIMESTAMP_SHIFT_BITS)
            .collect_vec();

        let (last_timestamp, first_timestamps) = timestamps.split_last().unwrap();
        let first_timestamp = first_timestamps.iter().unique().exactly_one().unwrap();

        let expected_vnodes = vnodes().cycle();
        let actual_vnodes = row_ids.iter().map(|&r| vnode_of(r));

        #[expect(clippy::disallowed_methods)]
        for (expected, actual) in expected_vnodes.zip(actual_vnodes) {
            assert_eq!(expected, actual);
        }

        assert!(last_timestamp > first_timestamp);
    }

    /// Checks `ChangelogRowIdGenerator`. For each vnode, ids strictly increase, keep doing so
    /// after the sequence space is exhausted and the timestamp moves on, and decode back to
    /// that vnode.
    // Async and run under `#[tokio::test]` like the cases above: the madsim build of the
    // timestamp spin loop calls `tokio::time::advance`, which presumably needs a runtime.
    #[allow(clippy::unused_async)]
    async fn test_changelog_generator_with_vnode_count(vnode_count: usize) {
        let vnodes = [
            VirtualNode::from_index(0),
            VirtualNode::from_index(vnode_count - 1),
        ];
        let mut generator = ChangelogRowIdGenerator::new(Bitmap::ones(vnode_count), vnode_count);
        // More ids than fit into one millisecond, so the timestamp has to advance.
        let ids_per_vnode = (generator.sequence_upper_bound() as usize) * 2;

        for vnode in &vnodes {
            let mut last_row_id: Option<RowId> = None;
            for _ in 0..ids_per_vnode {
                let row_id = generator.next(vnode);
                assert_eq!(compute_vnode_from_row_id(row_id, vnode_count), *vnode);
                if let Some(last) = last_row_id {
                    assert!(row_id > last);
                }
                last_row_id = Some(row_id);
            }
        }
    }

    macro_rules! test {
        ($vnode_count:expr, $name:ident, $name_mul:ident, $name_changelog:ident) => {
            #[tokio::test]
            async fn $name() {
                test_generator_with_vnode_count($vnode_count).await;
            }

            #[tokio::test]
            async fn $name_mul() {
                test_generator_multiple_vnodes_with_vnode_count($vnode_count).await;
            }

            #[tokio::test]
            async fn $name_changelog() {
                test_changelog_generator_with_vnode_count($vnode_count).await;
            }
        };
    }

    test!(64, test_64, test_64_mul, test_64_changelog);
    test!(114, test_114, test_114_mul, test_114_changelog);
    test!(256, test_256, test_256_mul, test_256_changelog);
    test!(
        1 << COMPAT_VNODE_BITS,
        test_1024,
        test_1024_mul,
        test_1024_changelog
    );
    test!(2048, test_2048, test_2048_mul, test_2048_changelog);
    test!(2333, test_2333, test_2333_mul, test_2333_changelog);
    test!(
        VirtualNode::MAX_COUNT,
        test_max,
        test_max_mul,
        test_max_changelog
    );
}