1use std::ops::Bound;
16
17use bytes::Bytes;
18use futures::stream::BoxStream;
19use futures::{Stream, StreamExt, TryStreamExt};
20use futures_async_stream::try_stream;
21use risingwave_common::catalog::TableId;
22use risingwave_common::util::addr::HostAddr;
23use risingwave_common_service::{Channel, NotificationClient, ObserverError};
24use risingwave_hummock_sdk::key::TableKey;
25use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult};
26use risingwave_hummock_trace::{
27 GlobalReplay, LocalReplay, LocalReplayRead, ReplayItem, ReplayRead, ReplayStateStore,
28 ReplayWrite, Result, TraceError, TracedBytes, TracedInitOptions, TracedNewLocalOptions,
29 TracedReadOptions, TracedSealCurrentEpochOptions, TracedSubResp, TracedTryWaitEpochOptions,
30};
31use risingwave_meta::manager::{MessageStatus, MetaSrvEnv, NotificationManagerRef, WorkerKey};
32use risingwave_pb::common::WorkerNode;
33use risingwave_pb::meta::subscribe_response::{Info, Operation as RespOperation};
34use risingwave_pb::meta::{SubscribeResponse, SubscribeType};
35use risingwave_storage::hummock::HummockStorage;
36use risingwave_storage::hummock::store::LocalHummockStorage;
37use risingwave_storage::hummock::test_utils::{StateStoreReadTestExt, StateStoreTestReadOptions};
38use risingwave_storage::store::*;
39use risingwave_storage::{StateStore, StateStoreIter, StateStoreReadIter};
40use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
41
42pub(crate) struct GlobalReplayIter<S>
43where
44 S: StateStoreReadIter,
45{
46 inner: S,
47}
48
49impl<S> GlobalReplayIter<S>
50where
51 S: StateStoreReadIter,
52{
53 pub(crate) fn new(inner: S) -> Self {
54 Self { inner }
55 }
56
57 pub(crate) fn into_stream(self) -> impl Stream<Item = Result<ReplayItem>> {
58 self.inner.into_stream(to_owned_item).map(|item_res| {
59 item_res
60 .map(|(key, value)| (key.user_key.table_key.0.into(), value.into()))
61 .map_err(|_| TraceError::IterFailed("iter failed to retrieve item".to_owned()))
62 })
63 }
64}
65
66pub(crate) struct LocalReplayIter {
67 inner: Vec<ReplayItem>,
68}
69
70impl LocalReplayIter {
71 pub(crate) async fn new(iter: impl StateStoreIter) -> Self {
72 let inner = iter
73 .into_stream(to_owned_item)
74 .map_ok(|value| (value.0.user_key.table_key.0.into(), value.1.into()))
75 .try_collect::<Vec<_>>()
76 .await
77 .unwrap();
78 Self { inner }
79 }
80
81 #[try_stream(ok = ReplayItem, error = TraceError)]
82 pub(crate) async fn into_stream(self) {
83 for (key, value) in self.inner {
84 yield (key, value)
85 }
86 }
87}
88
89pub(crate) struct GlobalReplayImpl {
90 store: HummockStorage,
91 notifier: NotificationManagerRef,
92}
93
94impl GlobalReplayImpl {
95 pub(crate) fn new(store: HummockStorage, notifier: NotificationManagerRef) -> Self {
96 Self { store, notifier }
97 }
98}
99
100impl GlobalReplay for GlobalReplayImpl {}
101
102fn convert_read_options(read_options: TracedReadOptions) -> StateStoreTestReadOptions {
103 StateStoreTestReadOptions {
104 table_id: read_options.table_id.table_id.into(),
105 prefix_hint: read_options.prefix_hint.map(Into::into),
106 prefetch_options: read_options.prefetch_options.into(),
107 cache_policy: read_options.cache_policy.into(),
108 read_committed: read_options.read_committed,
109 retention_seconds: read_options.retention_seconds,
110 read_version_from_backup: read_options.read_version_from_backup,
111 }
112}
113
114#[async_trait::async_trait]
115impl ReplayRead for GlobalReplayImpl {
116 async fn iter(
117 &self,
118 key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
119 epoch: u64,
120 read_options: TracedReadOptions,
121 ) -> Result<BoxStream<'static, Result<ReplayItem>>> {
122 let key_range = (
123 key_range.0.map(TracedBytes::into).map(TableKey),
124 key_range.1.map(TracedBytes::into).map(TableKey),
125 );
126
127 let read_options = convert_read_options(read_options);
128
129 let iter = self
130 .store
131 .iter(key_range, epoch, read_options)
132 .await
133 .unwrap();
134 let stream = GlobalReplayIter::new(iter).into_stream().boxed();
135 Ok(stream)
136 }
137
138 async fn get(
139 &self,
140 key: TracedBytes,
141 epoch: u64,
142 read_options: TracedReadOptions,
143 ) -> Result<Option<TracedBytes>> {
144 let read_options = convert_read_options(read_options);
145 Ok(self
146 .store
147 .get(TableKey(key.into()), epoch, read_options)
148 .await
149 .unwrap()
150 .map(TracedBytes::from))
151 }
152}
153
154#[async_trait::async_trait]
155impl ReplayStateStore for GlobalReplayImpl {
156 async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize> {
157 let result: SyncResult = self
158 .store
159 .sync(
160 sync_table_epochs
161 .into_iter()
162 .map(|(epoch, table_ids)| {
163 (epoch, table_ids.into_iter().map(TableId::new).collect())
164 })
165 .collect(),
166 )
167 .await
168 .map_err(|e| TraceError::SyncFailed(format!("{e}")))?;
169 Ok(result.sync_size)
170 }
171
172 async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result<u64> {
173 let prev_version_id = match &info {
174 Info::HummockVersionDeltas(deltas) => deltas.version_deltas.last().map(|d| d.prev_id),
175 _ => None,
176 };
177
178 self.notifier
179 .notify_hummock_with_version(op, info, Some(version));
180
181 if let Some(prev_version_id) = prev_version_id {
183 self.store.wait_version_update(prev_version_id).await;
184 }
185 Ok(version)
186 }
187
188 async fn new_local(&self, options: TracedNewLocalOptions) -> Box<dyn LocalReplay> {
189 let local_storage = self.store.new_local(options.into()).await;
190 Box::new(LocalReplayImpl(local_storage))
191 }
192
193 async fn try_wait_epoch(
194 &self,
195 epoch: HummockReadEpoch,
196 options: TracedTryWaitEpochOptions,
197 ) -> Result<()> {
198 self.store
199 .try_wait_epoch(epoch, options.into())
200 .await
201 .map_err(|_| TraceError::TryWaitEpochFailed)?;
202 Ok(())
203 }
204}
205pub(crate) struct LocalReplayImpl(LocalHummockStorage);
206
207#[async_trait::async_trait]
208impl LocalReplay for LocalReplayImpl {
209 async fn init(&mut self, options: TracedInitOptions) -> Result<()> {
210 self.0
211 .init(options.into())
212 .await
213 .map_err(|_| TraceError::Other("init failed"))
214 }
215
216 fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions) {
217 self.0.seal_current_epoch(next_epoch, opts.into());
218 }
219
220 async fn flush(&mut self) -> Result<usize> {
221 self.0.flush().await.map_err(|_| TraceError::FlushFailed)
222 }
223
224 async fn try_flush(&mut self) -> Result<()> {
225 self.0
226 .try_flush()
227 .await
228 .map_err(|_| TraceError::TryFlushFailed)
229 }
230}
231
232#[async_trait::async_trait]
233impl LocalReplayRead for LocalReplayImpl {
234 async fn iter(
235 &self,
236 key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
237 read_options: TracedReadOptions,
238 ) -> Result<BoxStream<'static, Result<ReplayItem>>> {
239 let key_range = (
240 key_range.0.map(|b| TableKey(b.into())),
241 key_range.1.map(|b| TableKey(b.into())),
242 );
243
244 let iter = LocalStateStore::iter(&self.0, key_range, read_options.into())
245 .await
246 .unwrap();
247
248 let stream = LocalReplayIter::new(iter).await.into_stream().boxed();
249 Ok(stream)
250 }
251
252 async fn get(
253 &self,
254 key: TracedBytes,
255 read_options: TracedReadOptions,
256 ) -> Result<Option<TracedBytes>> {
257 Ok(self
258 .0
259 .on_key_value(TableKey(key.into()), read_options.into(), |_, value| {
260 Ok(TracedBytes::from(Bytes::copy_from_slice(value)))
261 })
262 .await
263 .unwrap())
264 }
265}
266
267#[async_trait::async_trait]
268impl ReplayWrite for LocalReplayImpl {
269 fn insert(
270 &mut self,
271 key: TracedBytes,
272 new_val: TracedBytes,
273 old_val: Option<TracedBytes>,
274 ) -> Result<()> {
275 LocalStateStore::insert(
276 &mut self.0,
277 TableKey(key.into()),
278 new_val.into(),
279 old_val.map(|b| b.into()),
280 )
281 .unwrap();
282 Ok(())
283 }
284
285 fn delete(&mut self, key: TracedBytes, old_val: TracedBytes) -> Result<()> {
286 LocalStateStore::delete(&mut self.0, TableKey(key.into()), old_val.into()).unwrap();
287 Ok(())
288 }
289}
290
291pub struct ReplayNotificationClient {
292 addr: HostAddr,
293 notification_manager: NotificationManagerRef,
294 first_resp: Box<TracedSubResp>,
295}
296
297impl ReplayNotificationClient {
298 pub fn new(
299 addr: HostAddr,
300 notification_manager: NotificationManagerRef,
301 first_resp: Box<TracedSubResp>,
302 ) -> Self {
303 Self {
304 addr,
305 notification_manager,
306 first_resp,
307 }
308 }
309}
310
311#[async_trait::async_trait]
312impl NotificationClient for ReplayNotificationClient {
313 type Channel = ReplayChannel<SubscribeResponse>;
314
315 async fn subscribe(
316 &self,
317 subscribe_type: SubscribeType,
318 ) -> std::result::Result<Self::Channel, ObserverError> {
319 let (tx, rx) = unbounded_channel();
320
321 self.notification_manager.insert_sender(
322 subscribe_type,
323 WorkerKey(self.addr.to_protobuf()),
324 tx,
325 );
326
327 let op = self.first_resp.0.operation();
329 let info = self.first_resp.0.info.clone();
330
331 self.notification_manager
332 .notify_hummock(op, info.unwrap())
333 .await;
334
335 Ok(ReplayChannel(rx))
336 }
337}
338
339pub fn get_replay_notification_client(
340 env: MetaSrvEnv,
341 worker_node: WorkerNode,
342 first_resp: Box<TracedSubResp>,
343) -> ReplayNotificationClient {
344 ReplayNotificationClient::new(
345 worker_node.get_host().unwrap().into(),
346 env.notification_manager_ref(),
347 first_resp,
348 )
349}
350
351pub struct ReplayChannel<T>(UnboundedReceiver<std::result::Result<T, MessageStatus>>);
352
353#[async_trait::async_trait]
354impl<T: Send + 'static> Channel for ReplayChannel<T> {
355 type Item = T;
356
357 async fn message(&mut self) -> std::result::Result<Option<T>, MessageStatus> {
358 match self.0.recv().await {
359 None => Ok(None),
360 Some(result) => result.map(|r| Some(r)),
361 }
362 }
363}