1use std::collections::{HashMap, VecDeque};
16
17use risingwave_common::catalog::TableId;
18use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
19use risingwave_pb::hummock::{PbEpochNewChangeLog, PbSstableInfo, PbTableChangeLog};
20use tracing::warn;
21
22use crate::sstable_info::SstableInfo;
23
24#[derive(Debug, Clone, PartialEq)]
25pub struct TableChangeLogCommon<T>(
26 VecDeque<EpochNewChangeLogCommon<T>>,
28);
29
30impl<T> TableChangeLogCommon<T> {
31 pub fn new(logs: impl IntoIterator<Item = EpochNewChangeLogCommon<T>>) -> Self {
32 let logs = logs.into_iter().collect::<VecDeque<_>>();
33 debug_assert!(logs.iter().flat_map(|log| log.epochs()).is_sorted());
34 Self(logs)
35 }
36
37 pub fn iter(&self) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> {
38 self.0.iter()
39 }
40
41 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut EpochNewChangeLogCommon<T>> {
42 self.0.iter_mut()
43 }
44
45 pub fn add_change_log(&mut self, new_change_log: EpochNewChangeLogCommon<T>) {
46 if let Some(prev_log) = self.0.back() {
47 assert!(prev_log.checkpoint_epoch < new_change_log.first_epoch());
48 }
49 self.0.push_back(new_change_log);
50 }
51
52 pub fn epochs(&self) -> impl Iterator<Item = u64> + '_ {
53 self.0
54 .iter()
55 .flat_map(|epoch_change_log| epoch_change_log.epochs())
56 }
57
58 pub(crate) fn change_log_into_iter(self) -> impl Iterator<Item = EpochNewChangeLogCommon<T>> {
59 self.0.into_iter()
60 }
61
62 pub(crate) fn change_log_iter_mut(
63 &mut self,
64 ) -> impl Iterator<Item = &mut EpochNewChangeLogCommon<T>> {
65 self.0.iter_mut()
66 }
67}
68
/// [`TableChangeLogCommon`] specialised to [`SstableInfo`] entries.
pub type TableChangeLog = TableChangeLogCommon<SstableInfo>;
70
/// Change log covering one checkpoint and the non-checkpoint epochs leading up to it.
#[derive(Clone, Debug, PartialEq)]
pub struct EpochNewChangeLogCommon<T> {
    /// Items recorded as new values for the covered epochs.
    pub new_value: Vec<T>,
    /// Items recorded as old values for the covered epochs.
    pub old_value: Vec<T>,
    /// Epochs covered by this log other than the checkpoint epoch. `epochs()` yields these
    /// first, followed by `checkpoint_epoch`.
    pub non_checkpoint_epochs: Vec<u64>,
    /// The checkpoint epoch, which is the last epoch covered by this log.
    pub checkpoint_epoch: u64,
}
79
/// Splits a protobuf epoch list into `(non_checkpoint_epochs, checkpoint_epoch)`.
///
/// The last element of `epochs` is the checkpoint epoch; every element before it is returned,
/// in the same order, as the non-checkpoint epochs.
///
/// # Panics
///
/// Panics with the message `"non-empty"` if `epochs` is empty.
pub(crate) fn resolve_pb_log_epochs(epochs: &Vec<u64>) -> (Vec<u64>, u64) {
    // `split_last` checks for emptiness before any indexing happens, so an empty list panics
    // with the invariant message rather than an arithmetic-overflow or slice-index error.
    let (checkpoint_epoch, non_checkpoint_epochs) = epochs.split_last().expect("non-empty");
    (non_checkpoint_epochs.to_vec(), *checkpoint_epoch)
}
86
87impl<T> EpochNewChangeLogCommon<T> {
88 pub fn epochs(&self) -> impl Iterator<Item = u64> + '_ {
89 self.non_checkpoint_epochs
90 .iter()
91 .cloned()
92 .chain([self.checkpoint_epoch])
93 }
94
95 pub fn first_epoch(&self) -> u64 {
96 self.non_checkpoint_epochs
97 .first()
98 .cloned()
99 .unwrap_or(self.checkpoint_epoch)
100 }
101}
102
/// [`EpochNewChangeLogCommon`] specialised to [`SstableInfo`] entries.
pub type EpochNewChangeLog = EpochNewChangeLogCommon<SstableInfo>;
104
105impl<T> From<&EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
106where
107 PbSstableInfo: for<'a> From<&'a T>,
108{
109 fn from(val: &EpochNewChangeLogCommon<T>) -> Self {
110 Self {
111 new_value: val.new_value.iter().map(|a| a.into()).collect(),
112 old_value: val.old_value.iter().map(|a| a.into()).collect(),
113 epochs: val.epochs().collect(),
114 }
115 }
116}
117
118impl<T> From<&PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
119where
120 T: for<'a> From<&'a PbSstableInfo>,
121{
122 fn from(value: &PbEpochNewChangeLog) -> Self {
123 let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(&value.epochs);
124 Self {
125 new_value: value.new_value.iter().map(|a| a.into()).collect(),
126 old_value: value.old_value.iter().map(|a| a.into()).collect(),
127 non_checkpoint_epochs,
128 checkpoint_epoch,
129 }
130 }
131}
132
133impl<T> From<EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
134where
135 PbSstableInfo: From<T>,
136{
137 fn from(val: EpochNewChangeLogCommon<T>) -> Self {
138 Self {
139 epochs: val.epochs().collect(),
140 new_value: val.new_value.into_iter().map(|a| a.into()).collect(),
141 old_value: val.old_value.into_iter().map(|a| a.into()).collect(),
142 }
143 }
144}
145
146impl<T> From<PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
147where
148 T: From<PbSstableInfo>,
149{
150 fn from(value: PbEpochNewChangeLog) -> Self {
151 let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(&value.epochs);
152 Self {
153 new_value: value.new_value.into_iter().map(|a| a.into()).collect(),
154 old_value: value.old_value.into_iter().map(|a| a.into()).collect(),
155 non_checkpoint_epochs,
156 checkpoint_epoch,
157 }
158 }
159}
160
161impl<T> TableChangeLogCommon<T> {
162 pub fn filter_epoch(
163 &self,
164 (min_epoch, max_epoch): (u64, u64),
165 ) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> + '_ {
166 let start = self
167 .0
168 .partition_point(|epoch_change_log| epoch_change_log.checkpoint_epoch < min_epoch);
169 let end = self
170 .0
171 .partition_point(|epoch_change_log| epoch_change_log.first_epoch() <= max_epoch);
172 self.0.range(start..end)
173 }
174
175 #[expect(clippy::result_unit_err)]
181 pub fn next_epoch(&self, epoch: u64) -> Result<Option<u64>, ()> {
182 let start = self
183 .0
184 .partition_point(|epoch_change_log| epoch_change_log.checkpoint_epoch < epoch);
185 debug_assert!(
186 self.0
187 .range(start..)
188 .flat_map(|epoch_change_log| epoch_change_log.epochs())
189 .is_sorted()
190 );
191 let mut later_epochs = self
192 .0
193 .range(start..)
194 .flat_map(|epoch_change_log| epoch_change_log.epochs())
195 .skip_while(|log_epoch| *log_epoch < epoch);
196 if let Some(first_epoch) = later_epochs.next() {
197 assert!(
198 first_epoch >= epoch,
199 "first_epoch {} < epoch {}",
200 first_epoch,
201 epoch
202 );
203 if first_epoch != epoch {
204 return Err(());
205 }
206 if let Some(next_epoch) = later_epochs.next() {
207 assert!(
208 next_epoch > epoch,
209 "next_epoch {} not exceed epoch {}",
210 next_epoch,
211 epoch
212 );
213 Ok(Some(next_epoch))
214 } else {
215 Ok(None)
217 }
218 } else {
219 Ok(None)
221 }
222 }
223
224 pub fn get_non_empty_epochs(&self, min_epoch: u64, max_count: usize) -> Vec<u64> {
226 self.filter_epoch((min_epoch, u64::MAX))
227 .filter(|epoch_change_log| {
228 let new_value_empty = epoch_change_log.new_value.is_empty();
230 let old_value_empty = epoch_change_log.old_value.is_empty();
231 !new_value_empty || !old_value_empty
232 })
233 .flat_map(|epoch_change_log| epoch_change_log.epochs())
234 .filter(|a| a >= &min_epoch)
235 .take(max_count)
236 .collect()
237 }
238
239 pub fn truncate(&mut self, truncate_epoch: u64) {
240 while let Some(change_log) = self.0.front()
241 && change_log.checkpoint_epoch < truncate_epoch
242 {
243 let _change_log = self.0.pop_front().expect("non-empty");
244 }
245 if let Some(first_log) = self.0.front_mut() {
246 first_log
247 .non_checkpoint_epochs
248 .retain(|epoch| *epoch >= truncate_epoch);
249 }
250 }
251}
252
253impl<T> TableChangeLogCommon<T>
254where
255 PbSstableInfo: for<'a> From<&'a T>,
256{
257 pub fn to_protobuf(&self) -> PbTableChangeLog {
258 PbTableChangeLog {
259 change_logs: self.0.iter().map(|a| a.into()).collect(),
260 }
261 }
262}
263
264impl<T> TableChangeLogCommon<T>
265where
266 T: for<'a> From<&'a PbSstableInfo>,
267{
268 pub fn from_protobuf(val: &PbTableChangeLog) -> Self {
269 Self(val.change_logs.iter().map(|a| a.into()).collect())
270 }
271}
272
273pub fn build_table_change_log_delta<'a>(
274 old_value_ssts: impl Iterator<Item = SstableInfo>,
275 new_value_ssts: impl Iterator<Item = &'a SstableInfo>,
276 epochs: &Vec<u64>,
277 log_store_table_ids: impl Iterator<Item = (u32, u64)>,
278) -> HashMap<TableId, ChangeLogDelta> {
279 let mut table_change_log: HashMap<_, _> = log_store_table_ids
280 .map(|(table_id, truncate_epoch)| {
281 let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(epochs);
282 (
283 TableId::new(table_id),
284 ChangeLogDelta {
285 truncate_epoch,
286 new_log: EpochNewChangeLog {
287 new_value: vec![],
288 old_value: vec![],
289 non_checkpoint_epochs,
290 checkpoint_epoch,
291 },
292 },
293 )
294 })
295 .collect();
296 for sst in old_value_ssts {
297 for table_id in &sst.table_ids {
298 match table_change_log.get_mut(&TableId::new(*table_id)) {
299 Some(log) => {
300 log.new_log.old_value.push(sst.clone());
301 }
302 None => {
303 warn!(table_id, ?sst, "old value sst contains non-log-store table");
304 }
305 }
306 }
307 }
308 for sst in new_value_ssts {
309 for table_id in &sst.table_ids {
310 if let Some(log) = table_change_log.get_mut(&TableId::new(*table_id)) {
311 log.new_log.new_value.push(sst.clone());
312 }
313 }
314 }
315 table_change_log
316}
317
318#[derive(Debug, PartialEq, Clone)]
319pub struct ChangeLogDeltaCommon<T> {
320 pub truncate_epoch: u64,
321 pub new_log: EpochNewChangeLogCommon<T>,
322}
323
/// [`ChangeLogDeltaCommon`] specialised to [`SstableInfo`] entries.
pub type ChangeLogDelta = ChangeLogDeltaCommon<SstableInfo>;
325
326impl<T> From<&ChangeLogDeltaCommon<T>> for PbChangeLogDelta
327where
328 PbSstableInfo: for<'a> From<&'a T>,
329{
330 fn from(val: &ChangeLogDeltaCommon<T>) -> Self {
331 Self {
332 truncate_epoch: val.truncate_epoch,
333 new_log: Some((&val.new_log).into()),
334 }
335 }
336}
337
338impl<T> From<&PbChangeLogDelta> for ChangeLogDeltaCommon<T>
339where
340 T: for<'a> From<&'a PbSstableInfo>,
341{
342 fn from(val: &PbChangeLogDelta) -> Self {
343 Self {
344 truncate_epoch: val.truncate_epoch,
345 new_log: val.new_log.as_ref().unwrap().into(),
346 }
347 }
348}
349
350impl<T> From<ChangeLogDeltaCommon<T>> for PbChangeLogDelta
351where
352 PbSstableInfo: From<T>,
353{
354 fn from(val: ChangeLogDeltaCommon<T>) -> Self {
355 Self {
356 truncate_epoch: val.truncate_epoch,
357 new_log: Some(val.new_log.into()),
358 }
359 }
360}
361
362impl<T> From<PbChangeLogDelta> for ChangeLogDeltaCommon<T>
363where
364 T: From<PbSstableInfo>,
365{
366 fn from(val: PbChangeLogDelta) -> Self {
367 Self {
368 truncate_epoch: val.truncate_epoch,
369 new_log: val.new_log.unwrap().into(),
370 }
371 }
372}
373
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use crate::change_log::{EpochNewChangeLog, TableChangeLogCommon};
    use crate::sstable_info::SstableInfo;

    /// Builds a change log entry that carries no SSTs.
    fn empty_log(non_checkpoint_epochs: Vec<u64>, checkpoint_epoch: u64) -> EpochNewChangeLog {
        EpochNewChangeLog {
            new_value: vec![],
            old_value: vec![],
            non_checkpoint_epochs,
            checkpoint_epoch,
        }
    }

    /// Checks `filter_epoch` and `next_epoch` against brute-force expectations for every
    /// epoch (pair) in `1..=11`.
    #[test]
    fn test_filter_epoch() {
        let table_change_log = TableChangeLogCommon::<SstableInfo>::new([
            empty_log(vec![], 2),
            empty_log(vec![3], 4),
            empty_log(vec![], 6),
            empty_log(vec![8], 10),
        ]);

        let epochs = (1..=11).collect_vec();
        for (i, &min_epoch) in epochs.iter().enumerate() {
            for &max_epoch in &epochs[i..] {
                // Linear scan over all logs as the reference result.
                let expected = table_change_log
                    .0
                    .iter()
                    .filter(|log| {
                        log.checkpoint_epoch >= min_epoch && max_epoch >= log.first_epoch()
                    })
                    .cloned()
                    .collect_vec();
                let actual = table_change_log
                    .filter_epoch((min_epoch, max_epoch))
                    .cloned()
                    .collect_vec();
                assert_eq!(expected, actual, "{:?}", (min_epoch, max_epoch));
            }
        }

        let existing_epochs = table_change_log.epochs().collect_vec();
        assert!(existing_epochs.is_sorted());
        for &epoch in &epochs {
            let expected = match existing_epochs
                .iter()
                .position(|&existing_epoch| existing_epoch >= epoch)
            {
                // Every recorded epoch is smaller than `epoch`.
                None => Ok(None),
                Some(i) => {
                    let this_epoch = existing_epochs[i];
                    assert!(this_epoch >= epoch);
                    if this_epoch != epoch {
                        // `epoch` is not recorded, but a later epoch is.
                        Err(())
                    } else {
                        // `None` when `epoch` is the latest recorded epoch.
                        Ok(existing_epochs.get(i + 1).copied())
                    }
                }
            };
            assert_eq!(expected, table_change_log.next_epoch(epoch));
        }
    }

    /// Truncates repeatedly at increasing epochs and compares with filtering the original.
    #[test]
    fn test_truncate() {
        let mut table_change_log = TableChangeLogCommon::<SstableInfo>::new([
            empty_log(vec![], 1),
            empty_log(vec![], 2),
            empty_log(vec![3], 4),
            empty_log(vec![], 5),
        ]);
        let origin_table_change_log = table_change_log.clone();
        for truncate_epoch in 0..6 {
            table_change_log.truncate(truncate_epoch);
            let expected_table_change_log = TableChangeLogCommon(
                origin_table_change_log
                    .0
                    .iter()
                    .cloned()
                    .filter_map(|mut epoch_change_log| {
                        epoch_change_log
                            .non_checkpoint_epochs
                            .retain(|epoch| *epoch >= truncate_epoch);
                        let fully_truncated = epoch_change_log.non_checkpoint_epochs.is_empty()
                            && epoch_change_log.checkpoint_epoch < truncate_epoch;
                        (!fully_truncated).then_some(epoch_change_log)
                    })
                    .collect(),
            );
            assert_eq!(expected_table_change_log, table_change_log);
        }
    }
}