1pub mod hummock_version_ext;
16
17use risingwave_common::catalog::TableId;
18
19pub type StateTableId = TableId;
20
#[expect(non_upper_case_globals)]
#[expect(non_snake_case)]
/// Well-known compaction group ids, used as a pseudo-enum of named
/// `CompactionGroupId` constants (hence the non-standard casing).
pub mod StaticCompactionGroupId {
    use risingwave_pb::id::CompactionGroupId;

    /// Placeholder id requesting allocation of a brand-new compaction group.
    pub const NewCompactionGroup: CompactionGroupId = CompactionGroupId::new(0);
    /// Reserved id for the shared buffer (not a real on-disk group).
    pub const SharedBuffer: CompactionGroupId = CompactionGroupId::new(1);
    /// Default group for state tables.
    pub const StateDefault: CompactionGroupId = CompactionGroupId::new(2);
    /// Group for materialized views.
    pub const MaterializedView: CompactionGroupId = CompactionGroupId::new(3);
    /// Exclusive upper bound of the statically-assigned id range; dynamic
    /// groups are allocated at or above this value.
    pub const End: CompactionGroupId = CompactionGroupId::new(4);
}
41
42pub mod group_split {
47 use std::cmp::Ordering;
48 use std::collections::BTreeSet;
49
50 use bytes::Bytes;
51 use risingwave_common::catalog::TableId;
52 use risingwave_common::hash::VirtualNode;
53 use risingwave_pb::hummock::PbLevelType;
54
55 use super::StateTableId;
56 use super::hummock_version_ext::insert_new_sub_level;
57 use crate::key::{FullKey, TableKey};
58 use crate::key_range::KeyRange;
59 use crate::level::{Level, Levels};
60 use crate::sstable_info::SstableInfo;
61 use crate::{HummockEpoch, HummockSstableId, KeyComparator, can_concat};
62
63 pub fn build_split_key(table_id: StateTableId, vnode: VirtualNode) -> Bytes {
65 build_split_full_key(table_id, vnode).encode().into()
66 }
67
68 pub fn build_split_full_key(
70 mut table_id: StateTableId,
71 mut vnode: VirtualNode,
72 ) -> FullKey<Vec<u8>> {
73 if VirtualNode::MAX_REPRESENTABLE == vnode {
74 table_id = table_id.as_raw_id().strict_add(1).into();
76 vnode = VirtualNode::ZERO;
77 }
78
79 FullKey::new(
80 table_id,
81 TableKey(vnode.to_be_bytes().to_vec()),
82 HummockEpoch::MAX,
83 )
84 }
85
    /// Where an sst falls relative to a split key.
    #[derive(Debug, PartialEq, Clone)]
    pub enum SstSplitType {
        /// Entirely below the split key: stays in the left group.
        Left,
        /// Entirely at/above the split key: moves to the right group.
        Right,
        /// Straddles the split key: must be physically split in two.
        Both,
    }
92
93 pub fn need_to_split(sst: &SstableInfo, split_key: Bytes) -> SstSplitType {
95 let key_range = &sst.key_range;
96 if KeyComparator::compare_encoded_full_key(&split_key, &key_range.left).is_le() {
98 return SstSplitType::Right;
99 }
100
101 if key_range.right_exclusive {
103 if KeyComparator::compare_encoded_full_key(&split_key, &key_range.right).is_ge() {
104 return SstSplitType::Left;
105 }
106 } else if KeyComparator::compare_encoded_full_key(&split_key, &key_range.right).is_gt() {
107 return SstSplitType::Left;
108 }
109
110 SstSplitType::Both
111 }
112
113 pub fn split_sst(
133 origin_sst_info: SstableInfo,
134 new_sst_id: &mut HummockSstableId,
135 split_key: Bytes,
136 left_size: u64,
137 right_size: u64,
138 ) -> (Option<SstableInfo>, Option<SstableInfo>) {
139 let mut origin_sst_info = origin_sst_info.get_inner();
140 let mut branch_table_info = origin_sst_info.clone();
141 branch_table_info.sst_id = *new_sst_id;
142 *new_sst_id += 1;
143 origin_sst_info.sst_id = *new_sst_id;
144 *new_sst_id += 1;
145
146 let (key_range_l, key_range_r) = {
147 let key_range = &origin_sst_info.key_range;
148 let l = KeyRange {
149 left: key_range.left.clone(),
150 right: split_key.clone(),
151 right_exclusive: true,
152 };
153
154 let r = KeyRange {
155 left: split_key.clone(),
156 right: key_range.right.clone(),
157 right_exclusive: key_range.right_exclusive,
158 };
159
160 (l, r)
161 };
162 let (table_ids_l, table_ids_r) =
163 split_table_ids_with_split_key(&origin_sst_info.table_ids, split_key);
164
165 {
167 origin_sst_info.key_range = key_range_l;
169 origin_sst_info.sst_size = std::cmp::max(1, left_size);
170 origin_sst_info.table_ids = table_ids_l;
171 }
172
173 {
174 branch_table_info.key_range = key_range_r;
176 branch_table_info.sst_size = std::cmp::max(1, right_size);
177 branch_table_info.table_ids = table_ids_r;
178 }
179
180 if origin_sst_info.table_ids.is_empty() {
182 (None, Some(branch_table_info.into()))
183 } else if branch_table_info.table_ids.is_empty() {
184 (Some(origin_sst_info.into()), None)
185 } else if KeyComparator::compare_encoded_full_key(
186 &origin_sst_info.key_range.left,
187 &origin_sst_info.key_range.right,
188 )
189 .is_eq()
190 {
191 (None, Some(branch_table_info.into()))
193 } else {
194 (Some(origin_sst_info.into()), Some(branch_table_info.into()))
195 }
196 }
197
198 pub fn split_sst_with_table_ids(
202 origin_sst_info: &SstableInfo,
203 new_sst_id: &mut HummockSstableId,
204 old_sst_size: u64,
205 new_sst_size: u64,
206 new_table_ids: Vec<TableId>,
207 ) -> (SstableInfo, SstableInfo) {
208 let mut sst_info = origin_sst_info.get_inner();
209 let mut branch_table_info = sst_info.clone();
210 branch_table_info.sst_id = *new_sst_id;
211 branch_table_info.sst_size = std::cmp::max(1, new_sst_size);
212 *new_sst_id += 1;
213
214 sst_info.sst_id = *new_sst_id;
215 sst_info.sst_size = std::cmp::max(1, old_sst_size);
216 *new_sst_id += 1;
217
218 {
219 let set1: BTreeSet<_> = sst_info.table_ids.iter().cloned().collect();
225 let set2: BTreeSet<_> = new_table_ids.into_iter().collect();
226 let intersection: Vec<_> = set1.intersection(&set2).cloned().collect();
227
228 branch_table_info.table_ids = intersection;
230 sst_info
231 .table_ids
232 .retain(|table_id| !branch_table_info.table_ids.contains(table_id));
233 }
234
235 (sst_info.into(), branch_table_info.into())
236 }
237
238 pub fn split_table_ids_with_split_key(
240 table_ids: &Vec<TableId>,
241 split_key: Bytes,
242 ) -> (Vec<TableId>, Vec<TableId>) {
243 assert!(table_ids.is_sorted());
244 let split_full_key = FullKey::decode(&split_key);
245 let split_user_key = split_full_key.user_key;
246 let vnode = split_user_key.get_vnode_id();
247 let table_id = split_user_key.table_id;
248 split_table_ids_with_table_id_and_vnode(table_ids, table_id, vnode)
249 }
250
251 pub fn split_table_ids_with_table_id_and_vnode(
252 table_ids: &Vec<TableId>,
253 table_id: StateTableId,
254 vnode: usize,
255 ) -> (Vec<TableId>, Vec<TableId>) {
256 assert!(table_ids.is_sorted());
257 assert_eq!(VirtualNode::ZERO, VirtualNode::from_index(vnode));
258 let pos = table_ids.partition_point(|&id| id < table_id);
259 (table_ids[..pos].to_vec(), table_ids[pos..].to_vec())
260 }
261
262 pub fn get_split_pos(sstables: &Vec<SstableInfo>, split_key: Bytes) -> usize {
263 sstables
264 .partition_point(|sst| {
265 KeyComparator::compare_encoded_full_key(&sst.key_range.left, &split_key).is_lt()
266 })
267 .saturating_sub(1)
268 }
269
270 pub fn merge_levels(left_levels: &mut Levels, right_levels: Levels) {
272 let right_l0 = right_levels.l0;
273
274 let mut max_left_sub_level_id = left_levels
275 .l0
276 .sub_levels
277 .iter()
278 .map(|sub_level| sub_level.sub_level_id + 1)
279 .max()
280 .unwrap_or(0); let need_rewrite_right_sub_level_id = max_left_sub_level_id != 0;
282
283 for mut right_sub_level in right_l0.sub_levels {
284 if need_rewrite_right_sub_level_id {
287 right_sub_level.sub_level_id = max_left_sub_level_id;
288 max_left_sub_level_id += 1;
289 }
290
291 insert_new_sub_level(
292 &mut left_levels.l0,
293 right_sub_level.sub_level_id,
294 right_sub_level.level_type,
295 right_sub_level.table_infos,
296 None,
297 );
298 }
299
300 assert!(
301 left_levels
302 .l0
303 .sub_levels
304 .is_sorted_by_key(|sub_level| sub_level.sub_level_id),
305 "{}",
306 format!("left_levels.l0.sub_levels: {:?}", left_levels.l0.sub_levels)
307 );
308
309 left_levels
312 .l0
313 .sub_levels
314 .iter_mut()
315 .for_each(|sub_level| sub_level.vnode_partition_count = 0);
316
317 for (idx, level) in right_levels.levels.into_iter().enumerate() {
318 if level.table_infos.is_empty() {
319 continue;
320 }
321
322 let insert_table_infos = level.table_infos;
323 left_levels.levels[idx].total_file_size += insert_table_infos
324 .iter()
325 .map(|sst| sst.sst_size)
326 .sum::<u64>();
327 left_levels.levels[idx].uncompressed_file_size += insert_table_infos
328 .iter()
329 .map(|sst| sst.uncompressed_file_size)
330 .sum::<u64>();
331
332 left_levels.levels[idx]
333 .table_infos
334 .extend(insert_table_infos);
335 left_levels.levels[idx]
336 .table_infos
337 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
338 assert!(
339 can_concat(&left_levels.levels[idx].table_infos),
340 "{}",
341 format!(
342 "left-group {} right-group {} left_levels.levels[{}].table_infos: {:?} level_idx {:?}",
343 left_levels.group_id,
344 right_levels.group_id,
345 idx,
346 left_levels.levels[idx].table_infos,
347 left_levels.levels[idx].level_idx
348 )
349 );
350 }
351 }
352
353 pub fn get_sub_level_insert_hint(
357 target_levels: &Vec<Level>,
358 sub_level: &Level,
359 ) -> Result<usize, usize> {
360 for (idx, other) in target_levels.iter().enumerate() {
361 match other.sub_level_id.cmp(&sub_level.sub_level_id) {
362 Ordering::Less => {}
363 Ordering::Equal => {
364 return Ok(idx);
365 }
366 Ordering::Greater => {
367 return Err(idx);
368 }
369 }
370 }
371
372 Err(target_levels.len())
373 }
374
    /// Splits `level` at `split_key` in place: ssts left of the key stay in
    /// `level`, and the ssts that belong right of the key are returned so the
    /// caller can move them to the new group. An sst that straddles the key is
    /// physically split via [`split_sst`], consuming fresh ids from
    /// `new_sst_id`.
    pub fn split_sst_info_for_level_v2(
        level: &mut Level,
        new_sst_id: &mut HummockSstableId,
        split_key: Bytes,
    ) -> Vec<SstableInfo> {
        if level.table_infos.is_empty() {
            return vec![];
        }

        let mut insert_table_infos = vec![];
        if level.level_type == PbLevelType::Overlapping {
            // Overlapping sub-level: ssts are unordered, so classify every sst
            // against the split key individually.
            level.table_infos.retain_mut(|sst| {
                let sst_split_type = need_to_split(sst, split_key.clone());
                match sst_split_type {
                    // Entirely left: keep in this level.
                    SstSplitType::Left => true,
                    // Entirely right: move to the returned set.
                    SstSplitType::Right => {
                        insert_table_infos.push(sst.clone());
                        false
                    }
                    SstSplitType::Both => {
                        let sst_size = sst.sst_size;
                        // Halving a size < 2 rounds to 0; split_sst clamps each
                        // half to 1, so just log the anomaly here.
                        if sst_size / 2 == 0 {
                            tracing::warn!(
                                id = %sst.sst_id,
                                object_id = %sst.object_id,
                                sst_size = sst.sst_size,
                                file_size = sst.file_size,
                                "Sstable sst_size is under expected",
                            );
                        };

                        // Attribute half the size to each side (approximation).
                        let (left, right) = split_sst(
                            sst.clone(),
                            new_sst_id,
                            split_key.clone(),
                            sst_size / 2,
                            sst_size / 2,
                        );
                        if let Some(branch_sst) = right {
                            insert_table_infos.push(branch_sst);
                        }

                        // Keep the sst only if a left half survived the split.
                        if let Some(s) = left {
                            *sst = s;
                            true
                        } else {
                            false
                        }
                    }
                }
            });
        } else {
            // Non-overlapping level: ssts are sorted and disjoint, so only the
            // sst at the split position can straddle the key.
            let pos = get_split_pos(&level.table_infos, split_key.clone());
            if pos >= level.table_infos.len() {
                return insert_table_infos;
            }

            let sst_split_type = need_to_split(&level.table_infos[pos], split_key.clone());
            match sst_split_type {
                // Split falls after sst `pos`: everything beyond it moves right.
                SstSplitType::Left => {
                    insert_table_infos.extend_from_slice(&level.table_infos[pos + 1..]);
                    level.table_infos = level.table_infos[0..=pos].to_vec();
                }
                // Split falls at/before the first sst: the whole level moves.
                SstSplitType::Right => {
                    assert_eq!(0, pos);
                    insert_table_infos.extend_from_slice(&level.table_infos[pos..]);
                    level.table_infos.clear();
                }
                SstSplitType::Both => {
                    let sst = level.table_infos[pos].clone();
                    let sst_size = sst.sst_size;
                    // Same undersized-sst warning as the overlapping branch.
                    if sst_size / 2 == 0 {
                        tracing::warn!(
                            id = %sst.sst_id,
                            object_id = %sst.object_id,
                            sst_size = sst.sst_size,
                            file_size = sst.file_size,
                            "Sstable sst_size is under expected",
                        );
                    };

                    let (left, right) = split_sst(
                        sst,
                        new_sst_id,
                        split_key.clone(),
                        sst_size / 2,
                        sst_size / 2,
                    );

                    if let Some(branch_sst) = right {
                        insert_table_infos.push(branch_sst);
                    }

                    // Everything strictly after `pos` moves right; slot `pos`
                    // is replaced by the left half, or dropped if none remains.
                    let right_start = pos + 1;
                    let left_end = pos;
                    insert_table_infos.extend_from_slice(&level.table_infos[right_start..]);
                    level.table_infos = level.table_infos[0..=left_end].to_vec();
                    if let Some(origin_sst) = left {
                        level.table_infos[left_end] = origin_sst;
                    } else {
                        level.table_infos.pop();
                    }
                }
            };
        }

        insert_table_infos
    }
488}
489
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;

    /// Converts raw ids into a `Vec<TableId>` to cut down assertion noise.
    fn ids(raw: &[u32]) -> Vec<TableId> {
        raw.iter().copied().map(Into::<TableId>::into).collect_vec()
    }

    #[test]
    fn test_split_table_ids() {
        let table_ids = ids(&[1, 2, 3, 4, 5, 6, 7, 8, 9]);

        // Split at an id in the middle: it lands on the right side.
        let (left, right) = super::group_split::split_table_ids_with_table_id_and_vnode(
            &table_ids,
            5.into(),
            VirtualNode::ZERO.to_index(),
        );
        assert_eq!(left, ids(&[1, 2, 3, 4]));
        assert_eq!(right, ids(&[5, 6, 7, 8, 9]));

        // Split past the largest id: everything stays on the left.
        let (left, right) = super::group_split::split_table_ids_with_table_id_and_vnode(
            &table_ids,
            10.into(),
            VirtualNode::ZERO.to_index(),
        );
        assert_eq!(left, ids(&[1, 2, 3, 4, 5, 6, 7, 8, 9]));
        assert!(right.is_empty());

        // Split below the smallest id: everything moves to the right.
        let (left, right) = super::group_split::split_table_ids_with_table_id_and_vnode(
            &table_ids,
            0.into(),
            VirtualNode::ZERO.to_index(),
        );
        assert!(left.is_empty());
        assert_eq!(right, ids(&[1, 2, 3, 4, 5, 6, 7, 8, 9]));
    }

    #[test]
    fn test_split_table_ids_with_split_key() {
        let table_ids = ids(&[1, 2, 3, 4, 5, 6, 7, 8, 9]);

        // An encoded split key at (table 5, vnode 0) must partition the same
        // way as the raw (table_id, vnode) variant.
        let split_key = super::group_split::build_split_key(5.into(), VirtualNode::ZERO);
        let (left, right) =
            super::group_split::split_table_ids_with_split_key(&table_ids, split_key);
        assert_eq!(left, ids(&[1, 2, 3, 4]));
        assert_eq!(right, ids(&[5, 6, 7, 8, 9]));
    }
}
578}