risingwave_hummock_trace/replay/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod runner;
16mod worker;
17
18use std::ops::Bound;
19
20use futures::Stream;
21use futures::stream::BoxStream;
22#[cfg(test)]
23use futures_async_stream::try_stream;
24#[cfg(test)]
25use mockall::{automock, mock};
26use risingwave_hummock_sdk::HummockReadEpoch;
27use risingwave_pb::meta::subscribe_response::{Info, Operation as RespOperation};
28pub use runner::*;
29pub(crate) use worker::*;
30
31#[cfg(test)]
32use crate::TraceError;
33use crate::error::Result;
34use crate::{
35    LocalStorageId, Record, TracedBytes, TracedInitOptions, TracedNewLocalOptions,
36    TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
37};
38
39pub type ReplayItem = (TracedBytes, TracedBytes);
40pub trait ReplayItemStream = Stream<Item = ReplayItem> + Send;
41
42type ReplayGroup = Record;
43
44#[derive(Debug)]
45enum WorkerResponse {
46    Continue,
47    Shutdown,
48}
49
50pub(crate) type ReplayRequest = Option<ReplayGroup>;
51
52#[derive(PartialEq, Eq, Hash, Debug, Clone)]
53pub(crate) enum WorkerId {
54    // local storage worker
55    Local(u64, LocalStorageId),
56    // for global store
57    OneShot(u64),
58}
59
60#[async_trait::async_trait]
61pub trait LocalReplay: LocalReplayRead + ReplayWrite + Send + Sync {
62    async fn init(&mut self, options: TracedInitOptions) -> Result<()>;
63    fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions);
64    fn is_dirty(&self) -> bool;
65    fn epoch(&self) -> u64;
66    async fn try_flush(&mut self) -> Result<()>;
67    async fn flush(&mut self) -> Result<usize>;
68}
69pub trait GlobalReplay: ReplayRead + ReplayStateStore + Send + Sync {}
70
71#[cfg_attr(test, automock)]
72#[async_trait::async_trait]
73pub trait LocalReplayRead {
74    async fn iter(
75        &self,
76        key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
77        read_options: TracedReadOptions,
78    ) -> Result<BoxStream<'static, Result<ReplayItem>>>;
79    async fn get(
80        &self,
81        key: TracedBytes,
82        read_options: TracedReadOptions,
83    ) -> Result<Option<TracedBytes>>;
84}
85
86#[cfg_attr(test, automock)]
87#[async_trait::async_trait]
88pub trait ReplayRead {
89    async fn iter(
90        &self,
91        key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
92        epoch: u64,
93        read_options: TracedReadOptions,
94    ) -> Result<BoxStream<'static, Result<ReplayItem>>>;
95    async fn get(
96        &self,
97        key: TracedBytes,
98        epoch: u64,
99        read_options: TracedReadOptions,
100    ) -> Result<Option<TracedBytes>>;
101}
102
103#[cfg_attr(test, automock)]
104#[async_trait::async_trait]
105pub trait ReplayWrite {
106    fn insert(
107        &mut self,
108        key: TracedBytes,
109        new_val: TracedBytes,
110        old_val: Option<TracedBytes>,
111    ) -> Result<()>;
112    fn delete(&mut self, key: TracedBytes, old_val: TracedBytes) -> Result<()>;
113}
114
115#[cfg_attr(test, automock)]
116#[async_trait::async_trait]
117pub trait ReplayStateStore {
118    async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize>;
119    async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result<u64>;
120    async fn new_local(&self, opts: TracedNewLocalOptions) -> Box<dyn LocalReplay>;
121    async fn try_wait_epoch(
122        &self,
123        epoch: HummockReadEpoch,
124        options: TracedTryWaitEpochOptions,
125    ) -> Result<()>;
126}
127
128// define mock trait for replay interfaces
129// We need to do this since the mockall crate does not support async_trait
130#[cfg(test)]
131mock! {
132    pub GlobalReplayInterface{}
133    #[async_trait::async_trait]
134    impl ReplayRead for GlobalReplayInterface{
135        async fn iter(
136            &self,
137            key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
138            epoch: u64,
139            read_options: TracedReadOptions,
140        ) -> Result<BoxStream<'static, Result<ReplayItem>>>;
141        async fn get(
142            &self,
143            key: TracedBytes,
144            epoch: u64,
145            read_options: TracedReadOptions,
146        ) -> Result<Option<TracedBytes>>;
147    }
148    #[async_trait::async_trait]
149    impl ReplayStateStore for GlobalReplayInterface{
150        async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize>;
151        async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64,
152        ) -> Result<u64>;
153        async fn new_local(&self, opts: TracedNewLocalOptions) -> Box<dyn LocalReplay>;
154        async fn try_wait_epoch(&self, epoch: HummockReadEpoch,options: TracedTryWaitEpochOptions) -> Result<()>;
155    }
156    impl GlobalReplay for GlobalReplayInterface{}
157}
158
159// define mock trait for local replay interfaces
160#[cfg(test)]
161mock! {
162    pub LocalReplayInterface{}
163    #[async_trait::async_trait]
164    impl LocalReplayRead for LocalReplayInterface{
165        async fn iter(
166            &self,
167            key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
168            read_options: TracedReadOptions,
169        ) -> Result<BoxStream<'static, Result<ReplayItem>>>;
170        async fn get(
171            &self,
172            key: TracedBytes,
173            read_options: TracedReadOptions,
174        ) -> Result<Option<TracedBytes>>;
175    }
176    #[async_trait::async_trait]
177    impl ReplayWrite for LocalReplayInterface{
178        fn insert(&mut self, key: TracedBytes, new_val: TracedBytes, old_val: Option<TracedBytes>) -> Result<()>;
179        fn delete(&mut self, key: TracedBytes, old_val: TracedBytes) -> Result<()>;
180    }
181    #[async_trait::async_trait]
182    impl LocalReplay for LocalReplayInterface{
183        async fn init(&mut self, options: TracedInitOptions) -> Result<()>;
184        fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions);
185        fn is_dirty(&self) -> bool;
186        fn epoch(&self) -> u64;
187        async fn flush(&mut self) -> Result<usize>;
188        async fn try_flush(&mut self) -> Result<()>;
189    }
190}
191
192#[cfg(test)]
193pub(crate) struct MockReplayIterStream {
194    items: Vec<ReplayItem>,
195}
196#[cfg(test)]
197impl MockReplayIterStream {
198    pub(crate) fn new(items: Vec<ReplayItem>) -> Self {
199        Self { items }
200    }
201
202    #[try_stream(ok = ReplayItem, error = TraceError)]
203
204    pub(crate) async fn into_stream(self) {
205        for (key, value) in self.items {
206            yield (key, value)
207        }
208    }
209}