// risingwave_hummock_trace/replay/mod.rs
mod runner;
16mod worker;
17
18use std::ops::Bound;
19
20use futures::Stream;
21use futures::stream::BoxStream;
22#[cfg(test)]
23use futures_async_stream::try_stream;
24#[cfg(test)]
25use mockall::{automock, mock};
26use risingwave_hummock_sdk::HummockReadEpoch;
27use risingwave_pb::meta::subscribe_response::{Info, Operation as RespOperation};
28pub use runner::*;
29pub(crate) use worker::*;
30
31#[cfg(test)]
32use crate::TraceError;
33use crate::error::Result;
34use crate::{
35 LocalStorageId, Record, TracedBytes, TracedInitOptions, TracedNewLocalOptions,
36 TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
37};
38
39pub type ReplayItem = (TracedBytes, TracedBytes);
40pub trait ReplayItemStream = Stream<Item = ReplayItem> + Send;
41
42type ReplayGroup = Record;
43
/// Two-state reply used by the replay machinery.
///
/// NOTE(review): the producers and consumers of this enum are not visible in
/// this file; presumably a worker reports whether it keeps running or has
/// stopped — confirm against the `worker` / `runner` modules.
#[derive(Debug)]
enum WorkerResponse {
    /// Presumably: the worker handled the request and stays alive.
    Continue,
    /// Presumably: the worker is terminating.
    Shutdown,
}
49
50pub(crate) type ReplayRequest = Option<ReplayGroup>;
51
52#[derive(PartialEq, Eq, Hash, Debug, Clone)]
53pub(crate) enum WorkerId {
54 Local(u64, LocalStorageId),
56 OneShot(u64),
58}
59
60#[async_trait::async_trait]
61pub trait LocalReplay: LocalReplayRead + ReplayWrite + Send + Sync {
62 async fn init(&mut self, options: TracedInitOptions) -> Result<()>;
63 fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions);
64 fn is_dirty(&self) -> bool;
65 fn epoch(&self) -> u64;
66 async fn try_flush(&mut self) -> Result<()>;
67 async fn flush(&mut self) -> Result<usize>;
68}
69pub trait GlobalReplay: ReplayRead + ReplayStateStore + Send + Sync {}
70
71#[cfg_attr(test, automock)]
72#[async_trait::async_trait]
73pub trait LocalReplayRead {
74 async fn iter(
75 &self,
76 key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
77 read_options: TracedReadOptions,
78 ) -> Result<BoxStream<'static, Result<ReplayItem>>>;
79 async fn get(
80 &self,
81 key: TracedBytes,
82 read_options: TracedReadOptions,
83 ) -> Result<Option<TracedBytes>>;
84}
85
86#[cfg_attr(test, automock)]
87#[async_trait::async_trait]
88pub trait ReplayRead {
89 async fn iter(
90 &self,
91 key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
92 epoch: u64,
93 read_options: TracedReadOptions,
94 ) -> Result<BoxStream<'static, Result<ReplayItem>>>;
95 async fn get(
96 &self,
97 key: TracedBytes,
98 epoch: u64,
99 read_options: TracedReadOptions,
100 ) -> Result<Option<TracedBytes>>;
101}
102
103#[cfg_attr(test, automock)]
104#[async_trait::async_trait]
105pub trait ReplayWrite {
106 fn insert(
107 &mut self,
108 key: TracedBytes,
109 new_val: TracedBytes,
110 old_val: Option<TracedBytes>,
111 ) -> Result<()>;
112 fn delete(&mut self, key: TracedBytes, old_val: TracedBytes) -> Result<()>;
113}
114
115#[cfg_attr(test, automock)]
116#[async_trait::async_trait]
117pub trait ReplayStateStore {
118 async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize>;
119 async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result<u64>;
120 async fn new_local(&self, opts: TracedNewLocalOptions) -> Box<dyn LocalReplay>;
121 async fn try_wait_epoch(
122 &self,
123 epoch: HummockReadEpoch,
124 options: TracedTryWaitEpochOptions,
125 ) -> Result<()>;
126}
127
// Test-only mock declared through `mockall::mock!`. A single mocked type
// implements `ReplayRead`, `ReplayStateStore` and the `GlobalReplay` marker
// trait, so it can stand in wherever a `GlobalReplay` is required.
#[cfg(test)]
mock! {
    pub GlobalReplayInterface {}

    #[async_trait::async_trait]
    impl ReplayRead for GlobalReplayInterface {
        async fn iter(
            &self,
            key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
            epoch: u64,
            read_options: TracedReadOptions,
        ) -> Result<BoxStream<'static, Result<ReplayItem>>>;

        async fn get(
            &self,
            key: TracedBytes,
            epoch: u64,
            read_options: TracedReadOptions,
        ) -> Result<Option<TracedBytes>>;
    }

    #[async_trait::async_trait]
    impl ReplayStateStore for GlobalReplayInterface {
        async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize>;

        async fn notify_hummock(
            &self,
            info: Info,
            op: RespOperation,
            version: u64,
        ) -> Result<u64>;

        async fn new_local(&self, opts: TracedNewLocalOptions) -> Box<dyn LocalReplay>;

        async fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TracedTryWaitEpochOptions,
        ) -> Result<()>;
    }

    impl GlobalReplay for GlobalReplayInterface {}
}
158
// Test-only mock declared through `mockall::mock!`. A single mocked type
// implements `LocalReplayRead`, `ReplayWrite` and `LocalReplay`, i.e. the
// full set of traits that `LocalReplay` requires.
#[cfg(test)]
mock! {
    pub LocalReplayInterface {}

    #[async_trait::async_trait]
    impl LocalReplayRead for LocalReplayInterface {
        async fn iter(
            &self,
            key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
            read_options: TracedReadOptions,
        ) -> Result<BoxStream<'static, Result<ReplayItem>>>;

        async fn get(
            &self,
            key: TracedBytes,
            read_options: TracedReadOptions,
        ) -> Result<Option<TracedBytes>>;
    }

    #[async_trait::async_trait]
    impl ReplayWrite for LocalReplayInterface {
        fn insert(
            &mut self,
            key: TracedBytes,
            new_val: TracedBytes,
            old_val: Option<TracedBytes>,
        ) -> Result<()>;

        fn delete(&mut self, key: TracedBytes, old_val: TracedBytes) -> Result<()>;
    }

    #[async_trait::async_trait]
    impl LocalReplay for LocalReplayInterface {
        async fn init(&mut self, options: TracedInitOptions) -> Result<()>;

        fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions);

        fn is_dirty(&self) -> bool;

        fn epoch(&self) -> u64;

        async fn try_flush(&mut self) -> Result<()>;

        async fn flush(&mut self) -> Result<usize>;
    }
}
191
/// Test-only helper that owns a fixed list of [`ReplayItem`]s and can turn
/// them into a stream.
#[cfg(test)]
pub(crate) struct MockReplayIterStream {
    /// Items handed out by the stream, in vector order.
    items: Vec<ReplayItem>,
}
196#[cfg(test)]
197impl MockReplayIterStream {
198 pub(crate) fn new(items: Vec<ReplayItem>) -> Self {
199 Self { items }
200 }
201
202 #[try_stream(ok = ReplayItem, error = TraceError)]
203
204 pub(crate) async fn into_stream(self) {
205 for (key, value) in self.items {
206 yield (key, value)
207 }
208 }
209}