1use std::ops::Bound;
16
17use futures::stream::BoxStream;
18use futures::{Stream, StreamExt, TryStreamExt};
19use futures_async_stream::try_stream;
20use risingwave_common::catalog::TableId;
21use risingwave_common::util::addr::HostAddr;
22use risingwave_common_service::{Channel, NotificationClient, ObserverError};
23use risingwave_hummock_sdk::key::TableKey;
24use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId, SyncResult};
25use risingwave_hummock_trace::{
26 GlobalReplay, LocalReplay, LocalReplayRead, ReplayItem, ReplayRead, ReplayStateStore,
27 ReplayWrite, Result, TraceError, TracedBytes, TracedInitOptions, TracedNewLocalOptions,
28 TracedReadOptions, TracedSealCurrentEpochOptions, TracedSubResp, TracedTryWaitEpochOptions,
29};
30use risingwave_meta::manager::{MessageStatus, MetaSrvEnv, NotificationManagerRef, WorkerKey};
31use risingwave_pb::common::WorkerNode;
32use risingwave_pb::meta::subscribe_response::{Info, Operation as RespOperation};
33use risingwave_pb::meta::{SubscribeResponse, SubscribeType};
34use risingwave_storage::hummock::HummockStorage;
35use risingwave_storage::hummock::store::LocalHummockStorage;
36use risingwave_storage::hummock::test_utils::*;
37use risingwave_storage::store::{LocalStateStore, StateStoreIterExt, to_owned_item};
38use risingwave_storage::{StateStore, StateStoreIter, StateStoreReadIter};
39use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
40
41pub(crate) struct GlobalReplayIter<S>
42where
43 S: StateStoreReadIter,
44{
45 inner: S,
46}
47
48impl<S> GlobalReplayIter<S>
49where
50 S: StateStoreReadIter,
51{
52 pub(crate) fn new(inner: S) -> Self {
53 Self { inner }
54 }
55
56 pub(crate) fn into_stream(self) -> impl Stream<Item = Result<ReplayItem>> {
57 self.inner.into_stream(to_owned_item).map(|item_res| {
58 item_res
59 .map(|(key, value)| (key.user_key.table_key.0.into(), value.into()))
60 .map_err(|_| TraceError::IterFailed("iter failed to retrieve item".to_owned()))
61 })
62 }
63}
64
65pub(crate) struct LocalReplayIter {
66 inner: Vec<ReplayItem>,
67}
68
69impl LocalReplayIter {
70 pub(crate) async fn new(iter: impl StateStoreIter) -> Self {
71 let inner = iter
72 .into_stream(to_owned_item)
73 .map_ok(|value| (value.0.user_key.table_key.0.into(), value.1.into()))
74 .try_collect::<Vec<_>>()
75 .await
76 .unwrap();
77 Self { inner }
78 }
79
80 #[try_stream(ok = ReplayItem, error = TraceError)]
81 pub(crate) async fn into_stream(self) {
82 for (key, value) in self.inner {
83 yield (key, value)
84 }
85 }
86}
87
88pub(crate) struct GlobalReplayImpl {
89 store: HummockStorage,
90 notifier: NotificationManagerRef,
91}
92
93impl GlobalReplayImpl {
94 pub(crate) fn new(store: HummockStorage, notifier: NotificationManagerRef) -> Self {
95 Self { store, notifier }
96 }
97}
98
99impl GlobalReplay for GlobalReplayImpl {}
100
101#[async_trait::async_trait]
102impl ReplayRead for GlobalReplayImpl {
103 async fn iter(
104 &self,
105 key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
106 epoch: u64,
107 read_options: TracedReadOptions,
108 ) -> Result<BoxStream<'static, Result<ReplayItem>>> {
109 let key_range = (
110 key_range.0.map(TracedBytes::into).map(TableKey),
111 key_range.1.map(TracedBytes::into).map(TableKey),
112 );
113
114 let iter = self
115 .store
116 .iter(key_range, epoch, read_options.into())
117 .await
118 .unwrap();
119 let stream = GlobalReplayIter::new(iter).into_stream().boxed();
120 Ok(stream)
121 }
122
123 async fn get(
124 &self,
125 key: TracedBytes,
126 epoch: u64,
127 read_options: TracedReadOptions,
128 ) -> Result<Option<TracedBytes>> {
129 Ok(self
130 .store
131 .get(TableKey(key.into()), epoch, read_options.into())
132 .await
133 .unwrap()
134 .map(TracedBytes::from))
135 }
136}
137
138#[async_trait::async_trait]
139impl ReplayStateStore for GlobalReplayImpl {
140 async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize> {
141 let result: SyncResult = self
142 .store
143 .sync(
144 sync_table_epochs
145 .into_iter()
146 .map(|(epoch, table_ids)| {
147 (epoch, table_ids.into_iter().map(TableId::new).collect())
148 })
149 .collect(),
150 )
151 .await
152 .map_err(|e| TraceError::SyncFailed(format!("{e}")))?;
153 Ok(result.sync_size)
154 }
155
156 async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result<u64> {
157 let prev_version_id = match &info {
158 Info::HummockVersionDeltas(deltas) => deltas.version_deltas.last().map(|d| d.prev_id),
159 _ => None,
160 };
161
162 self.notifier
163 .notify_hummock_with_version(op, info, Some(version));
164
165 if let Some(prev_version_id) = prev_version_id {
167 self.store
168 .wait_version_update(HummockVersionId::new(prev_version_id))
169 .await;
170 }
171 Ok(version)
172 }
173
174 async fn new_local(&self, options: TracedNewLocalOptions) -> Box<dyn LocalReplay> {
175 let local_storage = self.store.new_local(options.into()).await;
176 Box::new(LocalReplayImpl(local_storage))
177 }
178
179 async fn try_wait_epoch(
180 &self,
181 epoch: HummockReadEpoch,
182 options: TracedTryWaitEpochOptions,
183 ) -> Result<()> {
184 self.store
185 .try_wait_epoch(epoch, options.into())
186 .await
187 .map_err(|_| TraceError::TryWaitEpochFailed)?;
188 Ok(())
189 }
190}
191pub(crate) struct LocalReplayImpl(LocalHummockStorage);
192
193#[async_trait::async_trait]
194impl LocalReplay for LocalReplayImpl {
195 async fn init(&mut self, options: TracedInitOptions) -> Result<()> {
196 self.0
197 .init(options.into())
198 .await
199 .map_err(|_| TraceError::Other("init failed"))
200 }
201
202 fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions) {
203 self.0.seal_current_epoch(next_epoch, opts.into());
204 }
205
206 fn epoch(&self) -> u64 {
207 self.0.epoch()
208 }
209
210 async fn flush(&mut self) -> Result<usize> {
211 self.0.flush().await.map_err(|_| TraceError::FlushFailed)
212 }
213
214 fn is_dirty(&self) -> bool {
215 self.0.is_dirty()
216 }
217
218 async fn try_flush(&mut self) -> Result<()> {
219 self.0
220 .try_flush()
221 .await
222 .map_err(|_| TraceError::TryFlushFailed)
223 }
224}
225
226#[async_trait::async_trait]
227impl LocalReplayRead for LocalReplayImpl {
228 async fn iter(
229 &self,
230 key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
231 read_options: TracedReadOptions,
232 ) -> Result<BoxStream<'static, Result<ReplayItem>>> {
233 let key_range = (
234 key_range.0.map(|b| TableKey(b.into())),
235 key_range.1.map(|b| TableKey(b.into())),
236 );
237
238 let iter = LocalStateStore::iter(&self.0, key_range, read_options.into())
239 .await
240 .unwrap();
241
242 let stream = LocalReplayIter::new(iter).await.into_stream().boxed();
243 Ok(stream)
244 }
245
246 async fn get(
247 &self,
248 key: TracedBytes,
249 read_options: TracedReadOptions,
250 ) -> Result<Option<TracedBytes>> {
251 Ok(
252 LocalStateStore::get(&self.0, TableKey(key.into()), read_options.into())
253 .await
254 .unwrap()
255 .map(TracedBytes::from),
256 )
257 }
258}
259
260#[async_trait::async_trait]
261impl ReplayWrite for LocalReplayImpl {
262 fn insert(
263 &mut self,
264 key: TracedBytes,
265 new_val: TracedBytes,
266 old_val: Option<TracedBytes>,
267 ) -> Result<()> {
268 LocalStateStore::insert(
269 &mut self.0,
270 TableKey(key.into()),
271 new_val.into(),
272 old_val.map(|b| b.into()),
273 )
274 .unwrap();
275 Ok(())
276 }
277
278 fn delete(&mut self, key: TracedBytes, old_val: TracedBytes) -> Result<()> {
279 LocalStateStore::delete(&mut self.0, TableKey(key.into()), old_val.into()).unwrap();
280 Ok(())
281 }
282}
283
284pub struct ReplayNotificationClient {
285 addr: HostAddr,
286 notification_manager: NotificationManagerRef,
287 first_resp: Box<TracedSubResp>,
288}
289
290impl ReplayNotificationClient {
291 pub fn new(
292 addr: HostAddr,
293 notification_manager: NotificationManagerRef,
294 first_resp: Box<TracedSubResp>,
295 ) -> Self {
296 Self {
297 addr,
298 notification_manager,
299 first_resp,
300 }
301 }
302}
303
304#[async_trait::async_trait]
305impl NotificationClient for ReplayNotificationClient {
306 type Channel = ReplayChannel<SubscribeResponse>;
307
308 async fn subscribe(
309 &self,
310 subscribe_type: SubscribeType,
311 ) -> std::result::Result<Self::Channel, ObserverError> {
312 let (tx, rx) = unbounded_channel();
313
314 self.notification_manager
315 .insert_sender(subscribe_type, WorkerKey(self.addr.to_protobuf()), tx)
316 .await;
317
318 let op = self.first_resp.0.operation();
320 let info = self.first_resp.0.info.clone();
321
322 self.notification_manager
323 .notify_hummock(op, info.unwrap())
324 .await;
325
326 Ok(ReplayChannel(rx))
327 }
328}
329
330pub fn get_replay_notification_client(
331 env: MetaSrvEnv,
332 worker_node: WorkerNode,
333 first_resp: Box<TracedSubResp>,
334) -> ReplayNotificationClient {
335 ReplayNotificationClient::new(
336 worker_node.get_host().unwrap().into(),
337 env.notification_manager_ref(),
338 first_resp,
339 )
340}
341
342pub struct ReplayChannel<T>(UnboundedReceiver<std::result::Result<T, MessageStatus>>);
343
344#[async_trait::async_trait]
345impl<T: Send + 'static> Channel for ReplayChannel<T> {
346 type Item = T;
347
348 async fn message(&mut self) -> std::result::Result<Option<T>, MessageStatus> {
349 match self.0.recv().await {
350 None => Ok(None),
351 Some(result) => result.map(|r| Some(r)),
352 }
353 }
354}