1use std::collections::{HashMap, VecDeque};
16
17use risingwave_common::catalog::TableId;
18use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
19use risingwave_pb::hummock::{PbEpochNewChangeLog, PbSstableInfo, PbTableChangeLog};
20use tracing::warn;
21
22use crate::HummockObjectId;
23use crate::sstable_info::SstableInfo;
24use crate::version::ObjectIdReader;
25
26#[derive(Debug, Clone, PartialEq)]
27pub struct TableChangeLogCommon<T>(
28 VecDeque<EpochNewChangeLogCommon<T>>,
30);
31
32impl<T> TableChangeLogCommon<T> {
33 pub fn new(logs: impl IntoIterator<Item = EpochNewChangeLogCommon<T>>) -> Self {
34 let logs = logs.into_iter().collect::<VecDeque<_>>();
35 debug_assert!(logs.iter().flat_map(|log| log.epochs()).is_sorted());
36 Self(logs)
37 }
38
39 pub fn iter(&self) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> {
40 self.0.iter()
41 }
42
43 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut EpochNewChangeLogCommon<T>> {
44 self.0.iter_mut()
45 }
46
47 pub fn add_change_log(&mut self, new_change_log: EpochNewChangeLogCommon<T>) {
48 if let Some(prev_log) = self.0.back() {
49 assert!(prev_log.checkpoint_epoch < new_change_log.first_epoch());
50 }
51 self.0.push_back(new_change_log);
52 }
53
54 pub fn epochs(&self) -> impl Iterator<Item = u64> + '_ {
55 self.0
56 .iter()
57 .flat_map(|epoch_change_log| epoch_change_log.epochs())
58 }
59
60 pub fn is_empty(&self) -> bool {
61 self.0.is_empty()
62 }
63
64 pub fn binary_search_by_epoch(&self, epoch: u64) -> Result<usize, usize> {
65 self.0
66 .binary_search_by_key(&epoch, |log| log.checkpoint_epoch)
67 }
68}
69
70impl<T> IntoIterator for TableChangeLogCommon<T> {
71 type Item = EpochNewChangeLogCommon<T>;
72
73 type IntoIter = impl Iterator<Item = EpochNewChangeLogCommon<T>>;
74
75 fn into_iter(self) -> Self::IntoIter {
76 self.0.into_iter()
77 }
78}
79
80pub type TableChangeLog = TableChangeLogCommon<SstableInfo>;
81pub type TableChangeLogs = HashMap<TableId, TableChangeLog>;
82
83impl TableChangeLog {
84 pub fn get_object_ids(&self) -> impl Iterator<Item = HummockObjectId> + '_ {
85 self.0.iter().flat_map(|c| {
86 c.old_value
87 .iter()
88 .chain(c.new_value.iter())
89 .map(|t| HummockObjectId::Sstable(t.object_id()))
90 })
91 }
92}
93
/// Change log produced for one checkpoint, possibly spanning several
/// non-checkpoint epochs before it.
#[derive(Debug, Clone, PartialEq)]
pub struct EpochNewChangeLogCommon<T> {
    /// SSTs holding the new values written in these epochs.
    pub new_value: Vec<T>,
    /// SSTs holding the old values replaced in these epochs.
    pub old_value: Vec<T>,
    /// Epochs covered by this log that precede `checkpoint_epoch`.
    pub non_checkpoint_epochs: Vec<u64>,
    /// The last (checkpoint) epoch covered by this log.
    pub checkpoint_epoch: u64,
}
102
103impl EpochNewChangeLog {
104 pub fn change_log_ssts(&self) -> impl Iterator<Item = &SstableInfo> + '_ {
105 self.new_value.iter().chain(self.old_value.iter())
106 }
107}
108
/// Splits the flat epoch list of a protobuf change log into its
/// non-checkpoint epochs and its trailing checkpoint epoch.
///
/// Returns `(non_checkpoint_epochs, checkpoint_epoch)`, where
/// `checkpoint_epoch` is the last element of `epochs`.
///
/// # Panics
///
/// Panics with `"non-empty"` if `epochs` is empty.
pub(crate) fn resolve_pb_log_epochs(epochs: &[u64]) -> (Vec<u64>, u64) {
    // `split_last` checks for emptiness up front, so an empty list reaches the
    // explicit `expect` instead of underflowing in `len() - 1`.
    let (&checkpoint_epoch, non_checkpoint_epochs) = epochs.split_last().expect("non-empty");
    (non_checkpoint_epochs.to_vec(), checkpoint_epoch)
}
115
116impl<T> EpochNewChangeLogCommon<T> {
117 pub fn epochs(&self) -> impl Iterator<Item = u64> + '_ {
118 self.non_checkpoint_epochs
119 .iter()
120 .cloned()
121 .chain([self.checkpoint_epoch])
122 }
123
124 pub fn first_epoch(&self) -> u64 {
125 self.non_checkpoint_epochs
126 .first()
127 .cloned()
128 .unwrap_or(self.checkpoint_epoch)
129 }
130}
131
132pub type EpochNewChangeLog = EpochNewChangeLogCommon<SstableInfo>;
133
134impl<T> From<&EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
135where
136 PbSstableInfo: for<'a> From<&'a T>,
137{
138 fn from(val: &EpochNewChangeLogCommon<T>) -> Self {
139 Self {
140 new_value: val.new_value.iter().map(|a| a.into()).collect(),
141 old_value: val.old_value.iter().map(|a| a.into()).collect(),
142 epochs: val.epochs().collect(),
143 }
144 }
145}
146
147impl<T> From<&PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
148where
149 T: for<'a> From<&'a PbSstableInfo>,
150{
151 fn from(value: &PbEpochNewChangeLog) -> Self {
152 let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(&value.epochs);
153 Self {
154 new_value: value.new_value.iter().map(|a| a.into()).collect(),
155 old_value: value.old_value.iter().map(|a| a.into()).collect(),
156 non_checkpoint_epochs,
157 checkpoint_epoch,
158 }
159 }
160}
161
162impl<T> From<EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
163where
164 PbSstableInfo: From<T>,
165{
166 fn from(val: EpochNewChangeLogCommon<T>) -> Self {
167 Self {
168 epochs: val.epochs().collect(),
169 new_value: val.new_value.into_iter().map(|a| a.into()).collect(),
170 old_value: val.old_value.into_iter().map(|a| a.into()).collect(),
171 }
172 }
173}
174
175impl<T> From<PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
176where
177 T: From<PbSstableInfo>,
178{
179 fn from(value: PbEpochNewChangeLog) -> Self {
180 let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(&value.epochs);
181 Self {
182 new_value: value.new_value.into_iter().map(|a| a.into()).collect(),
183 old_value: value.old_value.into_iter().map(|a| a.into()).collect(),
184 non_checkpoint_epochs,
185 checkpoint_epoch,
186 }
187 }
188}
189
190impl<T> TableChangeLogCommon<T> {
191 pub fn filter_epoch(
192 &self,
193 (min_epoch, max_epoch): (u64, u64),
194 ) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> + '_ {
195 let start = self
196 .0
197 .partition_point(|epoch_change_log| epoch_change_log.checkpoint_epoch < min_epoch);
198 let end = self
199 .0
200 .partition_point(|epoch_change_log| epoch_change_log.first_epoch() <= max_epoch);
201 self.0.range(start..end)
202 }
203
204 #[expect(clippy::result_unit_err)]
210 pub fn next_epoch(&self, epoch: u64) -> Result<Option<u64>, ()> {
211 let start = self
212 .0
213 .partition_point(|epoch_change_log| epoch_change_log.checkpoint_epoch < epoch);
214 debug_assert!(
215 self.0
216 .range(start..)
217 .flat_map(|epoch_change_log| epoch_change_log.epochs())
218 .is_sorted()
219 );
220 let mut later_epochs = self
221 .0
222 .range(start..)
223 .flat_map(|epoch_change_log| epoch_change_log.epochs())
224 .skip_while(|log_epoch| *log_epoch < epoch);
225 if let Some(first_epoch) = later_epochs.next() {
226 assert!(
227 first_epoch >= epoch,
228 "first_epoch {} < epoch {}",
229 first_epoch,
230 epoch
231 );
232 if first_epoch != epoch {
233 return Err(());
234 }
235 if let Some(next_epoch) = later_epochs.next() {
236 assert!(
237 next_epoch > epoch,
238 "next_epoch {} not exceed epoch {}",
239 next_epoch,
240 epoch
241 );
242 Ok(Some(next_epoch))
243 } else {
244 Ok(None)
246 }
247 } else {
248 Ok(None)
250 }
251 }
252
253 pub fn truncate(&mut self, truncate_epoch: u64) {
254 while let Some(change_log) = self.0.front()
255 && change_log.checkpoint_epoch < truncate_epoch
256 {
257 let _change_log = self.0.pop_front().expect("non-empty");
258 }
259 if let Some(first_log) = self.0.front_mut() {
260 first_log
261 .non_checkpoint_epochs
262 .retain(|epoch| *epoch >= truncate_epoch);
263 }
264 }
265}
266
267impl<T> TableChangeLogCommon<T>
268where
269 PbSstableInfo: for<'a> From<&'a T>,
270{
271 pub fn to_protobuf(&self) -> PbTableChangeLog {
272 PbTableChangeLog {
273 change_logs: self.0.iter().map(|a| a.into()).collect(),
274 }
275 }
276}
277
278impl<T> TableChangeLogCommon<T>
279where
280 T: for<'a> From<&'a PbSstableInfo>,
281{
282 pub fn from_protobuf(val: &PbTableChangeLog) -> Self {
283 Self(val.change_logs.iter().map(|a| a.into()).collect())
284 }
285}
286
287impl<T> TableChangeLogCommon<T>
288where
289 T: From<PbSstableInfo>,
290{
291 pub fn from_protobuf_owned(val: PbTableChangeLog) -> Self {
292 Self(val.change_logs.into_iter().map(|a| a.into()).collect())
293 }
294}
295
296pub fn build_table_change_log_delta<'a>(
297 old_value_ssts: impl Iterator<Item = SstableInfo>,
298 new_value_ssts: impl Iterator<Item = &'a SstableInfo>,
299 epochs: &Vec<u64>,
300 log_store_table_ids: impl Iterator<Item = (TableId, u64)>,
301) -> HashMap<TableId, ChangeLogDelta> {
302 let mut table_change_log: HashMap<_, _> = log_store_table_ids
303 .map(|(table_id, truncate_epoch)| {
304 let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(epochs);
305 (
306 table_id,
307 ChangeLogDelta {
308 truncate_epoch,
309 new_log: EpochNewChangeLog {
310 new_value: vec![],
311 old_value: vec![],
312 non_checkpoint_epochs,
313 checkpoint_epoch,
314 },
315 },
316 )
317 })
318 .collect();
319 for sst in old_value_ssts {
320 for table_id in &sst.table_ids {
321 match table_change_log.get_mut(table_id) {
322 Some(log) => {
323 log.new_log.old_value.push(sst.clone());
324 }
325 None => {
326 warn!(%table_id, ?sst, "old value sst contains non-log-store table");
327 }
328 }
329 }
330 }
331 for sst in new_value_ssts {
332 for table_id in &sst.table_ids {
333 if let Some(log) = table_change_log.get_mut(table_id) {
334 log.new_log.new_value.push(sst.clone());
335 }
336 }
337 }
338 table_change_log
339}
340
341#[derive(Debug, PartialEq, Clone)]
342pub struct ChangeLogDeltaCommon<T> {
343 pub truncate_epoch: u64,
344 pub new_log: EpochNewChangeLogCommon<T>,
345}
346
347pub type ChangeLogDelta = ChangeLogDeltaCommon<SstableInfo>;
348
349impl<T> From<&ChangeLogDeltaCommon<T>> for PbChangeLogDelta
350where
351 PbSstableInfo: for<'a> From<&'a T>,
352{
353 fn from(val: &ChangeLogDeltaCommon<T>) -> Self {
354 Self {
355 truncate_epoch: val.truncate_epoch,
356 new_log: Some((&val.new_log).into()),
357 }
358 }
359}
360
361impl<T> From<&PbChangeLogDelta> for ChangeLogDeltaCommon<T>
362where
363 T: for<'a> From<&'a PbSstableInfo>,
364{
365 fn from(val: &PbChangeLogDelta) -> Self {
366 Self {
367 truncate_epoch: val.truncate_epoch,
368 new_log: val.new_log.as_ref().unwrap().into(),
369 }
370 }
371}
372
373impl<T> From<ChangeLogDeltaCommon<T>> for PbChangeLogDelta
374where
375 PbSstableInfo: From<T>,
376{
377 fn from(val: ChangeLogDeltaCommon<T>) -> Self {
378 Self {
379 truncate_epoch: val.truncate_epoch,
380 new_log: Some(val.new_log.into()),
381 }
382 }
383}
384
385impl<T> From<PbChangeLogDelta> for ChangeLogDeltaCommon<T>
386where
387 T: From<PbSstableInfo>,
388{
389 fn from(val: PbChangeLogDelta) -> Self {
390 Self {
391 truncate_epoch: val.truncate_epoch,
392 new_log: val.new_log.unwrap().into(),
393 }
394 }
395}
396
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use crate::change_log::{EpochNewChangeLog, TableChangeLogCommon};
    use crate::sstable_info::SstableInfo;

    /// Builds a per-epoch log without SSTs that covers `non_checkpoint_epochs`
    /// followed by `checkpoint_epoch`.
    fn empty_log(non_checkpoint_epochs: Vec<u64>, checkpoint_epoch: u64) -> EpochNewChangeLog {
        EpochNewChangeLog {
            new_value: vec![],
            old_value: vec![],
            non_checkpoint_epochs,
            checkpoint_epoch,
        }
    }

    #[test]
    fn test_filter_epoch() {
        let table_change_log = TableChangeLogCommon::<SstableInfo>::new([
            empty_log(vec![], 2),
            empty_log(vec![3], 4),
            empty_log(vec![], 6),
            empty_log(vec![8], 10),
        ]);

        let epochs = (1..=11).collect_vec();

        // Compare `filter_epoch` with a brute-force filter over every
        // non-inverted `[min_epoch, max_epoch]` range.
        for (i, &min_epoch) in epochs.iter().enumerate() {
            for &max_epoch in &epochs[i..] {
                let expected = table_change_log
                    .iter()
                    .filter(|log| {
                        min_epoch <= log.checkpoint_epoch && log.first_epoch() <= max_epoch
                    })
                    .cloned()
                    .collect_vec();
                let actual = table_change_log
                    .filter_epoch((min_epoch, max_epoch))
                    .cloned()
                    .collect_vec();
                assert_eq!(expected, actual, "{:?}", (min_epoch, max_epoch));
            }
        }

        let existing_epochs = table_change_log.epochs().collect_vec();
        assert!(existing_epochs.is_sorted());

        // Reference model for `next_epoch`: find the first existing epoch that
        // is `>= epoch`. An exact match yields its successor (if any), a larger
        // value means `epoch` is missing, and no match means nothing follows.
        for &epoch in &epochs {
            let expected = match existing_epochs
                .iter()
                .position(|&existing_epoch| existing_epoch >= epoch)
            {
                None => Ok(None),
                Some(pos) if existing_epochs[pos] == epoch => {
                    Ok(existing_epochs.get(pos + 1).copied())
                }
                Some(_) => Err(()),
            };
            assert_eq!(expected, table_change_log.next_epoch(epoch));
        }
    }

    #[test]
    fn test_truncate() {
        let mut table_change_log = TableChangeLogCommon::<SstableInfo>::new([
            empty_log(vec![], 1),
            empty_log(vec![], 2),
            empty_log(vec![3], 4),
            empty_log(vec![], 5),
        ]);
        let origin_table_change_log = table_change_log.clone();

        for truncate_epoch in 0..6 {
            table_change_log.truncate(truncate_epoch);

            // Reference model: strip epochs below `truncate_epoch` from every
            // log and drop the logs left with no epoch at all.
            let expected_table_change_log = TableChangeLogCommon(
                origin_table_change_log
                    .iter()
                    .cloned()
                    .filter_map(|mut epoch_change_log| {
                        epoch_change_log
                            .non_checkpoint_epochs
                            .retain(|&epoch| epoch >= truncate_epoch);
                        let fully_truncated = epoch_change_log.non_checkpoint_epochs.is_empty()
                            && epoch_change_log.checkpoint_epoch < truncate_epoch;
                        (!fully_truncated).then_some(epoch_change_log)
                    })
                    .collect(),
            );
            assert_eq!(expected_table_change_log, table_change_log);
        }
    }
}
538}