1use std::ops::Bound;
16
17use bytes::Bytes;
18use futures::stream::BoxStream;
19use futures::{Stream, StreamExt, TryStreamExt};
20use futures_async_stream::try_stream;
21use risingwave_common::catalog::TableId;
22use risingwave_common::util::addr::HostAddr;
23use risingwave_common_service::{Channel, NotificationClient, ObserverError};
24use risingwave_hummock_sdk::key::TableKey;
25use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId, SyncResult};
26use risingwave_hummock_trace::{
27 GlobalReplay, LocalReplay, LocalReplayRead, ReplayItem, ReplayRead, ReplayStateStore,
28 ReplayWrite, Result, TraceError, TracedBytes, TracedInitOptions, TracedNewLocalOptions,
29 TracedReadOptions, TracedSealCurrentEpochOptions, TracedSubResp, TracedTryWaitEpochOptions,
30};
31use risingwave_meta::manager::{MessageStatus, MetaSrvEnv, NotificationManagerRef, WorkerKey};
32use risingwave_pb::common::WorkerNode;
33use risingwave_pb::meta::subscribe_response::{Info, Operation as RespOperation};
34use risingwave_pb::meta::{SubscribeResponse, SubscribeType};
35use risingwave_storage::hummock::HummockStorage;
36use risingwave_storage::hummock::store::LocalHummockStorage;
37use risingwave_storage::hummock::test_utils::{StateStoreReadTestExt, StateStoreTestReadOptions};
38use risingwave_storage::store::*;
39use risingwave_storage::{StateStore, StateStoreIter, StateStoreReadIter};
40use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
41
42pub(crate) struct GlobalReplayIter<S>
43where
44 S: StateStoreReadIter,
45{
46 inner: S,
47}
48
49impl<S> GlobalReplayIter<S>
50where
51 S: StateStoreReadIter,
52{
53 pub(crate) fn new(inner: S) -> Self {
54 Self { inner }
55 }
56
57 pub(crate) fn into_stream(self) -> impl Stream<Item = Result<ReplayItem>> {
58 self.inner.into_stream(to_owned_item).map(|item_res| {
59 item_res
60 .map(|(key, value)| (key.user_key.table_key.0.into(), value.into()))
61 .map_err(|_| TraceError::IterFailed("iter failed to retrieve item".to_owned()))
62 })
63 }
64}
65
66pub(crate) struct LocalReplayIter {
67 inner: Vec<ReplayItem>,
68}
69
70impl LocalReplayIter {
71 pub(crate) async fn new(iter: impl StateStoreIter) -> Self {
72 let inner = iter
73 .into_stream(to_owned_item)
74 .map_ok(|value| (value.0.user_key.table_key.0.into(), value.1.into()))
75 .try_collect::<Vec<_>>()
76 .await
77 .unwrap();
78 Self { inner }
79 }
80
81 #[try_stream(ok = ReplayItem, error = TraceError)]
82 pub(crate) async fn into_stream(self) {
83 for (key, value) in self.inner {
84 yield (key, value)
85 }
86 }
87}
88
89pub(crate) struct GlobalReplayImpl {
90 store: HummockStorage,
91 notifier: NotificationManagerRef,
92}
93
94impl GlobalReplayImpl {
95 pub(crate) fn new(store: HummockStorage, notifier: NotificationManagerRef) -> Self {
96 Self { store, notifier }
97 }
98}
99
100impl GlobalReplay for GlobalReplayImpl {}
101
102fn convert_read_options(read_options: TracedReadOptions) -> StateStoreTestReadOptions {
103 StateStoreTestReadOptions {
104 table_id: read_options.table_id.table_id.into(),
105 prefix_hint: read_options.prefix_hint.map(Into::into),
106 prefetch_options: read_options.prefetch_options.into(),
107 cache_policy: read_options.cache_policy.into(),
108 read_committed: read_options.read_committed,
109 retention_seconds: read_options.retention_seconds,
110 read_version_from_backup: read_options.read_version_from_backup,
111 }
112}
113
114#[async_trait::async_trait]
115impl ReplayRead for GlobalReplayImpl {
116 async fn iter(
117 &self,
118 key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
119 epoch: u64,
120 read_options: TracedReadOptions,
121 ) -> Result<BoxStream<'static, Result<ReplayItem>>> {
122 let key_range = (
123 key_range.0.map(TracedBytes::into).map(TableKey),
124 key_range.1.map(TracedBytes::into).map(TableKey),
125 );
126
127 let read_options = convert_read_options(read_options);
128
129 let iter = self
130 .store
131 .iter(key_range, epoch, read_options)
132 .await
133 .unwrap();
134 let stream = GlobalReplayIter::new(iter).into_stream().boxed();
135 Ok(stream)
136 }
137
138 async fn get(
139 &self,
140 key: TracedBytes,
141 epoch: u64,
142 read_options: TracedReadOptions,
143 ) -> Result<Option<TracedBytes>> {
144 let read_options = convert_read_options(read_options);
145 Ok(self
146 .store
147 .get(TableKey(key.into()), epoch, read_options)
148 .await
149 .unwrap()
150 .map(TracedBytes::from))
151 }
152}
153
154#[async_trait::async_trait]
155impl ReplayStateStore for GlobalReplayImpl {
156 async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize> {
157 let result: SyncResult = self
158 .store
159 .sync(
160 sync_table_epochs
161 .into_iter()
162 .map(|(epoch, table_ids)| {
163 (epoch, table_ids.into_iter().map(TableId::new).collect())
164 })
165 .collect(),
166 )
167 .await
168 .map_err(|e| TraceError::SyncFailed(format!("{e}")))?;
169 Ok(result.sync_size)
170 }
171
172 async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result<u64> {
173 let prev_version_id = match &info {
174 Info::HummockVersionDeltas(deltas) => deltas.version_deltas.last().map(|d| d.prev_id),
175 _ => None,
176 };
177
178 self.notifier
179 .notify_hummock_with_version(op, info, Some(version));
180
181 if let Some(prev_version_id) = prev_version_id {
183 self.store
184 .wait_version_update(HummockVersionId::new(prev_version_id))
185 .await;
186 }
187 Ok(version)
188 }
189
190 async fn new_local(&self, options: TracedNewLocalOptions) -> Box<dyn LocalReplay> {
191 let local_storage = self.store.new_local(options.into()).await;
192 Box::new(LocalReplayImpl(local_storage))
193 }
194
195 async fn try_wait_epoch(
196 &self,
197 epoch: HummockReadEpoch,
198 options: TracedTryWaitEpochOptions,
199 ) -> Result<()> {
200 self.store
201 .try_wait_epoch(epoch, options.into())
202 .await
203 .map_err(|_| TraceError::TryWaitEpochFailed)?;
204 Ok(())
205 }
206}
207pub(crate) struct LocalReplayImpl(LocalHummockStorage);
208
209#[async_trait::async_trait]
210impl LocalReplay for LocalReplayImpl {
211 async fn init(&mut self, options: TracedInitOptions) -> Result<()> {
212 self.0
213 .init(options.into())
214 .await
215 .map_err(|_| TraceError::Other("init failed"))
216 }
217
218 fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions) {
219 self.0.seal_current_epoch(next_epoch, opts.into());
220 }
221
222 async fn flush(&mut self) -> Result<usize> {
223 self.0.flush().await.map_err(|_| TraceError::FlushFailed)
224 }
225
226 async fn try_flush(&mut self) -> Result<()> {
227 self.0
228 .try_flush()
229 .await
230 .map_err(|_| TraceError::TryFlushFailed)
231 }
232}
233
234#[async_trait::async_trait]
235impl LocalReplayRead for LocalReplayImpl {
236 async fn iter(
237 &self,
238 key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
239 read_options: TracedReadOptions,
240 ) -> Result<BoxStream<'static, Result<ReplayItem>>> {
241 let key_range = (
242 key_range.0.map(|b| TableKey(b.into())),
243 key_range.1.map(|b| TableKey(b.into())),
244 );
245
246 let iter = LocalStateStore::iter(&self.0, key_range, read_options.into())
247 .await
248 .unwrap();
249
250 let stream = LocalReplayIter::new(iter).await.into_stream().boxed();
251 Ok(stream)
252 }
253
254 async fn get(
255 &self,
256 key: TracedBytes,
257 read_options: TracedReadOptions,
258 ) -> Result<Option<TracedBytes>> {
259 Ok(self
260 .0
261 .on_key_value(TableKey(key.into()), read_options.into(), |_, value| {
262 Ok(TracedBytes::from(Bytes::copy_from_slice(value)))
263 })
264 .await
265 .unwrap())
266 }
267}
268
269#[async_trait::async_trait]
270impl ReplayWrite for LocalReplayImpl {
271 fn insert(
272 &mut self,
273 key: TracedBytes,
274 new_val: TracedBytes,
275 old_val: Option<TracedBytes>,
276 ) -> Result<()> {
277 LocalStateStore::insert(
278 &mut self.0,
279 TableKey(key.into()),
280 new_val.into(),
281 old_val.map(|b| b.into()),
282 )
283 .unwrap();
284 Ok(())
285 }
286
287 fn delete(&mut self, key: TracedBytes, old_val: TracedBytes) -> Result<()> {
288 LocalStateStore::delete(&mut self.0, TableKey(key.into()), old_val.into()).unwrap();
289 Ok(())
290 }
291}
292
293pub struct ReplayNotificationClient {
294 addr: HostAddr,
295 notification_manager: NotificationManagerRef,
296 first_resp: Box<TracedSubResp>,
297}
298
299impl ReplayNotificationClient {
300 pub fn new(
301 addr: HostAddr,
302 notification_manager: NotificationManagerRef,
303 first_resp: Box<TracedSubResp>,
304 ) -> Self {
305 Self {
306 addr,
307 notification_manager,
308 first_resp,
309 }
310 }
311}
312
313#[async_trait::async_trait]
314impl NotificationClient for ReplayNotificationClient {
315 type Channel = ReplayChannel<SubscribeResponse>;
316
317 async fn subscribe(
318 &self,
319 subscribe_type: SubscribeType,
320 ) -> std::result::Result<Self::Channel, ObserverError> {
321 let (tx, rx) = unbounded_channel();
322
323 self.notification_manager
324 .insert_sender(subscribe_type, WorkerKey(self.addr.to_protobuf()), tx)
325 .await;
326
327 let op = self.first_resp.0.operation();
329 let info = self.first_resp.0.info.clone();
330
331 self.notification_manager
332 .notify_hummock(op, info.unwrap())
333 .await;
334
335 Ok(ReplayChannel(rx))
336 }
337}
338
339pub fn get_replay_notification_client(
340 env: MetaSrvEnv,
341 worker_node: WorkerNode,
342 first_resp: Box<TracedSubResp>,
343) -> ReplayNotificationClient {
344 ReplayNotificationClient::new(
345 worker_node.get_host().unwrap().into(),
346 env.notification_manager_ref(),
347 first_resp,
348 )
349}
350
351pub struct ReplayChannel<T>(UnboundedReceiver<std::result::Result<T, MessageStatus>>);
352
353#[async_trait::async_trait]
354impl<T: Send + 'static> Channel for ReplayChannel<T> {
355 type Item = T;
356
357 async fn message(&mut self) -> std::result::Result<Option<T>, MessageStatus> {
358 match self.0.recv().await {
359 None => Ok(None),
360 Some(result) => result.map(|r| Some(r)),
361 }
362 }
363}