1pub mod hummock_version_ext;
16
17use parse_display::Display;
18
19use crate::CompactionGroupId;
20
/// Identifier of a state table; a plain alias for `u32`.
pub type StateTableId = u32;
22
/// Compaction groups that have fixed, well-known numeric ids.
///
/// Each variant converts into a [`CompactionGroupId`] by taking its discriminant
/// (see the `From<StaticCompactionGroupId>` impl in this file).
#[derive(Display)]
pub enum StaticCompactionGroupId {
    /// Discriminant `0`.
    /// NOTE(review): by name, presumably a placeholder meaning "create a new compaction
    /// group" rather than an existing group — confirm against callers.
    NewCompactionGroup = 0,
    /// Discriminant `1`.
    /// NOTE(review): presumably the group used for shared-buffer compaction — confirm.
    SharedBuffer = 1,
    /// Discriminant `2`.
    /// NOTE(review): presumably the default group for state tables — confirm.
    StateDefault = 2,
    /// Discriminant `3`.
    /// NOTE(review): presumably the group for materialized views — confirm.
    MaterializedView = 3,
    /// Discriminant `4`, the largest value declared in this enum.
    /// NOTE(review): presumably an exclusive upper bound for static ids — confirm.
    End = 4,
}
40
41impl From<StaticCompactionGroupId> for CompactionGroupId {
42 fn from(cg: StaticCompactionGroupId) -> Self {
43 cg as CompactionGroupId
44 }
45}
46
47pub mod group_split {
52 use std::cmp::Ordering;
53 use std::collections::BTreeSet;
54
55 use bytes::Bytes;
56 use risingwave_common::catalog::TableId;
57 use risingwave_common::hash::VirtualNode;
58 use risingwave_pb::hummock::PbLevelType;
59
60 use super::StateTableId;
61 use super::hummock_version_ext::insert_new_sub_level;
62 use crate::key::{FullKey, TableKey};
63 use crate::key_range::KeyRange;
64 use crate::level::{Level, Levels};
65 use crate::sstable_info::SstableInfo;
66 use crate::{HummockEpoch, KeyComparator, can_concat};
67
68 pub fn build_split_key(table_id: StateTableId, vnode: VirtualNode) -> Bytes {
70 build_split_full_key(table_id, vnode).encode().into()
71 }
72
73 pub fn build_split_full_key(
75 mut table_id: StateTableId,
76 mut vnode: VirtualNode,
77 ) -> FullKey<Vec<u8>> {
78 if VirtualNode::MAX_REPRESENTABLE == vnode {
79 table_id = table_id.strict_add(1);
81 vnode = VirtualNode::ZERO;
82 }
83
84 FullKey::new(
85 TableId::from(table_id),
86 TableKey(vnode.to_be_bytes().to_vec()),
87 HummockEpoch::MAX,
88 )
89 }
90
    /// Position of an SST's key range relative to a split key, as computed by
    /// [`need_to_split`].
    #[derive(Debug, PartialEq, Clone)]
    pub enum SstSplitType {
        /// The split key lies past the SST's right bound (or exactly on it when that bound
        /// is exclusive): the whole SST belongs to the left side.
        Left,
        /// The split key is less than or equal to the SST's left bound: the whole SST
        /// belongs to the right side.
        Right,
        /// The split key falls inside the SST's key range: the SST has to be split.
        Both,
    }
97
98 pub fn need_to_split(sst: &SstableInfo, split_key: Bytes) -> SstSplitType {
100 let key_range = &sst.key_range;
101 if KeyComparator::compare_encoded_full_key(&split_key, &key_range.left).is_le() {
103 return SstSplitType::Right;
104 }
105
106 if key_range.right_exclusive {
108 if KeyComparator::compare_encoded_full_key(&split_key, &key_range.right).is_ge() {
109 return SstSplitType::Left;
110 }
111 } else if KeyComparator::compare_encoded_full_key(&split_key, &key_range.right).is_gt() {
112 return SstSplitType::Left;
113 }
114
115 SstSplitType::Both
116 }
117
118 pub fn split_sst(
138 origin_sst_info: SstableInfo,
139 new_sst_id: &mut u64,
140 split_key: Bytes,
141 left_size: u64,
142 right_size: u64,
143 ) -> (Option<SstableInfo>, Option<SstableInfo>) {
144 let mut origin_sst_info = origin_sst_info.get_inner();
145 let mut branch_table_info = origin_sst_info.clone();
146 branch_table_info.sst_id = *new_sst_id;
147 *new_sst_id += 1;
148 origin_sst_info.sst_id = *new_sst_id;
149 *new_sst_id += 1;
150
151 let (key_range_l, key_range_r) = {
152 let key_range = &origin_sst_info.key_range;
153 let l = KeyRange {
154 left: key_range.left.clone(),
155 right: split_key.clone(),
156 right_exclusive: true,
157 };
158
159 let r = KeyRange {
160 left: split_key.clone(),
161 right: key_range.right.clone(),
162 right_exclusive: key_range.right_exclusive,
163 };
164
165 (l, r)
166 };
167 let (table_ids_l, table_ids_r) =
168 split_table_ids_with_split_key(&origin_sst_info.table_ids, split_key.clone());
169
170 {
172 origin_sst_info.key_range = key_range_l;
174 origin_sst_info.sst_size = std::cmp::max(1, left_size);
175 origin_sst_info.table_ids = table_ids_l;
176 }
177
178 {
179 branch_table_info.key_range = key_range_r;
181 branch_table_info.sst_size = std::cmp::max(1, right_size);
182 branch_table_info.table_ids = table_ids_r;
183 }
184
185 if origin_sst_info.table_ids.is_empty() {
187 (None, Some(branch_table_info.into()))
188 } else if branch_table_info.table_ids.is_empty() {
189 (Some(origin_sst_info.into()), None)
190 } else if KeyComparator::compare_encoded_full_key(
191 &origin_sst_info.key_range.left,
192 &origin_sst_info.key_range.right,
193 )
194 .is_eq()
195 {
196 (None, Some(branch_table_info.into()))
198 } else {
199 (Some(origin_sst_info.into()), Some(branch_table_info.into()))
200 }
201 }
202
203 pub fn split_sst_with_table_ids(
207 origin_sst_info: &SstableInfo,
208 new_sst_id: &mut u64,
209 old_sst_size: u64,
210 new_sst_size: u64,
211 new_table_ids: Vec<u32>,
212 ) -> (SstableInfo, SstableInfo) {
213 let mut sst_info = origin_sst_info.get_inner();
214 let mut branch_table_info = sst_info.clone();
215 branch_table_info.sst_id = *new_sst_id;
216 branch_table_info.sst_size = std::cmp::max(1, new_sst_size);
217 *new_sst_id += 1;
218
219 sst_info.sst_id = *new_sst_id;
220 sst_info.sst_size = std::cmp::max(1, old_sst_size);
221 *new_sst_id += 1;
222
223 {
224 let set1: BTreeSet<_> = sst_info.table_ids.iter().cloned().collect();
230 let set2: BTreeSet<_> = new_table_ids.into_iter().collect();
231 let intersection: Vec<_> = set1.intersection(&set2).cloned().collect();
232
233 branch_table_info.table_ids = intersection;
235 sst_info
236 .table_ids
237 .retain(|table_id| !branch_table_info.table_ids.contains(table_id));
238 }
239
240 (sst_info.into(), branch_table_info.into())
241 }
242
243 pub fn split_table_ids_with_split_key(
245 table_ids: &Vec<u32>,
246 split_key: Bytes,
247 ) -> (Vec<u32>, Vec<u32>) {
248 assert!(table_ids.is_sorted());
249 let split_full_key = FullKey::decode(&split_key);
250 let split_user_key = split_full_key.user_key;
251 let vnode = split_user_key.get_vnode_id();
252 let table_id = split_user_key.table_id.table_id();
253 split_table_ids_with_table_id_and_vnode(table_ids, table_id, vnode)
254 }
255
256 pub fn split_table_ids_with_table_id_and_vnode(
257 table_ids: &Vec<u32>,
258 table_id: StateTableId,
259 vnode: usize,
260 ) -> (Vec<u32>, Vec<u32>) {
261 assert!(table_ids.is_sorted());
262 assert_eq!(VirtualNode::ZERO, VirtualNode::from_index(vnode));
263 let pos = table_ids.partition_point(|&id| id < table_id);
264 (table_ids[..pos].to_vec(), table_ids[pos..].to_vec())
265 }
266
267 pub fn get_split_pos(sstables: &Vec<SstableInfo>, split_key: Bytes) -> usize {
268 sstables
269 .partition_point(|sst| {
270 KeyComparator::compare_encoded_full_key(&sst.key_range.left, &split_key).is_lt()
271 })
272 .saturating_sub(1)
273 }
274
275 pub fn merge_levels(left_levels: &mut Levels, right_levels: Levels) {
277 let right_l0 = right_levels.l0;
278
279 let mut max_left_sub_level_id = left_levels
280 .l0
281 .sub_levels
282 .iter()
283 .map(|sub_level| sub_level.sub_level_id + 1)
284 .max()
285 .unwrap_or(0); let need_rewrite_right_sub_level_id = max_left_sub_level_id != 0;
287
288 for mut right_sub_level in right_l0.sub_levels {
289 if need_rewrite_right_sub_level_id {
292 right_sub_level.sub_level_id = max_left_sub_level_id;
293 max_left_sub_level_id += 1;
294 }
295
296 insert_new_sub_level(
297 &mut left_levels.l0,
298 right_sub_level.sub_level_id,
299 right_sub_level.level_type,
300 right_sub_level.table_infos,
301 None,
302 );
303 }
304
305 assert!(
306 left_levels
307 .l0
308 .sub_levels
309 .is_sorted_by_key(|sub_level| sub_level.sub_level_id),
310 "{}",
311 format!("left_levels.l0.sub_levels: {:?}", left_levels.l0.sub_levels)
312 );
313
314 left_levels
317 .l0
318 .sub_levels
319 .iter_mut()
320 .for_each(|sub_level| sub_level.vnode_partition_count = 0);
321
322 for (idx, level) in right_levels.levels.into_iter().enumerate() {
323 if level.table_infos.is_empty() {
324 continue;
325 }
326
327 let insert_table_infos = level.table_infos;
328 left_levels.levels[idx].total_file_size += insert_table_infos
329 .iter()
330 .map(|sst| sst.sst_size)
331 .sum::<u64>();
332 left_levels.levels[idx].uncompressed_file_size += insert_table_infos
333 .iter()
334 .map(|sst| sst.uncompressed_file_size)
335 .sum::<u64>();
336
337 left_levels.levels[idx]
338 .table_infos
339 .extend(insert_table_infos);
340 left_levels.levels[idx]
341 .table_infos
342 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
343 assert!(
344 can_concat(&left_levels.levels[idx].table_infos),
345 "{}",
346 format!(
347 "left-group {} right-group {} left_levels.levels[{}].table_infos: {:?} level_idx {:?}",
348 left_levels.group_id,
349 right_levels.group_id,
350 idx,
351 left_levels.levels[idx].table_infos,
352 left_levels.levels[idx].level_idx
353 )
354 );
355 }
356 }
357
358 pub fn get_sub_level_insert_hint(
362 target_levels: &Vec<Level>,
363 sub_level: &Level,
364 ) -> Result<usize, usize> {
365 for (idx, other) in target_levels.iter().enumerate() {
366 match other.sub_level_id.cmp(&sub_level.sub_level_id) {
367 Ordering::Less => {}
368 Ordering::Equal => {
369 return Ok(idx);
370 }
371 Ordering::Greater => {
372 return Err(idx);
373 }
374 }
375 }
376
377 Err(target_levels.len())
378 }
379
380 pub fn split_sst_info_for_level_v2(
382 level: &mut Level,
383 new_sst_id: &mut u64,
384 split_key: Bytes,
385 ) -> Vec<SstableInfo> {
386 if level.table_infos.is_empty() {
387 return vec![];
388 }
389
390 let mut insert_table_infos = vec![];
391 if level.level_type == PbLevelType::Overlapping {
392 level.table_infos.retain_mut(|sst| {
393 let sst_split_type = need_to_split(sst, split_key.clone());
394 match sst_split_type {
395 SstSplitType::Left => true,
396 SstSplitType::Right => {
397 insert_table_infos.push(sst.clone());
398 false
399 }
400 SstSplitType::Both => {
401 let sst_size = sst.sst_size;
402 if sst_size / 2 == 0 {
403 tracing::warn!(
404 id = sst.sst_id,
405 object_id = sst.object_id,
406 sst_size = sst.sst_size,
407 file_size = sst.file_size,
408 "Sstable sst_size is under expected",
409 );
410 };
411
412 let (left, right) = split_sst(
413 sst.clone(),
414 new_sst_id,
415 split_key.clone(),
416 sst_size / 2,
417 sst_size / 2,
418 );
419 if let Some(branch_sst) = right {
420 insert_table_infos.push(branch_sst);
421 }
422
423 if left.is_some() {
424 *sst = left.unwrap();
425 true
426 } else {
427 false
428 }
429 }
430 }
431 });
432 } else {
433 let pos = get_split_pos(&level.table_infos, split_key.clone());
434 if pos >= level.table_infos.len() {
435 return insert_table_infos;
436 }
437
438 let sst_split_type = need_to_split(&level.table_infos[pos], split_key.clone());
439 match sst_split_type {
440 SstSplitType::Left => {
441 insert_table_infos.extend_from_slice(&level.table_infos[pos + 1..]);
442 level.table_infos = level.table_infos[0..=pos].to_vec();
443 }
444 SstSplitType::Right => {
445 assert_eq!(0, pos);
446 insert_table_infos.extend_from_slice(&level.table_infos[pos..]); level.table_infos.clear();
448 }
449 SstSplitType::Both => {
450 let sst = level.table_infos[pos].clone();
452 let sst_size = sst.sst_size;
453 if sst_size / 2 == 0 {
454 tracing::warn!(
455 id = sst.sst_id,
456 object_id = sst.object_id,
457 sst_size = sst.sst_size,
458 file_size = sst.file_size,
459 "Sstable sst_size is under expected",
460 );
461 };
462
463 let (left, right) = split_sst(
464 sst,
465 new_sst_id,
466 split_key.clone(),
467 sst_size / 2,
468 sst_size / 2,
469 );
470
471 if let Some(branch_sst) = right {
472 insert_table_infos.push(branch_sst);
473 }
474
475 let right_start = pos + 1;
476 let left_end = pos;
477 insert_table_infos.extend_from_slice(&level.table_infos[right_start..]);
480 level.table_infos = level.table_infos[0..=left_end].to_vec();
481 if let Some(origin_sst) = left {
482 level.table_infos[left_end] = origin_sst;
484 } else {
485 level.table_infos.pop();
486 }
487 }
488 };
489 }
490
491 insert_table_infos
492 }
493}
494
#[cfg(test)]
mod tests {
    use risingwave_common::hash::VirtualNode;

    use super::group_split::{
        build_split_key, split_table_ids_with_split_key, split_table_ids_with_table_id_and_vnode,
    };

    /// Splitting by table id: ids below the split id go left, the rest go right.
    #[test]
    fn test_split_table_ids() {
        let table_ids: Vec<u32> = (1..=9).collect();
        let zero_vnode = VirtualNode::ZERO.to_index();

        // Split id in the middle of the list.
        let (left, right) = split_table_ids_with_table_id_and_vnode(&table_ids, 5, zero_vnode);
        assert_eq!(left, vec![1, 2, 3, 4]);
        assert_eq!(right, vec![5, 6, 7, 8, 9]);

        // Split id greater than every table id: everything stays on the left.
        let (left, right) = split_table_ids_with_table_id_and_vnode(&table_ids, 10, zero_vnode);
        assert_eq!(left, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert!(right.is_empty());

        // Split id smaller than every table id: everything moves to the right.
        let (left, right) = split_table_ids_with_table_id_and_vnode(&table_ids, 0, zero_vnode);
        assert!(left.is_empty());
        assert_eq!(right, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }

    /// Splitting by an encoded split key yields the same result as splitting by table id.
    #[test]
    fn test_split_table_ids_with_split_key() {
        let table_ids: Vec<u32> = (1..=9).collect();
        let split_key = build_split_key(5, VirtualNode::ZERO);

        let (left, right) = split_table_ids_with_split_key(&table_ids, split_key);
        assert_eq!(left, vec![1, 2, 3, 4]);
        assert_eq!(right, vec![5, 6, 7, 8, 9]);
    }
}