risingwave_hummock_trace/replay/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod runner;
16mod worker;
17
18use std::ops::Bound;
19
20use futures::Stream;
21use futures::stream::BoxStream;
22#[cfg(test)]
23use futures_async_stream::try_stream;
24#[cfg(test)]
25use mockall::{automock, mock};
26use risingwave_hummock_sdk::HummockReadEpoch;
27use risingwave_pb::meta::subscribe_response::{Info, Operation as RespOperation};
28pub use runner::*;
29pub(crate) use worker::*;
30
31#[cfg(test)]
32use crate::TraceError;
33use crate::error::Result;
34use crate::{
35    LocalStorageId, Record, TracedBytes, TracedInitOptions, TracedNewLocalOptions,
36    TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
37};
38
39pub type ReplayItem = (TracedBytes, TracedBytes);
40pub trait ReplayItemStream = Stream<Item = ReplayItem> + Send;
41
42type ReplayGroup = Record;
43
44#[derive(Debug)]
45enum WorkerResponse {
46    Continue,
47    Shutdown,
48}
49
50pub(crate) type ReplayRequest = Option<ReplayGroup>;
51
52#[derive(PartialEq, Eq, Hash, Debug, Clone)]
53pub(crate) enum WorkerId {
54    // local storage worker
55    Local(u64, LocalStorageId),
56    // for global store
57    OneShot(u64),
58}
59
60#[async_trait::async_trait]
61pub trait LocalReplay: LocalReplayRead + ReplayWrite + Send + Sync {
62    async fn init(&mut self, options: TracedInitOptions) -> Result<()>;
63    fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions);
64    async fn try_flush(&mut self) -> Result<()>;
65    async fn flush(&mut self) -> Result<usize>;
66}
67pub trait GlobalReplay: ReplayRead + ReplayStateStore + Send + Sync {}
68
69#[cfg_attr(test, automock)]
70#[async_trait::async_trait]
71pub trait LocalReplayRead {
72    async fn iter(
73        &self,
74        key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
75        read_options: TracedReadOptions,
76    ) -> Result<BoxStream<'static, Result<ReplayItem>>>;
77    async fn get(
78        &self,
79        key: TracedBytes,
80        read_options: TracedReadOptions,
81    ) -> Result<Option<TracedBytes>>;
82}
83
84#[cfg_attr(test, automock)]
85#[async_trait::async_trait]
86pub trait ReplayRead {
87    async fn iter(
88        &self,
89        key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
90        epoch: u64,
91        read_options: TracedReadOptions,
92    ) -> Result<BoxStream<'static, Result<ReplayItem>>>;
93    async fn get(
94        &self,
95        key: TracedBytes,
96        epoch: u64,
97        read_options: TracedReadOptions,
98    ) -> Result<Option<TracedBytes>>;
99}
100
101#[cfg_attr(test, automock)]
102#[async_trait::async_trait]
103pub trait ReplayWrite {
104    fn insert(
105        &mut self,
106        key: TracedBytes,
107        new_val: TracedBytes,
108        old_val: Option<TracedBytes>,
109    ) -> Result<()>;
110    fn delete(&mut self, key: TracedBytes, old_val: TracedBytes) -> Result<()>;
111}
112
113#[cfg_attr(test, automock)]
114#[async_trait::async_trait]
115pub trait ReplayStateStore {
116    async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize>;
117    async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result<u64>;
118    async fn new_local(&self, opts: TracedNewLocalOptions) -> Box<dyn LocalReplay>;
119    async fn try_wait_epoch(
120        &self,
121        epoch: HummockReadEpoch,
122        options: TracedTryWaitEpochOptions,
123    ) -> Result<()>;
124}
125
126// define mock trait for replay interfaces
127// We need to do this since the mockall crate does not support async_trait
128#[cfg(test)]
129mock! {
130    pub GlobalReplayInterface{}
131    #[async_trait::async_trait]
132    impl ReplayRead for GlobalReplayInterface{
133        async fn iter(
134            &self,
135            key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
136            epoch: u64,
137            read_options: TracedReadOptions,
138        ) -> Result<BoxStream<'static, Result<ReplayItem>>>;
139        async fn get(
140            &self,
141            key: TracedBytes,
142            epoch: u64,
143            read_options: TracedReadOptions,
144        ) -> Result<Option<TracedBytes>>;
145    }
146    #[async_trait::async_trait]
147    impl ReplayStateStore for GlobalReplayInterface{
148        async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize>;
149        async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64,
150        ) -> Result<u64>;
151        async fn new_local(&self, opts: TracedNewLocalOptions) -> Box<dyn LocalReplay>;
152        async fn try_wait_epoch(&self, epoch: HummockReadEpoch,options: TracedTryWaitEpochOptions) -> Result<()>;
153    }
154    impl GlobalReplay for GlobalReplayInterface{}
155}
156
157// define mock trait for local replay interfaces
158#[cfg(test)]
159mock! {
160    pub LocalReplayInterface{}
161    #[async_trait::async_trait]
162    impl LocalReplayRead for LocalReplayInterface{
163        async fn iter(
164            &self,
165            key_range: (Bound<TracedBytes>, Bound<TracedBytes>),
166            read_options: TracedReadOptions,
167        ) -> Result<BoxStream<'static, Result<ReplayItem>>>;
168        async fn get(
169            &self,
170            key: TracedBytes,
171            read_options: TracedReadOptions,
172        ) -> Result<Option<TracedBytes>>;
173    }
174    #[async_trait::async_trait]
175    impl ReplayWrite for LocalReplayInterface{
176        fn insert(&mut self, key: TracedBytes, new_val: TracedBytes, old_val: Option<TracedBytes>) -> Result<()>;
177        fn delete(&mut self, key: TracedBytes, old_val: TracedBytes) -> Result<()>;
178    }
179    #[async_trait::async_trait]
180    impl LocalReplay for LocalReplayInterface{
181        async fn init(&mut self, options: TracedInitOptions) -> Result<()>;
182        fn seal_current_epoch(&mut self, next_epoch: u64, opts: TracedSealCurrentEpochOptions);
183        async fn flush(&mut self) -> Result<usize>;
184        async fn try_flush(&mut self) -> Result<()>;
185    }
186}
187
188#[cfg(test)]
189pub(crate) struct MockReplayIterStream {
190    items: Vec<ReplayItem>,
191}
192#[cfg(test)]
193impl MockReplayIterStream {
194    pub(crate) fn new(items: Vec<ReplayItem>) -> Self {
195        Self { items }
196    }
197
198    #[try_stream(ok = ReplayItem, error = TraceError)]
199
200    pub(crate) async fn into_stream(self) {
201        for (key, value) in self.items {
202            yield (key, value)
203        }
204    }
205}