1use std::collections::{HashMap, VecDeque};
16
17use risingwave_common::catalog::TableId;
18use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
19use risingwave_pb::hummock::{PbEpochNewChangeLog, PbSstableInfo, PbTableChangeLog};
20use tracing::warn;
21
22use crate::sstable_info::SstableInfo;
23
24#[derive(Debug, Clone, PartialEq)]
25pub struct TableChangeLogCommon<T>(
26 VecDeque<EpochNewChangeLogCommon<T>>,
28);
29
30impl<T> TableChangeLogCommon<T> {
31 pub fn new(logs: impl IntoIterator<Item = EpochNewChangeLogCommon<T>>) -> Self {
32 let logs = logs.into_iter().collect::<VecDeque<_>>();
33 debug_assert!(logs.iter().flat_map(|log| log.epochs.iter()).is_sorted());
34 Self(logs)
35 }
36
37 pub fn iter(&self) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> {
38 self.0.iter()
39 }
40
41 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut EpochNewChangeLogCommon<T>> {
42 self.0.iter_mut()
43 }
44
45 pub fn add_change_log(&mut self, new_change_log: EpochNewChangeLogCommon<T>) {
46 if let Some(prev_log) = self.0.back() {
47 assert!(
48 prev_log.epochs.last().expect("non-empty")
49 < new_change_log.epochs.first().expect("non-empty")
50 );
51 }
52 self.0.push_back(new_change_log);
53 }
54
55 pub fn epochs(&self) -> impl Iterator<Item = u64> + '_ {
56 self.0
57 .iter()
58 .flat_map(|epoch_change_log| epoch_change_log.epochs.iter())
59 .cloned()
60 }
61
62 pub(crate) fn change_log_into_iter(self) -> impl Iterator<Item = EpochNewChangeLogCommon<T>> {
63 self.0.into_iter()
64 }
65
66 pub(crate) fn change_log_iter_mut(
67 &mut self,
68 ) -> impl Iterator<Item = &mut EpochNewChangeLogCommon<T>> {
69 self.0.iter_mut()
70 }
71}
72
73pub type TableChangeLog = TableChangeLogCommon<SstableInfo>;
74
75#[derive(Debug, Clone, PartialEq)]
76pub struct EpochNewChangeLogCommon<T> {
77 pub new_value: Vec<T>,
78 pub old_value: Vec<T>,
79 pub epochs: Vec<u64>,
81}
82
83pub type EpochNewChangeLog = EpochNewChangeLogCommon<SstableInfo>;
84
85impl<T> From<&EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
86where
87 PbSstableInfo: for<'a> From<&'a T>,
88{
89 fn from(val: &EpochNewChangeLogCommon<T>) -> Self {
90 Self {
91 new_value: val.new_value.iter().map(|a| a.into()).collect(),
92 old_value: val.old_value.iter().map(|a| a.into()).collect(),
93 epochs: val.epochs.clone(),
94 }
95 }
96}
97
98impl<T> From<&PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
99where
100 T: for<'a> From<&'a PbSstableInfo>,
101{
102 fn from(value: &PbEpochNewChangeLog) -> Self {
103 Self {
104 new_value: value.new_value.iter().map(|a| a.into()).collect(),
105 old_value: value.old_value.iter().map(|a| a.into()).collect(),
106 epochs: value.epochs.clone(),
107 }
108 }
109}
110
111impl<T> From<EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
112where
113 PbSstableInfo: From<T>,
114{
115 fn from(val: EpochNewChangeLogCommon<T>) -> Self {
116 Self {
117 new_value: val.new_value.into_iter().map(|a| a.into()).collect(),
118 old_value: val.old_value.into_iter().map(|a| a.into()).collect(),
119 epochs: val.epochs,
120 }
121 }
122}
123
124impl<T> From<PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
125where
126 T: From<PbSstableInfo>,
127{
128 fn from(value: PbEpochNewChangeLog) -> Self {
129 Self {
130 new_value: value.new_value.into_iter().map(|a| a.into()).collect(),
131 old_value: value.old_value.into_iter().map(|a| a.into()).collect(),
132 epochs: value.epochs,
133 }
134 }
135}
136
137impl<T> TableChangeLogCommon<T> {
138 pub fn filter_epoch(
139 &self,
140 (min_epoch, max_epoch): (u64, u64),
141 ) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> + '_ {
142 let start = self.0.partition_point(|epoch_change_log| {
143 epoch_change_log.epochs.last().expect("non-empty") < &min_epoch
144 });
145 let end = self.0.partition_point(|epoch_change_log| {
146 epoch_change_log.epochs.first().expect("non-empty") <= &max_epoch
147 });
148 self.0.range(start..end)
149 }
150
151 #[expect(clippy::result_unit_err)]
157 pub fn next_epoch(&self, epoch: u64) -> Result<Option<u64>, ()> {
158 let start = self.0.partition_point(|epoch_change_log| {
159 epoch_change_log.epochs.last().expect("non-empty") < &epoch
160 });
161 debug_assert!(
162 self.0
163 .range(start..)
164 .flat_map(|epoch_change_log| epoch_change_log.epochs.iter())
165 .is_sorted()
166 );
167 let mut later_epochs = self
168 .0
169 .range(start..)
170 .flat_map(|epoch_change_log| epoch_change_log.epochs.iter())
171 .skip_while(|log_epoch| **log_epoch < epoch);
172 if let Some(first_epoch) = later_epochs.next() {
173 assert!(
174 *first_epoch >= epoch,
175 "first_epoch {} < epoch {}",
176 first_epoch,
177 epoch
178 );
179 if *first_epoch != epoch {
180 return Err(());
181 }
182 if let Some(next_epoch) = later_epochs.next() {
183 assert!(
184 *next_epoch > epoch,
185 "next_epoch {} not exceed epoch {}",
186 next_epoch,
187 epoch
188 );
189 Ok(Some(*next_epoch))
190 } else {
191 Ok(None)
193 }
194 } else {
195 Ok(None)
197 }
198 }
199
200 pub fn get_non_empty_epochs(&self, min_epoch: u64, max_count: usize) -> Vec<u64> {
202 self.filter_epoch((min_epoch, u64::MAX))
203 .filter(|epoch_change_log| {
204 let new_value_empty = epoch_change_log.new_value.is_empty();
206 let old_value_empty = epoch_change_log.old_value.is_empty();
207 !new_value_empty || !old_value_empty
208 })
209 .flat_map(|epoch_change_log| epoch_change_log.epochs.iter().cloned())
210 .filter(|a| a >= &min_epoch)
211 .take(max_count)
212 .collect()
213 }
214
215 pub fn truncate(&mut self, truncate_epoch: u64) {
216 while let Some(change_log) = self.0.front()
217 && *change_log.epochs.last().expect("non-empty") < truncate_epoch
218 {
219 let _change_log = self.0.pop_front().expect("non-empty");
220 }
221 if let Some(first_log) = self.0.front_mut() {
222 first_log.epochs.retain(|epoch| *epoch >= truncate_epoch);
223 }
224 }
225}
226
227impl<T> TableChangeLogCommon<T>
228where
229 PbSstableInfo: for<'a> From<&'a T>,
230{
231 pub fn to_protobuf(&self) -> PbTableChangeLog {
232 PbTableChangeLog {
233 change_logs: self.0.iter().map(|a| a.into()).collect(),
234 }
235 }
236}
237
238impl<T> TableChangeLogCommon<T>
239where
240 T: for<'a> From<&'a PbSstableInfo>,
241{
242 pub fn from_protobuf(val: &PbTableChangeLog) -> Self {
243 Self(val.change_logs.iter().map(|a| a.into()).collect())
244 }
245}
246
247pub fn build_table_change_log_delta<'a>(
248 old_value_ssts: impl Iterator<Item = SstableInfo>,
249 new_value_ssts: impl Iterator<Item = &'a SstableInfo>,
250 epochs: &Vec<u64>,
251 log_store_table_ids: impl Iterator<Item = (u32, u64)>,
252) -> HashMap<TableId, ChangeLogDelta> {
253 let mut table_change_log: HashMap<_, _> = log_store_table_ids
254 .map(|(table_id, truncate_epoch)| {
255 (
256 TableId::new(table_id),
257 ChangeLogDelta {
258 truncate_epoch,
259 new_log: EpochNewChangeLog {
260 new_value: vec![],
261 old_value: vec![],
262 epochs: epochs.clone(),
263 },
264 },
265 )
266 })
267 .collect();
268 for sst in old_value_ssts {
269 for table_id in &sst.table_ids {
270 match table_change_log.get_mut(&TableId::new(*table_id)) {
271 Some(log) => {
272 log.new_log.old_value.push(sst.clone());
273 }
274 None => {
275 warn!(table_id, ?sst, "old value sst contains non-log-store table");
276 }
277 }
278 }
279 }
280 for sst in new_value_ssts {
281 for table_id in &sst.table_ids {
282 if let Some(log) = table_change_log.get_mut(&TableId::new(*table_id)) {
283 log.new_log.new_value.push(sst.clone());
284 }
285 }
286 }
287 table_change_log
288}
289
290#[derive(Debug, PartialEq, Clone)]
291pub struct ChangeLogDeltaCommon<T> {
292 pub truncate_epoch: u64,
293 pub new_log: EpochNewChangeLogCommon<T>,
294}
295
296pub type ChangeLogDelta = ChangeLogDeltaCommon<SstableInfo>;
297
298impl<T> From<&ChangeLogDeltaCommon<T>> for PbChangeLogDelta
299where
300 PbSstableInfo: for<'a> From<&'a T>,
301{
302 fn from(val: &ChangeLogDeltaCommon<T>) -> Self {
303 Self {
304 truncate_epoch: val.truncate_epoch,
305 new_log: Some((&val.new_log).into()),
306 }
307 }
308}
309
310impl<T> From<&PbChangeLogDelta> for ChangeLogDeltaCommon<T>
311where
312 T: for<'a> From<&'a PbSstableInfo>,
313{
314 fn from(val: &PbChangeLogDelta) -> Self {
315 Self {
316 truncate_epoch: val.truncate_epoch,
317 new_log: val.new_log.as_ref().unwrap().into(),
318 }
319 }
320}
321
322impl<T> From<ChangeLogDeltaCommon<T>> for PbChangeLogDelta
323where
324 PbSstableInfo: From<T>,
325{
326 fn from(val: ChangeLogDeltaCommon<T>) -> Self {
327 Self {
328 truncate_epoch: val.truncate_epoch,
329 new_log: Some(val.new_log.into()),
330 }
331 }
332}
333
334impl<T> From<PbChangeLogDelta> for ChangeLogDeltaCommon<T>
335where
336 T: From<PbSstableInfo>,
337{
338 fn from(val: PbChangeLogDelta) -> Self {
339 Self {
340 truncate_epoch: val.truncate_epoch,
341 new_log: val.new_log.unwrap().into(),
342 }
343 }
344}
345
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use crate::change_log::{EpochNewChangeLog, TableChangeLogCommon};
    use crate::sstable_info::SstableInfo;

    /// Builds a change log with one value-less entry per epoch group.
    fn change_log_of(epoch_groups: &[&[u64]]) -> TableChangeLogCommon<SstableInfo> {
        TableChangeLogCommon::new(epoch_groups.iter().map(|group| EpochNewChangeLog {
            new_value: vec![],
            old_value: vec![],
            epochs: group.to_vec(),
        }))
    }

    #[test]
    fn test_filter_epoch() {
        let table_change_log = change_log_of(&[&[2], &[3, 4], &[6], &[8, 10]]);

        // Compare `filter_epoch` with a linear scan for every range with `min <= max`.
        let epochs: Vec<u64> = (1..=11).collect_vec();
        for (i, &min_epoch) in epochs.iter().enumerate() {
            for &max_epoch in &epochs[i..] {
                let expected = table_change_log
                    .0
                    .iter()
                    .filter(|log| {
                        let first = *log.epochs.first().unwrap();
                        let last = *log.epochs.last().unwrap();
                        min_epoch <= last && first <= max_epoch
                    })
                    .cloned()
                    .collect_vec();
                let actual = table_change_log
                    .filter_epoch((min_epoch, max_epoch))
                    .cloned()
                    .collect_vec();
                assert_eq!(expected, actual, "{:?}", (min_epoch, max_epoch));
            }
        }

        // Compare `next_epoch` with a linear scan over the flattened epoch list.
        let existing_epochs = table_change_log.epochs().collect_vec();
        assert!(existing_epochs.is_sorted());
        for &epoch in &epochs {
            let first_not_below = existing_epochs
                .iter()
                .position(|existing_epoch| *existing_epoch >= epoch);
            let expected: Result<Option<u64>, ()> = match first_not_below {
                // All recorded epochs are below `epoch`.
                None => Ok(None),
                Some(i) => {
                    let this_epoch = existing_epochs[i];
                    assert!(this_epoch >= epoch);
                    if this_epoch != epoch {
                        // `epoch` itself is absent although a later one exists.
                        Err(())
                    } else {
                        // `None` when `epoch` is the last recorded epoch.
                        Ok(existing_epochs.get(i + 1).copied())
                    }
                }
            };
            assert_eq!(expected, table_change_log.next_epoch(epoch));
        }
    }

    #[test]
    fn test_truncate() {
        let mut table_change_log = change_log_of(&[&[1], &[2], &[3, 4], &[5]]);
        let origin_table_change_log = table_change_log.clone();
        // Truncation is applied cumulatively; the expectation is rebuilt from the original.
        for truncate_epoch in 0..6 {
            table_change_log.truncate(truncate_epoch);
            let expected_table_change_log = TableChangeLogCommon(
                origin_table_change_log
                    .0
                    .iter()
                    .cloned()
                    .map(|mut epoch_change_log| {
                        epoch_change_log
                            .epochs
                            .retain(|epoch| *epoch >= truncate_epoch);
                        epoch_change_log
                    })
                    .filter(|epoch_change_log| !epoch_change_log.epochs.is_empty())
                    .collect(),
            );
            assert_eq!(expected_table_change_log, table_change_log);
        }
    }
}