risingwave_meta/hummock/manager/sequence.rs

use std::collections::HashMap;
use std::fmt::Display;
use std::future::Future;
use std::sync::LazyLock;

use parking_lot::Mutex as ParkingMutex;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::{CompactionGroupId, HummockRawObjectId, HummockSstableId};
use risingwave_meta_model::hummock_sequence;
use risingwave_meta_model::hummock_sequence::{
    COMPACTION_GROUP_ID, COMPACTION_TASK_ID, META_BACKUP_ID, SSTABLE_OBJECT_ID,
};
use risingwave_meta_model::prelude::HummockSequence;
use risingwave_pb::id::TypedId;
use sea_orm::{ActiveValue, DatabaseConnection, EntityTrait, TransactionTrait};
use tokio::sync::Mutex;

use crate::hummock::error::Result;
use crate::manager::MetaSrvEnv;

static SEQ_INIT: LazyLock<HashMap<String, i64>> = LazyLock::new(|| {
    maplit::hashmap! {
        COMPACTION_TASK_ID.into() => 1,
        COMPACTION_GROUP_ID.into() => StaticCompactionGroupId::End.as_i64_id() + 1,
        SSTABLE_OBJECT_ID.into() => 1,
        META_BACKUP_ID.into() => 1,
    }
});

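/// A half-open range `[next, end)` of sequence values that have already been
/// reserved from the durable store and can be handed out without I/O.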
#[derive(Default, Debug, PartialEq, Eq)]
struct PrefetchedRange {
    next: u64,
    end: u64,
}

impl PrefetchedRange {
    fn is_empty(&self) -> bool {
        self.next >= self.end
    }

    fn pop(&mut self) -> Option<u64> {
        if self.is_empty() {
            return None;
        }

        let value = self.next;
        self.next += 1;
        Some(value)
    }

    fn reset(&mut self, start: u64, count: u32) {
        self.next = start;
        self.end = start + u64::from(count);
    }

    #[cfg(test)]
    fn set_bounds(&mut self, next: u64, end: u64) {
        self.next = next;
        self.end = end;
    }

    #[cfg(test)]
    fn bounds(&self) -> (u64, u64) {
        (self.next, self.end)
    }
}

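/// An in-memory pool of pre-allocated sequence values.
///
/// Values are popped from a cached [`PrefetchedRange`] under a cheap
/// `parking_lot` mutex; only when the cache runs dry does one caller take the
/// async `refill_lock` and fetch the next batch. A sketch of the intended
/// usage, with a hypothetical `generator` standing in for whatever reserves
/// ids durably:
///
/// ```ignore
/// let id = seq
///     .next(64, |count| async move {
///         // One round trip reserves `count` ids; `next` then hands them
///         // out one by one until the batch is exhausted.
///         generator.next_interval(SSTABLE_OBJECT_ID, count).await
///     })
///     .await?;
/// ```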
#[derive(Default, Debug)]
pub(crate) struct PrefetchedSequence {
    range: ParkingMutex<PrefetchedRange>,
    refill_lock: Mutex<()>,
}

impl PrefetchedSequence {
    pub(crate) fn new() -> Self {
        Self::default()
    }

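    /// Returns the next value, refilling in a batch of `refill_count`
    /// (clamped to at least 1) via `refill` when the cache is empty.
    ///
    /// Uses double-checked locking: callers that lose the race for
    /// `refill_lock` re-check the cache once they acquire it, so one refill
    /// serves every waiter and no reserved id is skipped.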
    pub(crate) async fn next<F, Fut>(&self, refill_count: u32, refill: F) -> Result<u64>
    where
        F: FnOnce(u32) -> Fut,
        Fut: Future<Output = Result<u64>>,
    {
        let refill_count = refill_count.max(1);

        // Fast path: serve from the cached range without touching the async lock.
        if let Some(value) = self.try_pop() {
            return Ok(value);
        }

        // Slow path: only one caller refills; the others re-check the cache
        // after acquiring the lock so they can reuse the fresh batch.
        let _guard = self.refill_lock.lock().await;
        if let Some(value) = self.try_pop() {
            return Ok(value);
        }

        let start = refill(refill_count).await?;
        let mut range = self.range.lock();
        debug_assert!(range.is_empty());
        range.reset(start, refill_count);
        Ok(range
            .pop()
            .expect("a freshly refilled sequence must return one value"))
    }

    fn try_pop(&self) -> Option<u64> {
        self.range.lock().pop()
    }

    #[cfg(test)]
    pub(crate) fn set_test_bounds(&self, next: u64, end: u64) {
        self.range.lock().set_bounds(next, end);
    }

    #[cfg(test)]
    pub(crate) fn test_bounds(&self) -> (u64, u64) {
        self.range.lock().bounds()
    }
}

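/// Monotonic, durable sequence allocator backed by the `hummock_sequence`
/// table in the meta store. The `Mutex` around the connection serializes
/// transactions, so concurrent callers always reserve disjoint ranges.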
pub struct SequenceGenerator {
    db: Mutex<DatabaseConnection>,
}

impl SequenceGenerator {
    pub fn new(db: DatabaseConnection) -> Self {
        Self { db: Mutex::new(db) }
    }

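    /// Reserves `num` consecutive ids from the sequence named `ident` and
    /// returns the first one; the caller owns `[start, start + num)`. With
    /// `num == 0` the current value is read back without reserving anything
    /// (the transaction is never committed).
    ///
    /// A sequence missing from the table is initialized lazily from
    /// `SEQ_INIT`; an unknown `ident` panics.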
    pub async fn next_interval(&self, ident: &str, num: u32) -> Result<u64> {
        // Hold the connection lock for the whole transaction so that
        // concurrent callers cannot interleave read-modify-write cycles.
        let guard = self.db.lock().await;
        let txn = guard.begin().await?;
        let model: Option<hummock_sequence::Model> =
            hummock_sequence::Entity::find_by_id(ident.to_owned())
                .one(&txn)
                .await?;
        let start_seq = match model {
            None => {
                let init: u64 = SEQ_INIT
                    .get(ident)
                    .copied()
                    .unwrap_or_else(|| panic!("seq {ident} not found"))
                    as u64;
                let active_model = hummock_sequence::ActiveModel {
                    name: ActiveValue::set(ident.into()),
                    seq: ActiveValue::set(init.checked_add(num as _).unwrap().try_into().unwrap()),
                };
                HummockSequence::insert(active_model).exec(&txn).await?;
                init
            }
            Some(model) => {
                let start_seq: u64 = model.seq as u64;
                if num > 0 {
                    let mut active_model: hummock_sequence::ActiveModel = model.into();
                    active_model.seq = ActiveValue::set(
                        start_seq.checked_add(num as _).unwrap().try_into().unwrap(),
                    );
                    HummockSequence::update(active_model).exec(&txn).await?;
                }
                start_seq
            }
        };
        if num > 0 {
            txn.commit().await?;
        }
        Ok(start_seq)
    }
}

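// Convenience helpers that reserve ids from the named meta-store sequences.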
pub async fn next_compaction_task_id(env: &MetaSrvEnv) -> Result<u64> {
    env.hummock_seq.next_interval(COMPACTION_TASK_ID, 1).await
}

pub async fn next_meta_backup_id(env: &MetaSrvEnv) -> Result<u64> {
    env.hummock_seq.next_interval(META_BACKUP_ID, 1).await
}

pub async fn next_compaction_group_id(env: &MetaSrvEnv) -> Result<CompactionGroupId> {
    Ok(env
        .hummock_seq
        .next_interval(COMPACTION_GROUP_ID, 1)
        .await?
        .into())
}

pub async fn next_sstable_id(
    env: &MetaSrvEnv,
    num: impl TryInto<u32> + Display + Copy,
) -> Result<HummockSstableId> {
    next_unique_id(env, num).await
}

pub async fn next_raw_object_id(
    env: &MetaSrvEnv,
    num: impl TryInto<u32> + Display + Copy,
) -> Result<HummockRawObjectId> {
    next_unique_id(env, num).await
}

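// Sstable ids and raw object ids are both drawn from the shared
// SSTABLE_OBJECT_ID sequence, so the two id spaces can never collide.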
async fn next_unique_id<const C: usize>(
    env: &MetaSrvEnv,
    num: impl TryInto<u32> + Display + Copy,
) -> Result<TypedId<C, u64>> {
    let num: u32 = num
        .try_into()
        .unwrap_or_else(|_| panic!("fail to convert {num} into u32"));
    env.hummock_seq
        .next_interval(SSTABLE_OBJECT_ID, num)
        .await
        .map(Into::into)
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

    use itertools::Itertools;

    use crate::controller::SqlMetaStore;
    use crate::hummock::manager::sequence::{
        COMPACTION_TASK_ID, PrefetchedSequence, SequenceGenerator,
    };

    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_seq_gen() {
        let store = SqlMetaStore::for_test().await;
        let conn = store.conn.clone();
        let s = SequenceGenerator::new(conn);
        // The sequence starts at its `SEQ_INIT` value (1), and each call
        // returns the first id of the freshly reserved range.
        assert_eq!(1, s.next_interval(COMPACTION_TASK_ID, 1).await.unwrap());
        assert_eq!(2, s.next_interval(COMPACTION_TASK_ID, 10).await.unwrap());
        assert_eq!(12, s.next_interval(COMPACTION_TASK_ID, 10).await.unwrap());
    }

    #[test]
    fn test_prefetched_sequence_reuses_cached_range() {
        let seq = PrefetchedSequence::new();
        seq.set_test_bounds(100, 103);

        assert_eq!(
            futures::executor::block_on(seq.next(16, |_| async {
                panic!("refill should not be called when cached values exist")
            }))
            .unwrap(),
            100
        );
        assert_eq!(
            futures::executor::block_on(seq.next(16, |_| async {
                panic!("refill should not be called when cached values exist")
            }))
            .unwrap(),
            101
        );
        assert_eq!(
            futures::executor::block_on(seq.next(16, |_| async {
                panic!("refill should not be called when cached values exist")
            }))
            .unwrap(),
            102
        );
        assert_eq!(seq.test_bounds(), (103, 103));
    }

    #[tokio::test]
    async fn test_prefetched_sequence_refills_in_batch_when_empty() {
        let seq = PrefetchedSequence::new();
        let next_start = Arc::new(AtomicU64::new(100));

        let first = seq
            .next(4, {
                let next_start = next_start.clone();
                move |count| async move {
                    Ok(next_start.fetch_add(u64::from(count), Ordering::SeqCst))
                }
            })
            .await
            .unwrap();
        assert_eq!(first, 100);
        assert_eq!(seq.test_bounds(), (101, 104));

        assert_eq!(
            seq.next(4, {
                let next_start = next_start.clone();
                move |count| async move {
                    Ok(next_start.fetch_add(u64::from(count), Ordering::SeqCst))
                }
            })
            .await
            .unwrap(),
            101
        );
        assert_eq!(
            seq.next(4, {
                let next_start = next_start.clone();
                move |count| async move {
                    Ok(next_start.fetch_add(u64::from(count), Ordering::SeqCst))
                }
            })
            .await
            .unwrap(),
            102
        );
        assert_eq!(
            seq.next(4, {
                let next_start = next_start.clone();
                move |count| async move {
                    Ok(next_start.fetch_add(u64::from(count), Ordering::SeqCst))
                }
            })
            .await
            .unwrap(),
            103
        );
        assert_eq!(
            seq.next(4, {
                let next_start = next_start.clone();
                move |count| async move {
                    Ok(next_start.fetch_add(u64::from(count), Ordering::SeqCst))
                }
            })
            .await
            .unwrap(),
            104
        );
        assert_eq!(seq.test_bounds(), (105, 108));
    }

    #[tokio::test]
    async fn test_prefetched_sequence_respects_refill_capacity() {
        let seq = PrefetchedSequence::new();
        let next_start = Arc::new(AtomicU64::new(100));

        let first = seq
            .next(2, {
                let next_start = next_start.clone();
                move |count| async move {
                    Ok(next_start.fetch_add(u64::from(count), Ordering::SeqCst))
                }
            })
            .await
            .unwrap();
        assert_eq!(first, 100);
        assert_eq!(
            seq.next(2, {
                let next_start = next_start.clone();
                move |count| async move {
                    Ok(next_start.fetch_add(u64::from(count), Ordering::SeqCst))
                }
            })
            .await
            .unwrap(),
            101
        );
        assert_eq!(seq.test_bounds(), (102, 102));

        assert_eq!(
            seq.next(1, {
                let next_start = next_start.clone();
                move |count| async move {
                    Ok(next_start.fetch_add(u64::from(count), Ordering::SeqCst))
                }
            })
            .await
            .unwrap(),
            102
        );
        assert_eq!(seq.test_bounds(), (103, 103));
    }

    #[test]
    fn test_prefetched_sequence_pop_does_not_overflow_bounds() {
        let seq = PrefetchedSequence::new();
        seq.set_test_bounds(10, 13);

        assert_eq!(
            futures::executor::block_on(seq.next(4, |_| async {
                panic!("refill should not be called when cached values exist")
            }))
            .unwrap(),
            10
        );
        assert_eq!(
            futures::executor::block_on(seq.next(4, |_| async {
                panic!("refill should not be called when cached values exist")
            }))
            .unwrap(),
            11
        );
        assert_eq!(
            futures::executor::block_on(seq.next(4, |_| async {
                panic!("refill should not be called when cached values exist")
            }))
            .unwrap(),
            12
        );
        assert_eq!(seq.test_bounds(), (13, 13));
    }

    #[tokio::test]
    async fn test_prefetched_sequence_shares_refill_without_gaps() {
        let seq = Arc::new(PrefetchedSequence::new());
        let next_start = Arc::new(AtomicU64::new(100));
        let refill_calls = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::with_capacity(64);
        for _ in 0..64 {
            let seq = seq.clone();
            let next_start = next_start.clone();
            let refill_calls = refill_calls.clone();
            handles.push(tokio::spawn(async move {
                seq.next(4, move |count| {
                    refill_calls.fetch_add(1, Ordering::SeqCst);
                    async move { Ok(next_start.fetch_add(u64::from(count), Ordering::SeqCst)) }
                })
                .await
                .unwrap()
            }));
        }

        let mut ids = Vec::with_capacity(64);
        for handle in handles {
            ids.push(handle.await.unwrap());
        }

        assert_eq!(ids.len(), 64);
        assert_eq!(ids.iter().copied().collect::<HashSet<_>>().len(), 64);

        let mut sorted_ids = ids.iter().copied().collect_vec();
        sorted_ids.sort_unstable();
        assert_eq!(sorted_ids.first().copied(), Some(100));
        assert!(
            sorted_ids
                .windows(2)
                .all(|window| window[1] == window[0] + 1),
            "ids should form a contiguous range: {:?}",
            sorted_ids
        );
        // 64 ids drawn in batches of 4 require exactly 64 / 4 = 16 refills,
        // and the final bounds show a fully drained pool.
        assert_eq!(refill_calls.load(Ordering::SeqCst), 16);
        assert_eq!(seq.test_bounds(), (164, 164));
    }

    #[tokio::test]
    async fn test_prefetched_sequence_propagates_refill_error() {
        use crate::hummock::error::Error;

        let seq = PrefetchedSequence::new();

        // A failing refill must surface the error instead of handing out ids.
        let err = seq
            .next(4, |_count| async {
                Err(Error::Internal(anyhow::anyhow!("db failure")))
            })
            .await;
        assert!(err.is_err(), "refill error should propagate to caller");

        // The cached range stays empty after the failed refill.
        assert_eq!(seq.test_bounds(), (0, 0));

        // A later call retries the refill and succeeds.
        let val = seq
            .next(4, |_count| async move { Ok(200u64) })
            .await
            .unwrap();
        assert_eq!(val, 200);
        assert_eq!(seq.test_bounds(), (201, 204));
    }
}