replay/replay_impl.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound;

use futures::stream::BoxStream;
use futures::{Stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::TableId;
use risingwave_common::util::addr::HostAddr;
use risingwave_common_service::{Channel, NotificationClient, ObserverError};
use risingwave_hummock_sdk::key::TableKey;
use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId, SyncResult};
use risingwave_hummock_trace::{
    GlobalReplay, LocalReplay, LocalReplayRead, ReplayItem, ReplayRead, ReplayStateStore,
    ReplayWrite, Result, TraceError, TracedBytes, TracedInitOptions, TracedNewLocalOptions,
    TracedReadOptions, TracedSealCurrentEpochOptions, TracedSubResp, TracedTryWaitEpochOptions,
};
use risingwave_meta::manager::{MessageStatus, MetaSrvEnv, NotificationManagerRef, WorkerKey};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::meta::subscribe_response::{Info, Operation as RespOperation};
use risingwave_pb::meta::{SubscribeResponse, SubscribeType};
use risingwave_storage::hummock::HummockStorage;
use risingwave_storage::hummock::store::LocalHummockStorage;
use risingwave_storage::hummock::test_utils::*;
use risingwave_storage::store::{LocalStateStore, StateStoreIterExt, to_owned_item};
use risingwave_storage::{StateStore, StateStoreIter, StateStoreReadIter};
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};

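/// Adapts a `StateStoreReadIter` obtained from the global `HummockStorage` into a stream of
/// `ReplayItem`s, i.e. (table key, value) pairs consumed by the trace replayer.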
pub(crate) struct GlobalReplayIter<S>
where
    S: StateStoreReadIter,
{
    inner: S,
}

impl<S> GlobalReplayIter<S>
where
    S: StateStoreReadIter,
{
    pub(crate) fn new(inner: S) -> Self {
        Self { inner }
    }

    pub(crate) fn into_stream(self) -> impl Stream<Item = Result<ReplayItem>> {
        self.inner.into_stream(to_owned_item).map(|item_res| {
            item_res
                .map(|(key, value)| (key.user_key.table_key.0.into(), value.into()))
                .map_err(|_| TraceError::IterFailed("iter failed to retrieve item".to_owned()))
        })
    }
}

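/// Replay iterator over a local state store read. All items are collected eagerly in `new` and
/// then re-emitted as a stream.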
pub(crate) struct LocalReplayIter {
    inner: Vec<ReplayItem>,
}

impl LocalReplayIter {
    pub(crate) async fn new(iter: impl StateStoreIter) -> Self {
        let inner = iter
            .into_stream(to_owned_item)
            .map_ok(|value| (value.0.user_key.table_key.0.into(), value.1.into()))
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        Self { inner }
    }

    #[try_stream(ok = ReplayItem, error = TraceError)]
    pub(crate) async fn into_stream(self) {
        for (key, value) in self.inner {
            yield (key, value)
        }
    }
}

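/// Global replay handle: wraps the real `HummockStorage` together with the notification manager
/// of the embedded meta node, which is used to propagate version updates during replay.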
pub(crate) struct GlobalReplayImpl {
    store: HummockStorage,
    notifier: NotificationManagerRef,
}

impl GlobalReplayImpl {
    pub(crate) fn new(store: HummockStorage, notifier: NotificationManagerRef) -> Self {
        Self { store, notifier }
    }
}

impl GlobalReplay for GlobalReplayImpl {}

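// Global point-get and range-iter reads, replayed directly on `HummockStorage`.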
#[async_trait::async_trait]
impl ReplayRead for GlobalReplayImpl {
    async fn iter(
        &self,
        key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
        epoch: u64,
        read_options: TracedReadOptions,
    ) -> Result<BoxStream<'static, Result<ReplayItem>>> {
        let key_range = (
            key_range.0.map(TracedBytes::into).map(TableKey),
            key_range.1.map(TracedBytes::into).map(TableKey),
        );

        let iter = self
            .store
            .iter(key_range, epoch, read_options.into())
            .await
            .unwrap();
        let stream = GlobalReplayIter::new(iter).into_stream().boxed();
        Ok(stream)
    }

    async fn get(
        &self,
        key: TracedBytes,
        epoch: u64,
        read_options: TracedReadOptions,
    ) -> Result<Option<TracedBytes>> {
        Ok(self
            .store
            .get(TableKey(key.into()), epoch, read_options.into())
            .await
            .unwrap()
            .map(TracedBytes::from))
    }
}

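// State-store-level operations: sync, hummock version notification, local store creation and
// epoch waiting.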
#[async_trait::async_trait]
impl ReplayStateStore for GlobalReplayImpl {
    async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize> {
        let result: SyncResult = self
            .store
            .sync(
                sync_table_epochs
                    .into_iter()
                    .map(|(epoch, table_ids)| {
                        (epoch, table_ids.into_iter().map(TableId::new).collect())
                    })
                    .collect(),
            )
            .await
            .map_err(|e| TraceError::SyncFailed(format!("{e}")))?;
        Ok(result.sync_size)
    }

    async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result<u64> {
        let prev_version_id = match &info {
            Info::HummockVersionDeltas(deltas) => deltas.version_deltas.last().map(|d| d.prev_id),
            _ => None,
        };

        self.notifier
            .notify_hummock_with_version(op, info, Some(version));

        // Wait until the version update has been applied to the store.
        if let Some(prev_version_id) = prev_version_id {
            self.store
                .wait_version_update(HummockVersionId::new(prev_version_id))
                .await;
        }
        Ok(version)
    }

    async fn new_local(&self, options: TracedNewLocalOptions) -> Box<dyn LocalReplay> {
        let local_storage = self.store.new_local(options.into()).await;
        Box::new(LocalReplayImpl(local_storage))
    }

    async fn try_wait_epoch(
        &self,
        epoch: HummockReadEpoch,
        options: TracedTryWaitEpochOptions,
    ) -> Result<()> {
        self.store
            .try_wait_epoch(epoch, options.into())
            .await
            .map_err(|_| TraceError::TryWaitEpochFailed)?;
        Ok(())
    }
}
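
/// Local replay handle wrapping the `LocalHummockStorage` created by `new_local`.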
pub(crate) struct LocalReplayImpl(LocalHummockStorage);

#[async_trait::async_trait]
impl LocalReplay for LocalReplayImpl {
    async fn init(&mut self, options: TracedInitOptions) -> Result<()> {
        self.0
            .init(options.into())
            .await
            .map_err(|_| TraceError::Other("init failed"))
    }

    fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions) {
        self.0.seal_current_epoch(next_epoch, opts.into());
    }

    fn epoch(&self) -> u64 {
        self.0.epoch()
    }

    async fn flush(&mut self) -> Result<usize> {
        self.0.flush().await.map_err(|_| TraceError::FlushFailed)
    }

    fn is_dirty(&self) -> bool {
        self.0.is_dirty()
    }

    async fn try_flush(&mut self) -> Result<()> {
        self.0
            .try_flush()
            .await
            .map_err(|_| TraceError::TryFlushFailed)
    }
}

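// Point-get and range-iter reads replayed on the local state store.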
#[async_trait::async_trait]
impl LocalReplayRead for LocalReplayImpl {
    async fn iter(
        &self,
        key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
        read_options: TracedReadOptions,
    ) -> Result<BoxStream<'static, Result<ReplayItem>>> {
        let key_range = (
            key_range.0.map(|b| TableKey(b.into())),
            key_range.1.map(|b| TableKey(b.into())),
        );

        let iter = LocalStateStore::iter(&self.0, key_range, read_options.into())
            .await
            .unwrap();

        let stream = LocalReplayIter::new(iter).await.into_stream().boxed();
        Ok(stream)
    }

    async fn get(
        &self,
        key: TracedBytes,
        read_options: TracedReadOptions,
    ) -> Result<Option<TracedBytes>> {
        Ok(
            LocalStateStore::get(&self.0, TableKey(key.into()), read_options.into())
                .await
                .unwrap()
                .map(TracedBytes::from),
        )
    }
}

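// Inserts and deletes replayed on the local state store.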
#[async_trait::async_trait]
impl ReplayWrite for LocalReplayImpl {
    fn insert(
        &mut self,
        key: TracedBytes,
        new_val: TracedBytes,
        old_val: Option<TracedBytes>,
    ) -> Result<()> {
        LocalStateStore::insert(
            &mut self.0,
            TableKey(key.into()),
            new_val.into(),
            old_val.map(|b| b.into()),
        )
        .unwrap();
        Ok(())
    }

    fn delete(&mut self, key: TracedBytes, old_val: TracedBytes) -> Result<()> {
        LocalStateStore::delete(&mut self.0, TableKey(key.into()), old_val.into()).unwrap();
        Ok(())
    }
}

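/// A `NotificationClient` used by the replay observer. Instead of subscribing to a remote meta
/// node, it registers a channel with the local notification manager and re-publishes the first
/// traced subscribe response as the initial snapshot.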
pub struct ReplayNotificationClient {
    addr: HostAddr,
    notification_manager: NotificationManagerRef,
    first_resp: Box<TracedSubResp>,
}

impl ReplayNotificationClient {
    pub fn new(
        addr: HostAddr,
        notification_manager: NotificationManagerRef,
        first_resp: Box<TracedSubResp>,
    ) -> Self {
        Self {
            addr,
            notification_manager,
            first_resp,
        }
    }
}

#[async_trait::async_trait]
impl NotificationClient for ReplayNotificationClient {
    type Channel = ReplayChannel<SubscribeResponse>;

    async fn subscribe(
        &self,
        subscribe_type: SubscribeType,
    ) -> std::result::Result<Self::Channel, ObserverError> {
        let (tx, rx) = unbounded_channel();

        self.notification_manager
            .insert_sender(subscribe_type, WorkerKey(self.addr.to_protobuf()), tx)
            .await;

        // send the first snapshot message
        let op = self.first_resp.0.operation();
        let info = self.first_resp.0.info.clone();

        self.notification_manager
            .notify_hummock(op, info.unwrap())
            .await;

        Ok(ReplayChannel(rx))
    }
}

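/// Builds a `ReplayNotificationClient` for the given worker node, backed by the notification
/// manager of the provided meta service environment.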
pub fn get_replay_notification_client(
    env: MetaSrvEnv,
    worker_node: WorkerNode,
    first_resp: Box<TracedSubResp>,
) -> ReplayNotificationClient {
    ReplayNotificationClient::new(
        worker_node.get_host().unwrap().into(),
        env.notification_manager_ref(),
        first_resp,
    )
}

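/// Receiver side of the in-process notification channel handed out by `ReplayNotificationClient`.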
pub struct ReplayChannel<T>(UnboundedReceiver<std::result::Result<T, MessageStatus>>);

#[async_trait::async_trait]
impl<T: Send + 'static> Channel for ReplayChannel<T> {
    type Item = T;

    async fn message(&mut self) -> std::result::Result<Option<T>, MessageStatus> {
        match self.0.recv().await {
            None => Ok(None),
            Some(result) => result.map(Some),
        }
    }
}