replay/
replay_impl.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Bound;
16
17use bytes::Bytes;
18use futures::stream::BoxStream;
19use futures::{Stream, StreamExt, TryStreamExt};
20use futures_async_stream::try_stream;
21use risingwave_common::catalog::TableId;
22use risingwave_common::util::addr::HostAddr;
23use risingwave_common_service::{Channel, NotificationClient, ObserverError};
24use risingwave_hummock_sdk::key::TableKey;
25use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId, SyncResult};
26use risingwave_hummock_trace::{
27    GlobalReplay, LocalReplay, LocalReplayRead, ReplayItem, ReplayRead, ReplayStateStore,
28    ReplayWrite, Result, TraceError, TracedBytes, TracedInitOptions, TracedNewLocalOptions,
29    TracedReadOptions, TracedSealCurrentEpochOptions, TracedSubResp, TracedTryWaitEpochOptions,
30};
31use risingwave_meta::manager::{MessageStatus, MetaSrvEnv, NotificationManagerRef, WorkerKey};
32use risingwave_pb::common::WorkerNode;
33use risingwave_pb::meta::subscribe_response::{Info, Operation as RespOperation};
34use risingwave_pb::meta::{SubscribeResponse, SubscribeType};
35use risingwave_storage::hummock::HummockStorage;
36use risingwave_storage::hummock::store::LocalHummockStorage;
37use risingwave_storage::hummock::test_utils::{StateStoreReadTestExt, StateStoreTestReadOptions};
38use risingwave_storage::store::*;
39use risingwave_storage::{StateStore, StateStoreIter, StateStoreReadIter};
40use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
41
42pub(crate) struct GlobalReplayIter<S>
43where
44    S: StateStoreReadIter,
45{
46    inner: S,
47}
48
49impl<S> GlobalReplayIter<S>
50where
51    S: StateStoreReadIter,
52{
53    pub(crate) fn new(inner: S) -> Self {
54        Self { inner }
55    }
56
57    pub(crate) fn into_stream(self) -> impl Stream<Item = Result<ReplayItem>> {
58        self.inner.into_stream(to_owned_item).map(|item_res| {
59            item_res
60                .map(|(key, value)| (key.user_key.table_key.0.into(), value.into()))
61                .map_err(|_| TraceError::IterFailed("iter failed to retrieve item".to_owned()))
62        })
63    }
64}
65
66pub(crate) struct LocalReplayIter {
67    inner: Vec<ReplayItem>,
68}
69
70impl LocalReplayIter {
71    pub(crate) async fn new(iter: impl StateStoreIter) -> Self {
72        let inner = iter
73            .into_stream(to_owned_item)
74            .map_ok(|value| (value.0.user_key.table_key.0.into(), value.1.into()))
75            .try_collect::<Vec<_>>()
76            .await
77            .unwrap();
78        Self { inner }
79    }
80
81    #[try_stream(ok = ReplayItem, error = TraceError)]
82    pub(crate) async fn into_stream(self) {
83        for (key, value) in self.inner {
84            yield (key, value)
85        }
86    }
87}
88
89pub(crate) struct GlobalReplayImpl {
90    store: HummockStorage,
91    notifier: NotificationManagerRef,
92}
93
94impl GlobalReplayImpl {
95    pub(crate) fn new(store: HummockStorage, notifier: NotificationManagerRef) -> Self {
96        Self { store, notifier }
97    }
98}
99
100impl GlobalReplay for GlobalReplayImpl {}
101
102fn convert_read_options(read_options: TracedReadOptions) -> StateStoreTestReadOptions {
103    StateStoreTestReadOptions {
104        table_id: read_options.table_id.table_id.into(),
105        prefix_hint: read_options.prefix_hint.map(Into::into),
106        prefetch_options: read_options.prefetch_options.into(),
107        cache_policy: read_options.cache_policy.into(),
108        read_committed: read_options.read_committed,
109        retention_seconds: read_options.retention_seconds,
110        read_version_from_backup: read_options.read_version_from_backup,
111    }
112}
113
114#[async_trait::async_trait]
115impl ReplayRead for GlobalReplayImpl {
116    async fn iter(
117        &self,
118        key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
119        epoch: u64,
120        read_options: TracedReadOptions,
121    ) -> Result<BoxStream<'static, Result<ReplayItem>>> {
122        let key_range = (
123            key_range.0.map(TracedBytes::into).map(TableKey),
124            key_range.1.map(TracedBytes::into).map(TableKey),
125        );
126
127        let read_options = convert_read_options(read_options);
128
129        let iter = self
130            .store
131            .iter(key_range, epoch, read_options)
132            .await
133            .unwrap();
134        let stream = GlobalReplayIter::new(iter).into_stream().boxed();
135        Ok(stream)
136    }
137
138    async fn get(
139        &self,
140        key: TracedBytes,
141        epoch: u64,
142        read_options: TracedReadOptions,
143    ) -> Result<Option<TracedBytes>> {
144        let read_options = convert_read_options(read_options);
145        Ok(self
146            .store
147            .get(TableKey(key.into()), epoch, read_options)
148            .await
149            .unwrap()
150            .map(TracedBytes::from))
151    }
152}
153
154#[async_trait::async_trait]
155impl ReplayStateStore for GlobalReplayImpl {
156    async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize> {
157        let result: SyncResult = self
158            .store
159            .sync(
160                sync_table_epochs
161                    .into_iter()
162                    .map(|(epoch, table_ids)| {
163                        (epoch, table_ids.into_iter().map(TableId::new).collect())
164                    })
165                    .collect(),
166            )
167            .await
168            .map_err(|e| TraceError::SyncFailed(format!("{e}")))?;
169        Ok(result.sync_size)
170    }
171
172    async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result<u64> {
173        let prev_version_id = match &info {
174            Info::HummockVersionDeltas(deltas) => deltas.version_deltas.last().map(|d| d.prev_id),
175            _ => None,
176        };
177
178        self.notifier
179            .notify_hummock_with_version(op, info, Some(version));
180
181        // wait till version updated
182        if let Some(prev_version_id) = prev_version_id {
183            self.store
184                .wait_version_update(HummockVersionId::new(prev_version_id))
185                .await;
186        }
187        Ok(version)
188    }
189
190    async fn new_local(&self, options: TracedNewLocalOptions) -> Box<dyn LocalReplay> {
191        let local_storage = self.store.new_local(options.into()).await;
192        Box::new(LocalReplayImpl(local_storage))
193    }
194
195    async fn try_wait_epoch(
196        &self,
197        epoch: HummockReadEpoch,
198        options: TracedTryWaitEpochOptions,
199    ) -> Result<()> {
200        self.store
201            .try_wait_epoch(epoch, options.into())
202            .await
203            .map_err(|_| TraceError::TryWaitEpochFailed)?;
204        Ok(())
205    }
206}
207pub(crate) struct LocalReplayImpl(LocalHummockStorage);
208
209#[async_trait::async_trait]
210impl LocalReplay for LocalReplayImpl {
211    async fn init(&mut self, options: TracedInitOptions) -> Result<()> {
212        self.0
213            .init(options.into())
214            .await
215            .map_err(|_| TraceError::Other("init failed"))
216    }
217
218    fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions) {
219        self.0.seal_current_epoch(next_epoch, opts.into());
220    }
221
222    async fn flush(&mut self) -> Result<usize> {
223        self.0.flush().await.map_err(|_| TraceError::FlushFailed)
224    }
225
226    async fn try_flush(&mut self) -> Result<()> {
227        self.0
228            .try_flush()
229            .await
230            .map_err(|_| TraceError::TryFlushFailed)
231    }
232}
233
234#[async_trait::async_trait]
235impl LocalReplayRead for LocalReplayImpl {
236    async fn iter(
237        &self,
238        key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
239        read_options: TracedReadOptions,
240    ) -> Result<BoxStream<'static, Result<ReplayItem>>> {
241        let key_range = (
242            key_range.0.map(|b| TableKey(b.into())),
243            key_range.1.map(|b| TableKey(b.into())),
244        );
245
246        let iter = LocalStateStore::iter(&self.0, key_range, read_options.into())
247            .await
248            .unwrap();
249
250        let stream = LocalReplayIter::new(iter).await.into_stream().boxed();
251        Ok(stream)
252    }
253
254    async fn get(
255        &self,
256        key: TracedBytes,
257        read_options: TracedReadOptions,
258    ) -> Result<Option<TracedBytes>> {
259        Ok(self
260            .0
261            .on_key_value(TableKey(key.into()), read_options.into(), |_, value| {
262                Ok(TracedBytes::from(Bytes::copy_from_slice(value)))
263            })
264            .await
265            .unwrap())
266    }
267}
268
269#[async_trait::async_trait]
270impl ReplayWrite for LocalReplayImpl {
271    fn insert(
272        &mut self,
273        key: TracedBytes,
274        new_val: TracedBytes,
275        old_val: Option<TracedBytes>,
276    ) -> Result<()> {
277        LocalStateStore::insert(
278            &mut self.0,
279            TableKey(key.into()),
280            new_val.into(),
281            old_val.map(|b| b.into()),
282        )
283        .unwrap();
284        Ok(())
285    }
286
287    fn delete(&mut self, key: TracedBytes, old_val: TracedBytes) -> Result<()> {
288        LocalStateStore::delete(&mut self.0, TableKey(key.into()), old_val.into()).unwrap();
289        Ok(())
290    }
291}
292
293pub struct ReplayNotificationClient {
294    addr: HostAddr,
295    notification_manager: NotificationManagerRef,
296    first_resp: Box<TracedSubResp>,
297}
298
299impl ReplayNotificationClient {
300    pub fn new(
301        addr: HostAddr,
302        notification_manager: NotificationManagerRef,
303        first_resp: Box<TracedSubResp>,
304    ) -> Self {
305        Self {
306            addr,
307            notification_manager,
308            first_resp,
309        }
310    }
311}
312
313#[async_trait::async_trait]
314impl NotificationClient for ReplayNotificationClient {
315    type Channel = ReplayChannel<SubscribeResponse>;
316
317    async fn subscribe(
318        &self,
319        subscribe_type: SubscribeType,
320    ) -> std::result::Result<Self::Channel, ObserverError> {
321        let (tx, rx) = unbounded_channel();
322
323        self.notification_manager
324            .insert_sender(subscribe_type, WorkerKey(self.addr.to_protobuf()), tx)
325            .await;
326
327        // send the first snapshot message
328        let op = self.first_resp.0.operation();
329        let info = self.first_resp.0.info.clone();
330
331        self.notification_manager
332            .notify_hummock(op, info.unwrap())
333            .await;
334
335        Ok(ReplayChannel(rx))
336    }
337}
338
339pub fn get_replay_notification_client(
340    env: MetaSrvEnv,
341    worker_node: WorkerNode,
342    first_resp: Box<TracedSubResp>,
343) -> ReplayNotificationClient {
344    ReplayNotificationClient::new(
345        worker_node.get_host().unwrap().into(),
346        env.notification_manager_ref(),
347        first_resp,
348    )
349}
350
351pub struct ReplayChannel<T>(UnboundedReceiver<std::result::Result<T, MessageStatus>>);
352
353#[async_trait::async_trait]
354impl<T: Send + 'static> Channel for ReplayChannel<T> {
355    type Item = T;
356
357    async fn message(&mut self) -> std::result::Result<Option<T>, MessageStatus> {
358        match self.0.recv().await {
359            None => Ok(None),
360            Some(result) => result.map(|r| Some(r)),
361        }
362    }
363}