risingwave_hummock_trace/replay/
runner.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Instant;
17
18use super::{GlobalReplay, ReplayWorkerScheduler, WorkerScheduler};
19use crate::Operation;
20use crate::error::Result;
21use crate::read::TraceReader;
22
23pub struct HummockReplay<R: TraceReader, G: GlobalReplay> {
24    reader: R,
25    replay: Arc<G>,
26}
27
28impl<R: TraceReader, G: GlobalReplay + 'static> HummockReplay<R, G> {
29    pub fn new(reader: R, replay: G) -> Self {
30        Self {
31            reader,
32            replay: Arc::new(replay),
33        }
34    }
35
36    pub async fn run(&mut self) -> Result<()> {
37        self.run_with_scheduler(WorkerScheduler::new(self.replay.clone()))
38            .await
39    }
40
41    pub async fn run_with_scheduler<S: ReplayWorkerScheduler>(
42        &mut self,
43        mut worker_scheduler: S,
44    ) -> Result<()> {
45        let time = Instant::now();
46        let mut total_ops: u64 = 0;
47
48        while let Ok(r) = self.reader.read() {
49            match r.operation() {
50                // check results
51                Operation::Result(_) => {
52                    worker_scheduler.send_result(r);
53                }
54                Operation::Finish => {
55                    worker_scheduler.wait_finish(r.clone()).await;
56                }
57                _ => {
58                    worker_scheduler.schedule(r);
59                    total_ops += 1;
60                    if total_ops % 10000 == 0 {
61                        println!("replayed {} ops", total_ops);
62                    }
63                }
64            };
65        }
66
67        println!("Replay finished, totally {} operations", total_ops);
68        worker_scheduler.shutdown().await;
69        println!("Total time {} seconds", time.elapsed().as_secs());
70        Ok(())
71    }
72}
73
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use bytes::Bytes;
    use itertools::Itertools;
    use mockall::predicate;

    use super::*;
    use crate::replay::{MockGlobalReplayInterface, MockLocalReplayInterface};
    use crate::{
        MockTraceReader, OperationResult, Record, StorageType, TraceError, TraceResult,
        TracedBytes, TracedNewLocalOptions, TracedReadOptions,
    };

    /// Feeds `HummockReplay` a mocked trace made of three interleaved local
    /// storages followed by global records, and checks through mock
    /// expectations that each local storage sees one `get` and one `insert`
    /// and that the global replay sees one `sync`.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_replay() {
        let sync_id = 4561245432;
        let expected_value = TracedBytes::from(vec![54, 32, 198, 236, 24]);

        let opts1 = TracedNewLocalOptions::for_test(1);
        let opts2 = TracedNewLocalOptions::for_test(2);
        let opts3 = TracedNewLocalOptions::for_test(3);

        // Records of the first local storage (record ids 0..=3).
        let actor_1 = vec![
            (0, Operation::NewLocalStorage(opts1.clone(), 1)),
            (
                1,
                Operation::get(
                    Bytes::from(vec![0, 1, 2, 3]),
                    Some(123),
                    TracedReadOptions::for_test(opts1.table_id.table_id),
                ),
            ),
            (
                1,
                Operation::Result(OperationResult::Get(TraceResult::Ok(Some(
                    expected_value.clone(),
                )))),
            ),
            (1, Operation::Finish),
            (
                2,
                Operation::insert(Bytes::from(vec![123]), Bytes::from(vec![123]), None),
            ),
            (
                2,
                Operation::Result(OperationResult::Insert(TraceResult::Ok(()))),
            ),
            (2, Operation::Finish),
            (3, Operation::DropLocalStorage),
        ]
        .into_iter()
        .map(|(id, op)| Ok(Record::new(StorageType::Local(0, 1), id, op)));

        // Records of the second local storage (record ids 4..=7).
        let actor_2 = vec![
            (4, Operation::NewLocalStorage(opts2.clone(), 2)),
            (
                5,
                Operation::get(
                    TracedBytes::from(vec![0, 1, 2, 3]).into(),
                    Some(123),
                    TracedReadOptions::for_test(opts2.table_id.table_id),
                ),
            ),
            (
                5,
                Operation::Result(OperationResult::Get(TraceResult::Ok(Some(
                    expected_value.clone(),
                )))),
            ),
            (5, Operation::Finish),
            (
                6,
                Operation::insert(Bytes::from(vec![123]), Bytes::from(vec![123]), None),
            ),
            (
                6,
                Operation::Result(OperationResult::Insert(TraceResult::Ok(()))),
            ),
            (6, Operation::Finish),
            (7, Operation::DropLocalStorage),
        ]
        .into_iter()
        .map(|(id, op)| Ok(Record::new(StorageType::Local(1, 2), id, op)));

        // Records of the third local storage (record ids 8..=11).
        let actor_3 = vec![
            (8, Operation::NewLocalStorage(opts3.clone(), 3)),
            (
                9,
                Operation::get(
                    TracedBytes::from(vec![0, 1, 2, 3]).into(),
                    Some(123),
                    TracedReadOptions::for_test(opts3.table_id.table_id),
                ),
            ),
            (
                9,
                Operation::Result(OperationResult::Get(TraceResult::Ok(Some(
                    expected_value.clone(),
                )))),
            ),
            (9, Operation::Finish),
            (
                10,
                Operation::insert(Bytes::from(vec![123]), Bytes::from(vec![123]), None),
            ),
            (
                10,
                Operation::Result(OperationResult::Insert(TraceResult::Ok(()))),
            ),
            (10, Operation::Finish),
            (11, Operation::DropLocalStorage),
        ]
        .into_iter()
        .map(|(id, op)| Ok(Record::new(StorageType::Local(2, 3), id, op)));

        // Records issued against the global storage.
        let non_local: Vec<Result<Record>> = vec![
            (12, Operation::Finish),
            (13, Operation::Sync(vec![(sync_id, vec![1, 2, 3])])),
            (
                13,
                Operation::Result(OperationResult::Sync(TraceResult::Ok(0))),
            ),
            (13, Operation::Finish),
        ]
        .into_iter()
        .map(|(id, op)| Ok(Record::new(StorageType::Global, id, op)))
        .collect();

        // Interleave the local streams to simulate concurrency, then append
        // the global records.
        let mut trace: VecDeque<Result<Record>> = actor_1
            .interleave(actor_2.interleave(actor_3))
            .chain(non_local)
            .collect();

        // Intentional error to stop the replay loop.
        trace.push_back(Err(TraceError::FinRecord(8)));

        // The reader must be polled exactly once per queued entry.
        let expected_reads = trace.len();

        let mut reader = MockTraceReader::new();
        reader
            .expect_read()
            .times(expected_reads)
            .returning(move || trace.pop_front().unwrap());

        let mut global = MockGlobalReplayInterface::new();

        // One local replay per `NewLocalStorage` record; each must receive
        // exactly one `get` and one `insert`.
        global.expect_new_local().times(3).returning(move |_| {
            let mut local = MockLocalReplayInterface::new();

            local
                .expect_get()
                .times(1)
                .returning(move |_, _| Ok(Some(TracedBytes::from(vec![54, 32, 198, 236, 24]))));

            local
                .expect_insert()
                .times(1)
                .returning(move |_, _, _| Ok(()));

            Box::new(local)
        });

        global
            .expect_sync()
            .with(predicate::eq(vec![(sync_id, vec![1, 2, 3])]))
            .times(1)
            .returning(|_| Ok(0));

        let mut replayer = HummockReplay::new(reader, global);

        replayer.run().await.unwrap();
    }
}