risingwave_hummock_trace/replay/
runner.rs1use std::sync::Arc;
16use std::time::Instant;
17
18use super::{GlobalReplay, ReplayWorkerScheduler, WorkerScheduler};
19use crate::Operation;
20use crate::error::Result;
21use crate::read::TraceReader;
22
23pub struct HummockReplay<R: TraceReader, G: GlobalReplay> {
24 reader: R,
25 replay: Arc<G>,
26}
27
28impl<R: TraceReader, G: GlobalReplay + 'static> HummockReplay<R, G> {
29 pub fn new(reader: R, replay: G) -> Self {
30 Self {
31 reader,
32 replay: Arc::new(replay),
33 }
34 }
35
36 pub async fn run(&mut self) -> Result<()> {
37 self.run_with_scheduler(WorkerScheduler::new(self.replay.clone()))
38 .await
39 }
40
41 pub async fn run_with_scheduler<S: ReplayWorkerScheduler>(
42 &mut self,
43 mut worker_scheduler: S,
44 ) -> Result<()> {
45 let time = Instant::now();
46 let mut total_ops: u64 = 0;
47
48 while let Ok(r) = self.reader.read() {
49 match r.operation() {
50 Operation::Result(_) => {
52 worker_scheduler.send_result(r);
53 }
54 Operation::Finish => {
55 worker_scheduler.wait_finish(r.clone()).await;
56 }
57 _ => {
58 worker_scheduler.schedule(r);
59 total_ops += 1;
60 if total_ops % 10000 == 0 {
61 println!("replayed {} ops", total_ops);
62 }
63 }
64 };
65 }
66
67 println!("Replay finished, totally {} operations", total_ops);
68 worker_scheduler.shutdown().await;
69 println!("Total time {} seconds", time.elapsed().as_secs());
70 Ok(())
71 }
72}
73
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use bytes::Bytes;
    use itertools::Itertools;
    use mockall::predicate;

    use super::*;
    use crate::replay::{MockGlobalReplayInterface, MockLocalReplayInterface};
    use crate::{
        MockTraceReader, OperationResult, Record, StorageType, TraceError, TraceResult,
        TracedBytes, TracedNewLocalOptions, TracedReadOptions,
    };

    /// Replays an interleaved trace of three local actors followed by global
    /// records, and relies on the mock expectations to check that each local
    /// storage receives one `get` and one `insert` and that the global replay
    /// receives one `sync` with the recorded arguments.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_replay() {
        let expected_value = TracedBytes::from(vec![54, 32, 198, 236, 24]);
        let sync_id = 4561245432;

        // Builders for the operations that every local actor repeats verbatim.
        let get_ok = || {
            Operation::Result(OperationResult::Get(TraceResult::Ok(Some(
                expected_value.clone(),
            ))))
        };
        let insert_op = || Operation::insert(Bytes::from(vec![123]), Bytes::from(vec![123]), None);
        let insert_ok = || Operation::Result(OperationResult::Insert(TraceResult::Ok(())));

        let first_opts = TracedNewLocalOptions::for_test(1);
        let second_opts = TracedNewLocalOptions::for_test(2);
        let third_opts = TracedNewLocalOptions::for_test(3);

        let first_storage = StorageType::Local(0, 1);
        let second_storage = StorageType::Local(1, 2);
        let third_storage = StorageType::Local(2, 3);
        let global_storage = StorageType::Global;

        // Records 0..=3: the first local actor.
        let first_actor = vec![
            (0, Operation::NewLocalStorage(first_opts.clone(), 1)),
            (
                1,
                Operation::get(
                    Bytes::from(vec![0, 1, 2, 3]),
                    Some(123),
                    TracedReadOptions::for_test(first_opts.table_id.table_id),
                ),
            ),
            (1, get_ok()),
            (1, Operation::Finish),
            (2, insert_op()),
            (2, insert_ok()),
            (2, Operation::Finish),
            (3, Operation::DropLocalStorage),
        ]
        .into_iter()
        .map(|(record_id, op)| Ok(Record::new(first_storage, record_id, op)));

        // Records 4..=7: the second local actor.
        let second_actor = vec![
            (4, Operation::NewLocalStorage(second_opts.clone(), 2)),
            (
                5,
                Operation::get(
                    TracedBytes::from(vec![0, 1, 2, 3]).into(),
                    Some(123),
                    TracedReadOptions::for_test(second_opts.table_id.table_id),
                ),
            ),
            (5, get_ok()),
            (5, Operation::Finish),
            (6, insert_op()),
            (6, insert_ok()),
            (6, Operation::Finish),
            (7, Operation::DropLocalStorage),
        ]
        .into_iter()
        .map(|(record_id, op)| Ok(Record::new(second_storage, record_id, op)));

        // Records 8..=11: the third local actor.
        let third_actor = vec![
            (8, Operation::NewLocalStorage(third_opts.clone(), 3)),
            (
                9,
                Operation::get(
                    TracedBytes::from(vec![0, 1, 2, 3]).into(),
                    Some(123),
                    TracedReadOptions::for_test(third_opts.table_id.table_id),
                ),
            ),
            (9, get_ok()),
            (9, Operation::Finish),
            (10, insert_op()),
            (10, insert_ok()),
            (10, Operation::Finish),
            (11, Operation::DropLocalStorage),
        ]
        .into_iter()
        .map(|(record_id, op)| Ok(Record::new(third_storage, record_id, op)));

        // Records 12..=13: operations issued against the global storage.
        let global_records = vec![
            (12, Operation::Finish),
            (13, Operation::Sync(vec![(sync_id, vec![1, 2, 3])])),
            (
                13,
                Operation::Result(OperationResult::Sync(TraceResult::Ok(0))),
            ),
            (13, Operation::Finish),
        ]
        .into_iter()
        .map(|(record_id, op)| Ok(Record::new(global_storage, record_id, op)));

        // Interleave the local actors, append the global records, and finish with
        // an error record: the replay loop stops at the first `Err` from the reader.
        let mut records: VecDeque<Result<Record>> = first_actor
            .interleave(second_actor.interleave(third_actor))
            .chain(global_records)
            .chain(std::iter::once(Err(TraceError::FinRecord(8))))
            .collect();

        // The reader must be polled exactly once per prepared record.
        let record_count = records.len();
        let mut mock_reader = MockTraceReader::new();
        mock_reader
            .expect_read()
            .times(record_count)
            .returning(move || records.pop_front().unwrap());

        let mut mock_replay = MockGlobalReplayInterface::new();

        // One local storage per actor, each seeing exactly one get and one insert.
        mock_replay.expect_new_local().times(3).returning(move |_| {
            let mut mock_local = MockLocalReplayInterface::new();

            mock_local
                .expect_get()
                .times(1)
                .returning(move |_, _| Ok(Some(TracedBytes::from(vec![54, 32, 198, 236, 24]))));

            mock_local
                .expect_insert()
                .times(1)
                .returning(move |_, _, _| Ok(()));

            Box::new(mock_local)
        });

        mock_replay
            .expect_sync()
            .with(predicate::eq(vec![(sync_id, vec![1, 2, 3])]))
            .times(1)
            .returning(|_| Ok(0));

        let mut runner = HummockReplay::new(mock_reader, mock_replay);

        runner.run().await.unwrap();
    }
}