1use std::collections::{HashMap, VecDeque};
16
17use risingwave_common::catalog::TableId;
18use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
19use risingwave_pb::hummock::{PbEpochNewChangeLog, PbSstableInfo, PbTableChangeLog};
20use tracing::warn;
21
22use crate::sstable_info::SstableInfo;
23
/// Per-table change log: a queue of [`EpochNewChangeLogCommon`] entries.
///
/// Entries are expected to be ordered by epoch: `new` debug-asserts that the flattened epochs
/// are sorted, `add_change_log` asserts that each appended entry starts after the previous
/// entry's checkpoint epoch, and `truncate` removes entries from the front.
#[derive(Debug, Clone, PartialEq)]
pub struct TableChangeLogCommon<T>(
    // Oldest entry at the front, newest entry at the back.
    VecDeque<EpochNewChangeLogCommon<T>>,
);
29
30impl<T> TableChangeLogCommon<T> {
31 pub fn new(logs: impl IntoIterator<Item = EpochNewChangeLogCommon<T>>) -> Self {
32 let logs = logs.into_iter().collect::<VecDeque<_>>();
33 debug_assert!(logs.iter().flat_map(|log| log.epochs()).is_sorted());
34 Self(logs)
35 }
36
37 pub fn iter(&self) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> {
38 self.0.iter()
39 }
40
41 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut EpochNewChangeLogCommon<T>> {
42 self.0.iter_mut()
43 }
44
45 pub fn add_change_log(&mut self, new_change_log: EpochNewChangeLogCommon<T>) {
46 if let Some(prev_log) = self.0.back() {
47 assert!(prev_log.checkpoint_epoch < new_change_log.first_epoch());
48 }
49 self.0.push_back(new_change_log);
50 }
51
52 pub fn epochs(&self) -> impl Iterator<Item = u64> + '_ {
53 self.0
54 .iter()
55 .flat_map(|epoch_change_log| epoch_change_log.epochs())
56 }
57
58 pub(crate) fn change_log_into_iter(self) -> impl Iterator<Item = EpochNewChangeLogCommon<T>> {
59 self.0.into_iter()
60 }
61
62 pub(crate) fn change_log_iter_mut(
63 &mut self,
64 ) -> impl Iterator<Item = &mut EpochNewChangeLogCommon<T>> {
65 self.0.iter_mut()
66 }
67}
68
/// Table change log whose items are [`SstableInfo`]s.
pub type TableChangeLog = TableChangeLogCommon<SstableInfo>;
70
/// Change log content for one checkpoint, possibly spanning several epochs.
///
/// The covered epochs are `non_checkpoint_epochs` followed by `checkpoint_epoch`
/// (see `epochs()`).
#[derive(Debug, Clone, PartialEq)]
pub struct EpochNewChangeLogCommon<T> {
    /// Items for the new-value side of the log.
    pub new_value: Vec<T>,
    /// Items for the old-value side of the log.
    pub old_value: Vec<T>,
    /// Non-checkpoint epochs covered by this entry; expected to be sorted and to precede
    /// `checkpoint_epoch`.
    pub non_checkpoint_epochs: Vec<u64>,
    /// The checkpoint epoch that closes this entry; always the last of `epochs()`.
    pub checkpoint_epoch: u64,
}
79
/// Splits a protobuf epoch list into `(non_checkpoint_epochs, checkpoint_epoch)`.
///
/// The last element of `epochs` is the checkpoint epoch. Every element before it is returned,
/// in its original order, as the non-checkpoint epochs.
///
/// # Panics
///
/// Panics if `epochs` is empty, because every change log entry must have a checkpoint epoch.
pub(crate) fn resolve_pb_log_epochs(epochs: &[u64]) -> (Vec<u64>, u64) {
    // `split_last` avoids the `len() - 1` underflow the index-based version hit on empty input.
    let (checkpoint_epoch, non_checkpoint_epochs) = epochs
        .split_last()
        .expect("change log epochs must be non-empty");
    (non_checkpoint_epochs.to_vec(), *checkpoint_epoch)
}
86
87impl<T> EpochNewChangeLogCommon<T> {
88 pub fn epochs(&self) -> impl Iterator<Item = u64> + '_ {
89 self.non_checkpoint_epochs
90 .iter()
91 .cloned()
92 .chain([self.checkpoint_epoch])
93 }
94
95 pub fn first_epoch(&self) -> u64 {
96 self.non_checkpoint_epochs
97 .first()
98 .cloned()
99 .unwrap_or(self.checkpoint_epoch)
100 }
101}
102
/// Per-checkpoint change log entry whose items are [`SstableInfo`]s.
pub type EpochNewChangeLog = EpochNewChangeLogCommon<SstableInfo>;
104
105impl<T> From<&EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
106where
107 PbSstableInfo: for<'a> From<&'a T>,
108{
109 fn from(val: &EpochNewChangeLogCommon<T>) -> Self {
110 Self {
111 new_value: val.new_value.iter().map(|a| a.into()).collect(),
112 old_value: val.old_value.iter().map(|a| a.into()).collect(),
113 epochs: val.epochs().collect(),
114 }
115 }
116}
117
118impl<T> From<&PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
119where
120 T: for<'a> From<&'a PbSstableInfo>,
121{
122 fn from(value: &PbEpochNewChangeLog) -> Self {
123 let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(&value.epochs);
124 Self {
125 new_value: value.new_value.iter().map(|a| a.into()).collect(),
126 old_value: value.old_value.iter().map(|a| a.into()).collect(),
127 non_checkpoint_epochs,
128 checkpoint_epoch,
129 }
130 }
131}
132
133impl<T> From<EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
134where
135 PbSstableInfo: From<T>,
136{
137 fn from(val: EpochNewChangeLogCommon<T>) -> Self {
138 Self {
139 epochs: val.epochs().collect(),
140 new_value: val.new_value.into_iter().map(|a| a.into()).collect(),
141 old_value: val.old_value.into_iter().map(|a| a.into()).collect(),
142 }
143 }
144}
145
146impl<T> From<PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
147where
148 T: From<PbSstableInfo>,
149{
150 fn from(value: PbEpochNewChangeLog) -> Self {
151 let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(&value.epochs);
152 Self {
153 new_value: value.new_value.into_iter().map(|a| a.into()).collect(),
154 old_value: value.old_value.into_iter().map(|a| a.into()).collect(),
155 non_checkpoint_epochs,
156 checkpoint_epoch,
157 }
158 }
159}
160
161impl<T> TableChangeLogCommon<T> {
162 pub fn filter_epoch(
163 &self,
164 (min_epoch, max_epoch): (u64, u64),
165 ) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> + '_ {
166 let start = self
167 .0
168 .partition_point(|epoch_change_log| epoch_change_log.checkpoint_epoch < min_epoch);
169 let end = self
170 .0
171 .partition_point(|epoch_change_log| epoch_change_log.first_epoch() <= max_epoch);
172 self.0.range(start..end)
173 }
174
175 #[expect(clippy::result_unit_err)]
181 pub fn next_epoch(&self, epoch: u64) -> Result<Option<u64>, ()> {
182 let start = self
183 .0
184 .partition_point(|epoch_change_log| epoch_change_log.checkpoint_epoch < epoch);
185 debug_assert!(
186 self.0
187 .range(start..)
188 .flat_map(|epoch_change_log| epoch_change_log.epochs())
189 .is_sorted()
190 );
191 let mut later_epochs = self
192 .0
193 .range(start..)
194 .flat_map(|epoch_change_log| epoch_change_log.epochs())
195 .skip_while(|log_epoch| *log_epoch < epoch);
196 if let Some(first_epoch) = later_epochs.next() {
197 assert!(
198 first_epoch >= epoch,
199 "first_epoch {} < epoch {}",
200 first_epoch,
201 epoch
202 );
203 if first_epoch != epoch {
204 return Err(());
205 }
206 if let Some(next_epoch) = later_epochs.next() {
207 assert!(
208 next_epoch > epoch,
209 "next_epoch {} not exceed epoch {}",
210 next_epoch,
211 epoch
212 );
213 Ok(Some(next_epoch))
214 } else {
215 Ok(None)
217 }
218 } else {
219 Ok(None)
221 }
222 }
223
224 pub fn get_non_empty_epochs(&self, min_epoch: u64, max_count: usize) -> Vec<u64> {
226 self.filter_epoch((min_epoch, u64::MAX))
227 .filter(|epoch_change_log| {
228 let new_value_empty = epoch_change_log.new_value.is_empty();
230 let old_value_empty = epoch_change_log.old_value.is_empty();
231 !new_value_empty || !old_value_empty
232 })
233 .flat_map(|epoch_change_log| epoch_change_log.epochs())
234 .filter(|a| a >= &min_epoch)
235 .take(max_count)
236 .collect()
237 }
238
239 pub fn truncate(&mut self, truncate_epoch: u64) {
240 while let Some(change_log) = self.0.front()
241 && change_log.checkpoint_epoch < truncate_epoch
242 {
243 let _change_log = self.0.pop_front().expect("non-empty");
244 }
245 if let Some(first_log) = self.0.front_mut() {
246 first_log
247 .non_checkpoint_epochs
248 .retain(|epoch| *epoch >= truncate_epoch);
249 }
250 }
251}
252
253impl<T> TableChangeLogCommon<T>
254where
255 PbSstableInfo: for<'a> From<&'a T>,
256{
257 pub fn to_protobuf(&self) -> PbTableChangeLog {
258 PbTableChangeLog {
259 change_logs: self.0.iter().map(|a| a.into()).collect(),
260 }
261 }
262}
263
264impl<T> TableChangeLogCommon<T>
265where
266 T: for<'a> From<&'a PbSstableInfo>,
267{
268 pub fn from_protobuf(val: &PbTableChangeLog) -> Self {
269 Self(val.change_logs.iter().map(|a| a.into()).collect())
270 }
271}
272
273impl<T> TableChangeLogCommon<T>
274where
275 T: From<PbSstableInfo>,
276{
277 pub fn from_protobuf_owned(val: PbTableChangeLog) -> Self {
278 Self(val.change_logs.into_iter().map(|a| a.into()).collect())
279 }
280}
281
282pub fn build_table_change_log_delta<'a>(
283 old_value_ssts: impl Iterator<Item = SstableInfo>,
284 new_value_ssts: impl Iterator<Item = &'a SstableInfo>,
285 epochs: &Vec<u64>,
286 log_store_table_ids: impl Iterator<Item = (TableId, u64)>,
287) -> HashMap<TableId, ChangeLogDelta> {
288 let mut table_change_log: HashMap<_, _> = log_store_table_ids
289 .map(|(table_id, truncate_epoch)| {
290 let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(epochs);
291 (
292 table_id,
293 ChangeLogDelta {
294 truncate_epoch,
295 new_log: EpochNewChangeLog {
296 new_value: vec![],
297 old_value: vec![],
298 non_checkpoint_epochs,
299 checkpoint_epoch,
300 },
301 },
302 )
303 })
304 .collect();
305 for sst in old_value_ssts {
306 for table_id in &sst.table_ids {
307 match table_change_log.get_mut(table_id) {
308 Some(log) => {
309 log.new_log.old_value.push(sst.clone());
310 }
311 None => {
312 warn!(%table_id, ?sst, "old value sst contains non-log-store table");
313 }
314 }
315 }
316 }
317 for sst in new_value_ssts {
318 for table_id in &sst.table_ids {
319 if let Some(log) = table_change_log.get_mut(table_id) {
320 log.new_log.new_value.push(sst.clone());
321 }
322 }
323 }
324 table_change_log
325}
326
/// Change log update for a single table.
#[derive(Debug, PartialEq, Clone)]
pub struct ChangeLogDeltaCommon<T> {
    /// Truncation epoch for the table's change log (taken from `log_store_table_ids` in
    /// `build_table_change_log_delta`). NOTE(review): presumably applied via
    /// `TableChangeLogCommon::truncate`. Confirm against the delta-apply code.
    pub truncate_epoch: u64,
    /// The per-checkpoint entry carried by this delta.
    pub new_log: EpochNewChangeLogCommon<T>,
}

/// Change log delta whose items are [`SstableInfo`]s.
pub type ChangeLogDelta = ChangeLogDeltaCommon<SstableInfo>;
334
335impl<T> From<&ChangeLogDeltaCommon<T>> for PbChangeLogDelta
336where
337 PbSstableInfo: for<'a> From<&'a T>,
338{
339 fn from(val: &ChangeLogDeltaCommon<T>) -> Self {
340 Self {
341 truncate_epoch: val.truncate_epoch,
342 new_log: Some((&val.new_log).into()),
343 }
344 }
345}
346
347impl<T> From<&PbChangeLogDelta> for ChangeLogDeltaCommon<T>
348where
349 T: for<'a> From<&'a PbSstableInfo>,
350{
351 fn from(val: &PbChangeLogDelta) -> Self {
352 Self {
353 truncate_epoch: val.truncate_epoch,
354 new_log: val.new_log.as_ref().unwrap().into(),
355 }
356 }
357}
358
359impl<T> From<ChangeLogDeltaCommon<T>> for PbChangeLogDelta
360where
361 PbSstableInfo: From<T>,
362{
363 fn from(val: ChangeLogDeltaCommon<T>) -> Self {
364 Self {
365 truncate_epoch: val.truncate_epoch,
366 new_log: Some(val.new_log.into()),
367 }
368 }
369}
370
371impl<T> From<PbChangeLogDelta> for ChangeLogDeltaCommon<T>
372where
373 T: From<PbSstableInfo>,
374{
375 fn from(val: PbChangeLogDelta) -> Self {
376 Self {
377 truncate_epoch: val.truncate_epoch,
378 new_log: val.new_log.unwrap().into(),
379 }
380 }
381}
382
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use crate::change_log::{EpochNewChangeLog, TableChangeLogCommon};
    use crate::sstable_info::SstableInfo;

    /// Checks `filter_epoch` and `next_epoch` against brute-force reference models, for every
    /// query epoch in `1..=11`, on a log with gaps between its entries.
    #[test]
    fn test_filter_epoch() {
        // Entries cover epochs {2}, {3, 4}, {6}, {8, 10}; epochs 1, 5, 7, 9 and 11 are absent.
        let table_change_log = TableChangeLogCommon::<SstableInfo>::new([
            EpochNewChangeLog {
                new_value: vec![],
                old_value: vec![],
                non_checkpoint_epochs: vec![],
                checkpoint_epoch: 2,
            },
            EpochNewChangeLog {
                new_value: vec![],
                old_value: vec![],
                non_checkpoint_epochs: vec![3],
                checkpoint_epoch: 4,
            },
            EpochNewChangeLog {
                new_value: vec![],
                old_value: vec![],
                non_checkpoint_epochs: vec![],
                checkpoint_epoch: 6,
            },
            EpochNewChangeLog {
                new_value: vec![],
                old_value: vec![],
                non_checkpoint_epochs: vec![8],
                checkpoint_epoch: 10,
            },
        ]);

        let epochs = (1..=11).collect_vec();
        // Every (min_epoch, max_epoch) pair with min_epoch <= max_epoch.
        for i in 0..epochs.len() {
            for j in i..epochs.len() {
                let min_epoch = epochs[i];
                let max_epoch = epochs[j];
                // Reference: linear scan keeping entries whose span intersects the range.
                let expected = table_change_log
                    .0
                    .iter()
                    .filter(|log| {
                        min_epoch <= log.checkpoint_epoch && log.first_epoch() <= max_epoch
                    })
                    .cloned()
                    .collect_vec();
                let actual = table_change_log
                    .filter_epoch((min_epoch, max_epoch))
                    .cloned()
                    .collect_vec();
                assert_eq!(expected, actual, "{:?}", (min_epoch, max_epoch));
            }
        }

        // Reference model for `next_epoch`, computed from the flattened sorted epoch list.
        let existing_epochs = table_change_log.epochs().collect_vec();
        assert!(existing_epochs.is_sorted());
        for &epoch in &epochs {
            let expected = match existing_epochs
                .iter()
                .position(|existing_epoch| *existing_epoch >= epoch)
            {
                None => {
                    // No epoch at or after `epoch` exists.
                    Ok(None)
                }
                Some(i) => {
                    let this_epoch = existing_epochs[i];
                    assert!(this_epoch >= epoch);
                    if this_epoch == epoch {
                        if i + 1 == existing_epochs.len() {
                            // `epoch` is the last epoch in the log.
                            Ok(None)
                        } else {
                            Ok(Some(existing_epochs[i + 1]))
                        }
                    } else {
                        // `epoch` falls into a gap between existing epochs.
                        Err(())
                    }
                }
            };
            assert_eq!(expected, table_change_log.next_epoch(epoch));
        }
    }

    /// Checks `truncate` against a reference model for truncation epochs `0..6`.
    ///
    /// Truncation is applied cumulatively to the same log. Because `truncate_epoch` only
    /// increases, the expected state can always be derived from the original log.
    #[test]
    fn test_truncate() {
        // Entries cover epochs {1}, {2}, {3, 4}, {5}.
        let mut table_change_log = TableChangeLogCommon::<SstableInfo>::new([
            EpochNewChangeLog {
                new_value: vec![],
                old_value: vec![],
                non_checkpoint_epochs: vec![],
                checkpoint_epoch: 1,
            },
            EpochNewChangeLog {
                new_value: vec![],
                old_value: vec![],
                non_checkpoint_epochs: vec![],
                checkpoint_epoch: 2,
            },
            EpochNewChangeLog {
                new_value: vec![],
                old_value: vec![],
                non_checkpoint_epochs: vec![3],
                checkpoint_epoch: 4,
            },
            EpochNewChangeLog {
                new_value: vec![],
                old_value: vec![],
                non_checkpoint_epochs: vec![],
                checkpoint_epoch: 5,
            },
        ]);
        let origin_table_change_log = table_change_log.clone();
        for truncate_epoch in 0..6 {
            table_change_log.truncate(truncate_epoch);
            // Reference: drop non-checkpoint epochs below the truncation epoch. Then drop any
            // entry that is left with no non-checkpoint epochs and whose checkpoint epoch is
            // also below it.
            let expected_table_change_log = TableChangeLogCommon(
                origin_table_change_log
                    .0
                    .iter()
                    .filter_map(|epoch_change_log| {
                        let mut epoch_change_log = epoch_change_log.clone();
                        epoch_change_log
                            .non_checkpoint_epochs
                            .retain(|epoch| *epoch >= truncate_epoch);
                        if epoch_change_log.non_checkpoint_epochs.is_empty()
                            && epoch_change_log.checkpoint_epoch < truncate_epoch
                        {
                            None
                        } else {
                            Some(epoch_change_log)
                        }
                    })
                    .collect(),
            );
            assert_eq!(expected_table_change_log, table_change_log);
        }
    }
}