// risingwave_hummock_trace/replay/mod.rs
mod runner;
16mod worker;
17
18use std::ops::Bound;
19
20use futures::Stream;
21use futures::stream::BoxStream;
22#[cfg(test)]
23use futures_async_stream::try_stream;
24#[cfg(test)]
25use mockall::{automock, mock};
26use risingwave_hummock_sdk::HummockReadEpoch;
27use risingwave_pb::meta::subscribe_response::{Info, Operation as RespOperation};
28pub use runner::*;
29pub(crate) use worker::*;
30
31#[cfg(test)]
32use crate::TraceError;
33use crate::error::Result;
34use crate::{
35 LocalStorageId, Record, TracedBytes, TracedInitOptions, TracedNewLocalOptions,
36 TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
37};
38
39pub type ReplayItem = (TracedBytes, TracedBytes);
40pub trait ReplayItemStream = Stream<Item = ReplayItem> + Send;
41
42type ReplayGroup = Record;
43
/// Two-state response signal: keep going or shut down.
///
/// NOTE(review): presumably sent by replay workers back to the runner; the
/// producers and consumers are outside this section — confirm in `worker`/`runner`.
#[derive(Debug)]
enum WorkerResponse {
    /// Processing should continue.
    Continue,
    /// Processing should stop.
    Shutdown,
}
49
50pub(crate) type ReplayRequest = Option<ReplayGroup>;
51
52#[derive(PartialEq, Eq, Hash, Debug, Clone)]
53pub(crate) enum WorkerId {
54 Local(u64, LocalStorageId),
56 OneShot(u64),
58}
59
60#[async_trait::async_trait]
61pub trait LocalReplay: LocalReplayRead + ReplayWrite + Send + Sync {
62 async fn init(&mut self, options: TracedInitOptions) -> Result<()>;
63 fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions);
64 async fn try_flush(&mut self) -> Result<()>;
65 async fn flush(&mut self) -> Result<usize>;
66}
67pub trait GlobalReplay: ReplayRead + ReplayStateStore + Send + Sync {}
68
69#[cfg_attr(test, automock)]
70#[async_trait::async_trait]
71pub trait LocalReplayRead {
72 async fn iter(
73 &self,
74 key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
75 read_options: TracedReadOptions,
76 ) -> Result<BoxStream<'static, Result<ReplayItem>>>;
77 async fn get(
78 &self,
79 key: TracedBytes,
80 read_options: TracedReadOptions,
81 ) -> Result<Option<TracedBytes>>;
82}
83
84#[cfg_attr(test, automock)]
85#[async_trait::async_trait]
86pub trait ReplayRead {
87 async fn iter(
88 &self,
89 key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
90 epoch: u64,
91 read_options: TracedReadOptions,
92 ) -> Result<BoxStream<'static, Result<ReplayItem>>>;
93 async fn get(
94 &self,
95 key: TracedBytes,
96 epoch: u64,
97 read_options: TracedReadOptions,
98 ) -> Result<Option<TracedBytes>>;
99}
100
101#[cfg_attr(test, automock)]
102#[async_trait::async_trait]
103pub trait ReplayWrite {
104 fn insert(
105 &mut self,
106 key: TracedBytes,
107 new_val: TracedBytes,
108 old_val: Option<TracedBytes>,
109 ) -> Result<()>;
110 fn delete(&mut self, key: TracedBytes, old_val: TracedBytes) -> Result<()>;
111}
112
113#[cfg_attr(test, automock)]
114#[async_trait::async_trait]
115pub trait ReplayStateStore {
116 async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize>;
117 async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result<u64>;
118 async fn new_local(&self, opts: TracedNewLocalOptions) -> Box<dyn LocalReplay>;
119 async fn try_wait_epoch(
120 &self,
121 epoch: HummockReadEpoch,
122 options: TracedTryWaitEpochOptions,
123 ) -> Result<()>;
124}
125
// Test-only mock that implements `ReplayRead`, `ReplayStateStore`, and the
// `GlobalReplay` marker trait in one type. mockall's `mock!` names the
// generated struct `MockGlobalReplayInterface`.
#[cfg(test)]
mock! {
    pub GlobalReplayInterface {}

    #[async_trait::async_trait]
    impl ReplayRead for GlobalReplayInterface {
        async fn iter(
            &self,
            key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
            epoch: u64,
            read_options: TracedReadOptions,
        ) -> Result<BoxStream<'static, Result<ReplayItem>>>;
        async fn get(
            &self,
            key: TracedBytes,
            epoch: u64,
            read_options: TracedReadOptions,
        ) -> Result<Option<TracedBytes>>;
    }

    #[async_trait::async_trait]
    impl ReplayStateStore for GlobalReplayInterface {
        async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize>;
        async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result<u64>;
        async fn new_local(&self, opts: TracedNewLocalOptions) -> Box<dyn LocalReplay>;
        async fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TracedTryWaitEpochOptions,
        ) -> Result<()>;
    }

    impl GlobalReplay for GlobalReplayInterface {}
}
156
// Test-only mock that implements `LocalReplayRead`, `ReplayWrite`, and
// `LocalReplay` in one type. mockall's `mock!` names the generated struct
// `MockLocalReplayInterface`.
#[cfg(test)]
mock! {
    pub LocalReplayInterface {}

    #[async_trait::async_trait]
    impl LocalReplayRead for LocalReplayInterface {
        async fn iter(
            &self,
            key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
            read_options: TracedReadOptions,
        ) -> Result<BoxStream<'static, Result<ReplayItem>>>;
        async fn get(
            &self,
            key: TracedBytes,
            read_options: TracedReadOptions,
        ) -> Result<Option<TracedBytes>>;
    }

    #[async_trait::async_trait]
    impl ReplayWrite for LocalReplayInterface {
        fn insert(
            &mut self,
            key: TracedBytes,
            new_val: TracedBytes,
            old_val: Option<TracedBytes>,
        ) -> Result<()>;
        fn delete(&mut self, key: TracedBytes, old_val: TracedBytes) -> Result<()>;
    }

    #[async_trait::async_trait]
    impl LocalReplay for LocalReplayInterface {
        async fn init(&mut self, options: TracedInitOptions) -> Result<()>;
        fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions);
        async fn try_flush(&mut self) -> Result<()>;
        async fn flush(&mut self) -> Result<usize>;
    }
}
187
/// Test-only helper owning a list of [`ReplayItem`]s that can be turned into a stream.
#[cfg(test)]
pub(crate) struct MockReplayIterStream {
    /// Items yielded, in order, by `into_stream`.
    items: Vec<ReplayItem>,
}
192#[cfg(test)]
193impl MockReplayIterStream {
194 pub(crate) fn new(items: Vec<ReplayItem>) -> Self {
195 Self { items }
196 }
197
198 #[try_stream(ok = ReplayItem, error = TraceError)]
199
200 pub(crate) async fn into_stream(self) {
201 for (key, value) in self.items {
202 yield (key, value)
203 }
204 }
205}