1pub mod hummock_version_ext;
16
17use parse_display::Display;
18use risingwave_common::catalog::TableId;
19
20use crate::CompactionGroupId;
21
22pub type StateTableId = TableId;
23
/// Compaction-group ids with a fixed, well-known meaning.
///
/// The numeric discriminants are the actual `CompactionGroupId` values (see the
/// `From` impl below).
#[derive(Display)]
pub enum StaticCompactionGroupId {
    /// Id 0 — presumably a sentinel meaning "allocate a new group"; confirm with callers.
    NewCompactionGroup = 0,
    /// Id 1 — NOTE(review): presumably the group backing the shared buffer; confirm.
    SharedBuffer = 1,
    /// Id 2 — default group for state tables.
    StateDefault = 2,
    /// Id 3 — group for materialized-view state.
    MaterializedView = 3,
    /// Id 4 — last reserved value; ids at or above this are presumably assigned
    /// dynamically (TODO confirm against the id allocator).
    End = 4,
}
41
impl From<StaticCompactionGroupId> for CompactionGroupId {
    /// Converts a static group to its numeric id via the enum discriminant.
    fn from(cg: StaticCompactionGroupId) -> Self {
        cg as CompactionGroupId
    }
}
47
48pub mod group_split {
53 use std::cmp::Ordering;
54 use std::collections::BTreeSet;
55
56 use bytes::Bytes;
57 use risingwave_common::catalog::TableId;
58 use risingwave_common::hash::VirtualNode;
59 use risingwave_pb::hummock::PbLevelType;
60
61 use super::StateTableId;
62 use super::hummock_version_ext::insert_new_sub_level;
63 use crate::key::{FullKey, TableKey};
64 use crate::key_range::KeyRange;
65 use crate::level::{Level, Levels};
66 use crate::sstable_info::SstableInfo;
67 use crate::{HummockEpoch, HummockSstableId, KeyComparator, can_concat};
68
69 pub fn build_split_key(table_id: StateTableId, vnode: VirtualNode) -> Bytes {
71 build_split_full_key(table_id, vnode).encode().into()
72 }
73
74 pub fn build_split_full_key(
76 mut table_id: StateTableId,
77 mut vnode: VirtualNode,
78 ) -> FullKey<Vec<u8>> {
79 if VirtualNode::MAX_REPRESENTABLE == vnode {
80 table_id = table_id.as_raw_id().strict_add(1).into();
82 vnode = VirtualNode::ZERO;
83 }
84
85 FullKey::new(
86 table_id,
87 TableKey(vnode.to_be_bytes().to_vec()),
88 HummockEpoch::MAX,
89 )
90 }
91
    /// Outcome of testing an sstable's key range against a split key; see `need_to_split`.
    #[derive(Debug, PartialEq, Clone)]
    pub enum SstSplitType {
        /// The whole sstable lies on the left side of the split key.
        Left,
        /// The whole sstable lies on the right side of the split key.
        Right,
        /// The split key falls inside the sstable's key range; the sst must be split.
        Both,
    }
98
99 pub fn need_to_split(sst: &SstableInfo, split_key: Bytes) -> SstSplitType {
101 let key_range = &sst.key_range;
102 if KeyComparator::compare_encoded_full_key(&split_key, &key_range.left).is_le() {
104 return SstSplitType::Right;
105 }
106
107 if key_range.right_exclusive {
109 if KeyComparator::compare_encoded_full_key(&split_key, &key_range.right).is_ge() {
110 return SstSplitType::Left;
111 }
112 } else if KeyComparator::compare_encoded_full_key(&split_key, &key_range.right).is_gt() {
113 return SstSplitType::Left;
114 }
115
116 SstSplitType::Both
117 }
118
    /// Splits `origin_sst_info` into a left half and a right half at `split_key`.
    ///
    /// Both halves receive fresh sst ids drawn from `new_sst_id` (which is advanced
    /// by 2). The left half covers `[left, split_key)` (right-exclusive); the right
    /// half covers `[split_key, right]` and inherits the original right-exclusivity.
    /// `left_size` / `right_size` become the halves' `sst_size`, clamped to at least 1.
    ///
    /// Returns `(left, right)`. A side is `None` when its table-id list ends up empty;
    /// the left side is also dropped when its key range degenerates (left == right).
    pub fn split_sst(
        origin_sst_info: SstableInfo,
        new_sst_id: &mut HummockSstableId,
        split_key: Bytes,
        left_size: u64,
        right_size: u64,
    ) -> (Option<SstableInfo>, Option<SstableInfo>) {
        let mut origin_sst_info = origin_sst_info.get_inner();
        let mut branch_table_info = origin_sst_info.clone();
        // The right branch takes the first fresh id, the left (origin) the second.
        branch_table_info.sst_id = *new_sst_id;
        *new_sst_id += 1;
        origin_sst_info.sst_id = *new_sst_id;
        *new_sst_id += 1;

        let (key_range_l, key_range_r) = {
            let key_range = &origin_sst_info.key_range;
            // Left half: original left bound up to (but excluding) the split key.
            let l = KeyRange {
                left: key_range.left.clone(),
                right: split_key.clone(),
                right_exclusive: true,
            };

            // Right half: from the split key to the original right bound.
            let r = KeyRange {
                left: split_key.clone(),
                right: key_range.right.clone(),
                right_exclusive: key_range.right_exclusive,
            };

            (l, r)
        };
        // Table ids below the split key's table id stay left; the rest move right.
        let (table_ids_l, table_ids_r) =
            split_table_ids_with_split_key(&origin_sst_info.table_ids, split_key);

        {
            origin_sst_info.key_range = key_range_l;
            origin_sst_info.sst_size = std::cmp::max(1, left_size);
            origin_sst_info.table_ids = table_ids_l;
        }

        {
            branch_table_info.key_range = key_range_r;
            branch_table_info.sst_size = std::cmp::max(1, right_size);
            branch_table_info.table_ids = table_ids_r;
        }

        if origin_sst_info.table_ids.is_empty() {
            // No tables remain on the left: only the right branch survives.
            (None, Some(branch_table_info.into()))
        } else if branch_table_info.table_ids.is_empty() {
            // No tables on the right: keep only the (re-keyed) original.
            (Some(origin_sst_info.into()), None)
        } else if KeyComparator::compare_encoded_full_key(
            &origin_sst_info.key_range.left,
            &origin_sst_info.key_range.right,
        )
        .is_eq()
        {
            // Left key range collapsed to a single point: drop the left half.
            (None, Some(branch_table_info.into()))
        } else {
            (Some(origin_sst_info.into()), Some(branch_table_info.into()))
        }
    }
203
204 pub fn split_sst_with_table_ids(
208 origin_sst_info: &SstableInfo,
209 new_sst_id: &mut HummockSstableId,
210 old_sst_size: u64,
211 new_sst_size: u64,
212 new_table_ids: Vec<TableId>,
213 ) -> (SstableInfo, SstableInfo) {
214 let mut sst_info = origin_sst_info.get_inner();
215 let mut branch_table_info = sst_info.clone();
216 branch_table_info.sst_id = *new_sst_id;
217 branch_table_info.sst_size = std::cmp::max(1, new_sst_size);
218 *new_sst_id += 1;
219
220 sst_info.sst_id = *new_sst_id;
221 sst_info.sst_size = std::cmp::max(1, old_sst_size);
222 *new_sst_id += 1;
223
224 {
225 let set1: BTreeSet<_> = sst_info.table_ids.iter().cloned().collect();
231 let set2: BTreeSet<_> = new_table_ids.into_iter().collect();
232 let intersection: Vec<_> = set1.intersection(&set2).cloned().collect();
233
234 branch_table_info.table_ids = intersection;
236 sst_info
237 .table_ids
238 .retain(|table_id| !branch_table_info.table_ids.contains(table_id));
239 }
240
241 (sst_info.into(), branch_table_info.into())
242 }
243
244 pub fn split_table_ids_with_split_key(
246 table_ids: &Vec<TableId>,
247 split_key: Bytes,
248 ) -> (Vec<TableId>, Vec<TableId>) {
249 assert!(table_ids.is_sorted());
250 let split_full_key = FullKey::decode(&split_key);
251 let split_user_key = split_full_key.user_key;
252 let vnode = split_user_key.get_vnode_id();
253 let table_id = split_user_key.table_id;
254 split_table_ids_with_table_id_and_vnode(table_ids, table_id, vnode)
255 }
256
257 pub fn split_table_ids_with_table_id_and_vnode(
258 table_ids: &Vec<TableId>,
259 table_id: StateTableId,
260 vnode: usize,
261 ) -> (Vec<TableId>, Vec<TableId>) {
262 assert!(table_ids.is_sorted());
263 assert_eq!(VirtualNode::ZERO, VirtualNode::from_index(vnode));
264 let pos = table_ids.partition_point(|&id| id < table_id);
265 (table_ids[..pos].to_vec(), table_ids[pos..].to_vec())
266 }
267
268 pub fn get_split_pos(sstables: &Vec<SstableInfo>, split_key: Bytes) -> usize {
269 sstables
270 .partition_point(|sst| {
271 KeyComparator::compare_encoded_full_key(&sst.key_range.left, &split_key).is_lt()
272 })
273 .saturating_sub(1)
274 }
275
    /// Merges `right_levels` into `left_levels` in place.
    ///
    /// Right L0 sub-levels are appended after the left side's sub-levels — their ids
    /// are rewritten to continue after the left side's maximum when the left L0 is
    /// non-empty. Non-L0 levels are merged index-by-index, with size statistics
    /// accumulated and the sstables re-sorted by key range. Panics if the resulting
    /// sub-levels are not ordered by id or a merged level is not concatenable.
    pub fn merge_levels(left_levels: &mut Levels, right_levels: Levels) {
        let right_l0 = right_levels.l0;

        // One past the largest left sub-level id; 0 means the left L0 is empty, in
        // which case right sub-level ids can be kept as-is.
        let mut max_left_sub_level_id = left_levels
            .l0
            .sub_levels
            .iter()
            .map(|sub_level| sub_level.sub_level_id + 1)
            .max()
            .unwrap_or(0); let need_rewrite_right_sub_level_id = max_left_sub_level_id != 0;

        for mut right_sub_level in right_l0.sub_levels {
            // Re-number right sub-levels so ids keep increasing across the merge.
            if need_rewrite_right_sub_level_id {
                right_sub_level.sub_level_id = max_left_sub_level_id;
                max_left_sub_level_id += 1;
            }

            insert_new_sub_level(
                &mut left_levels.l0,
                right_sub_level.sub_level_id,
                right_sub_level.level_type,
                right_sub_level.table_infos,
                None,
            );
        }

        assert!(
            left_levels
                .l0
                .sub_levels
                .is_sorted_by_key(|sub_level| sub_level.sub_level_id),
            "{}",
            format!("left_levels.l0.sub_levels: {:?}", left_levels.l0.sub_levels)
        );

        // Reset vnode partitioning on every sub-level — NOTE(review): presumably so the
        // merged group re-derives its partitioning; confirm with the compaction policy.
        left_levels
            .l0
            .sub_levels
            .iter_mut()
            .for_each(|sub_level| sub_level.vnode_partition_count = 0);

        for (idx, level) in right_levels.levels.into_iter().enumerate() {
            if level.table_infos.is_empty() {
                continue;
            }

            // Fold the right level's size statistics into the left level.
            let insert_table_infos = level.table_infos;
            left_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            left_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();

            // Append and restore key order, then sanity-check that the level's ssts
            // are disjoint and concatenable.
            left_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            left_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(
                can_concat(&left_levels.levels[idx].table_infos),
                "{}",
                format!(
                    "left-group {} right-group {} left_levels.levels[{}].table_infos: {:?} level_idx {:?}",
                    left_levels.group_id,
                    right_levels.group_id,
                    idx,
                    left_levels.levels[idx].table_infos,
                    left_levels.levels[idx].level_idx
                )
            );
        }
    }
358
359 pub fn get_sub_level_insert_hint(
363 target_levels: &Vec<Level>,
364 sub_level: &Level,
365 ) -> Result<usize, usize> {
366 for (idx, other) in target_levels.iter().enumerate() {
367 match other.sub_level_id.cmp(&sub_level.sub_level_id) {
368 Ordering::Less => {}
369 Ordering::Equal => {
370 return Ok(idx);
371 }
372 Ordering::Greater => {
373 return Err(idx);
374 }
375 }
376 }
377
378 Err(target_levels.len())
379 }
380
    /// Splits `level` at `split_key`, removing every sstable (or sstable part) on the
    /// right of the key from `level` and returning those right-hand ssts.
    ///
    /// Overlapping levels are scanned sst-by-sst since their ssts are unordered.
    /// Non-overlapping levels are searched for the single sst that can straddle the
    /// key, and the suffix beyond it is moved wholesale. A straddling sst is split
    /// via `split_sst` with both halves sized `sst_size / 2` (a warning is logged
    /// when `sst_size < 2`, which would make the halves' sizes clamp to 1).
    pub fn split_sst_info_for_level_v2(
        level: &mut Level,
        new_sst_id: &mut HummockSstableId,
        split_key: Bytes,
    ) -> Vec<SstableInfo> {
        if level.table_infos.is_empty() {
            return vec![];
        }

        let mut insert_table_infos = vec![];
        if level.level_type == PbLevelType::Overlapping {
            // Overlapping sub-level: no ordering guarantee, so classify every sst.
            level.table_infos.retain_mut(|sst| {
                let sst_split_type = need_to_split(sst, split_key.clone());
                match sst_split_type {
                    SstSplitType::Left => true,
                    SstSplitType::Right => {
                        insert_table_infos.push(sst.clone());
                        false
                    }
                    SstSplitType::Both => {
                        let sst_size = sst.sst_size;
                        // sst_size < 2 means the halves below degrade to size 1.
                        if sst_size / 2 == 0 {
                            tracing::warn!(
                                id = %sst.sst_id,
                                object_id = %sst.object_id,
                                sst_size = sst.sst_size,
                                file_size = sst.file_size,
                                "Sstable sst_size is under expected",
                            );
                        };

                        let (left, right) = split_sst(
                            sst.clone(),
                            new_sst_id,
                            split_key.clone(),
                            sst_size / 2,
                            sst_size / 2,
                        );
                        if let Some(branch_sst) = right {
                            insert_table_infos.push(branch_sst);
                        }

                        // Keep the left half in this level, or drop the entry entirely.
                        if let Some(s) = left {
                            *sst = s;
                            true
                        } else {
                            false
                        }
                    }
                }
            });
        } else {
            // Non-overlapping level: ssts are sorted and disjoint, so only the sst at
            // `pos` can straddle the split key; everything after it moves right.
            let pos = get_split_pos(&level.table_infos, split_key.clone());
            if pos >= level.table_infos.len() {
                return insert_table_infos;
            }

            let sst_split_type = need_to_split(&level.table_infos[pos], split_key.clone());
            match sst_split_type {
                SstSplitType::Left => {
                    // The sst at `pos` stays left; only the suffix after it moves.
                    insert_table_infos.extend_from_slice(&level.table_infos[pos + 1..]);
                    level.table_infos = level.table_infos[0..=pos].to_vec();
                }
                SstSplitType::Right => {
                    // Only possible at pos == 0: the whole level moves right.
                    assert_eq!(0, pos);
                    insert_table_infos.extend_from_slice(&level.table_infos[pos..]); level.table_infos.clear();
                }
                SstSplitType::Both => {
                    let sst = level.table_infos[pos].clone();
                    let sst_size = sst.sst_size;
                    // sst_size < 2 means the halves below degrade to size 1.
                    if sst_size / 2 == 0 {
                        tracing::warn!(
                            id = %sst.sst_id,
                            object_id = %sst.object_id,
                            sst_size = sst.sst_size,
                            file_size = sst.file_size,
                            "Sstable sst_size is under expected",
                        );
                    };

                    let (left, right) = split_sst(
                        sst,
                        new_sst_id,
                        split_key.clone(),
                        sst_size / 2,
                        sst_size / 2,
                    );

                    if let Some(branch_sst) = right {
                        insert_table_infos.push(branch_sst);
                    }

                    // Move the suffix right, then replace the straddler with its left
                    // half — or remove it when the left half was dropped.
                    let right_start = pos + 1;
                    let left_end = pos;
                    insert_table_infos.extend_from_slice(&level.table_infos[right_start..]);
                    level.table_infos = level.table_infos[0..=left_end].to_vec();
                    if let Some(origin_sst) = left {
                        level.table_infos[left_end] = origin_sst;
                    } else {
                        level.table_infos.pop();
                    }
                }
            };
        }

        insert_table_infos
    }
494}
495
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;

    /// Splitting a sorted id list at table id 5 puts 5 (and everything above) on the
    /// right; boundary cases below/above the whole range leave one side empty.
    #[test]
    fn test_split_table_ids() {
        let table_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9]
            .into_iter()
            .map(Into::<TableId>::into)
            .collect();
        let (left, right) = super::group_split::split_table_ids_with_table_id_and_vnode(
            &table_ids,
            5.into(),
            VirtualNode::ZERO.to_index(),
        );
        assert_eq!(
            left,
            [1, 2, 3, 4]
                .into_iter()
                .map(Into::<TableId>::into)
                .collect_vec()
        );
        assert_eq!(
            right,
            [5, 6, 7, 8, 9]
                .into_iter()
                .map(Into::<TableId>::into)
                .collect_vec()
        );

        // Split point above every id: everything stays on the left.
        let (left, right) = super::group_split::split_table_ids_with_table_id_and_vnode(
            &table_ids,
            10.into(),
            VirtualNode::ZERO.to_index(),
        );
        assert_eq!(
            left,
            [1, 2, 3, 4, 5, 6, 7, 8, 9]
                .into_iter()
                .map(Into::<TableId>::into)
                .collect_vec()
        );
        assert!(right.is_empty());

        // Split point below every id: everything moves to the right.
        let (left, right) = super::group_split::split_table_ids_with_table_id_and_vnode(
            &table_ids,
            0.into(),
            VirtualNode::ZERO.to_index(),
        );

        assert!(left.is_empty());
        assert_eq!(
            right,
            [1, 2, 3, 4, 5, 6, 7, 8, 9]
                .into_iter()
                .map(Into::<TableId>::into)
                .collect_vec()
        );
    }

    /// The encoded-split-key path must agree with the (table_id, vnode) path above.
    #[test]
    fn test_split_table_ids_with_split_key() {
        let table_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9]
            .into_iter()
            .map(Into::<TableId>::into)
            .collect();
        let split_key = super::group_split::build_split_key(5.into(), VirtualNode::ZERO);
        let (left, right) =
            super::group_split::split_table_ids_with_split_key(&table_ids, split_key);
        assert_eq!(
            left,
            [1, 2, 3, 4]
                .into_iter()
                .map(Into::<TableId>::into)
                .collect_vec()
        );
        assert_eq!(
            right,
            [5, 6, 7, 8, 9]
                .into_iter()
                .map(Into::<TableId>::into)
                .collect_vec()
        );
    }
}