risingwave_hummock_trace/
opts.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use bincode::{Decode, Encode};
16use foyer::Hint;
17use risingwave_common::bitmap::Bitmap;
18use risingwave_common::cache::CachePriority;
19use risingwave_common::catalog::{TableId, TableOption};
20use risingwave_common::id::FragmentId;
21use risingwave_common::util::epoch::EpochPair;
22use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
23use risingwave_pb::common::PbBuffer;
24
25use crate::TracedBytes;
26
27#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
28pub struct TracedPrefetchOptions {
29    pub prefetch: bool,
30    pub for_large_query: bool,
31}
32
33#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
34pub enum TracedCachePolicy {
35    Disable,
36    Fill(TracedCachePriority),
37    NotFill,
38}
39
40#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
41pub enum TracedCachePriority {
42    High,
43    Low,
44}
45
46impl From<CachePriority> for TracedCachePriority {
47    fn from(value: CachePriority) -> Self {
48        match value {
49            CachePriority::High => Self::High,
50            CachePriority::Low => Self::Low,
51        }
52    }
53}
54
55impl From<TracedCachePriority> for CachePriority {
56    fn from(value: TracedCachePriority) -> Self {
57        match value {
58            TracedCachePriority::High => Self::High,
59            TracedCachePriority::Low => Self::Low,
60        }
61    }
62}
63
64impl From<Hint> for TracedCachePriority {
65    fn from(value: Hint) -> Self {
66        match value {
67            Hint::Normal => Self::High,
68            Hint::Low => Self::Low,
69        }
70    }
71}
72
73impl From<TracedCachePriority> for Hint {
74    fn from(value: TracedCachePriority) -> Self {
75        match value {
76            TracedCachePriority::High => Self::Normal,
77            TracedCachePriority::Low => Self::Low,
78        }
79    }
80}
81
82#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
83pub struct TracedTableId {
84    pub table_id: u32,
85}
86
87impl From<TableId> for TracedTableId {
88    fn from(value: TableId) -> Self {
89        Self {
90            table_id: value.as_raw_id(),
91        }
92    }
93}
94
95impl From<TracedTableId> for TableId {
96    fn from(value: TracedTableId) -> Self {
97        Self::new(value.table_id)
98    }
99}
100
101#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
102pub struct TracedFragmentId {
103    pub fragment_id: u32,
104}
105impl From<FragmentId> for TracedFragmentId {
106    fn from(value: FragmentId) -> Self {
107        Self {
108            fragment_id: value.as_raw_id(),
109        }
110    }
111}
112impl From<TracedFragmentId> for FragmentId {
113    fn from(value: TracedFragmentId) -> Self {
114        FragmentId::new(value.fragment_id)
115    }
116}
117
118#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
119pub struct TracedReadOptions {
120    pub prefix_hint: Option<TracedBytes>,
121    pub prefetch_options: TracedPrefetchOptions,
122    pub cache_policy: TracedCachePolicy,
123
124    pub retention_seconds: Option<u32>,
125    pub table_id: TracedTableId,
126    pub read_version_from_backup: bool,
127    pub read_committed: bool,
128}
129
130impl TracedReadOptions {
131    pub fn for_test(table_id: u32) -> Self {
132        Self {
133            prefix_hint: Some(TracedBytes::from(vec![0])),
134            prefetch_options: TracedPrefetchOptions {
135                prefetch: true,
136                for_large_query: true,
137            },
138            cache_policy: TracedCachePolicy::Disable,
139            retention_seconds: None,
140            table_id: TracedTableId { table_id },
141            read_version_from_backup: false,
142            read_committed: false,
143        }
144    }
145}
146
147#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
148pub struct TracedTableOption {
149    pub retention_seconds: Option<u32>,
150}
151
152impl From<TableOption> for TracedTableOption {
153    fn from(value: TableOption) -> Self {
154        Self {
155            retention_seconds: value.retention_seconds,
156        }
157    }
158}
159
160impl From<TracedTableOption> for TableOption {
161    fn from(value: TracedTableOption) -> Self {
162        Self {
163            retention_seconds: value.retention_seconds,
164        }
165    }
166}
167
168#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
169pub enum TracedOpConsistencyLevel {
170    Inconsistent,
171    ConsistentOldValue,
172}
173
174#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
175pub struct TracedNewLocalOptions {
176    pub table_id: TracedTableId,
177    pub fragment_id: TracedFragmentId,
178    pub op_consistency_level: TracedOpConsistencyLevel,
179    pub table_option: TracedTableOption,
180    pub is_replicated: bool,
181    pub vnodes: TracedBitmap,
182    pub upload_on_flush: bool,
183}
184
185#[derive(Encode, Decode, PartialEq, Debug, Clone)]
186pub struct TracedTryWaitEpochOptions {
187    pub table_id: TracedTableId,
188}
189
190#[cfg(test)]
191impl TracedNewLocalOptions {
192    pub(crate) fn for_test(table_id: u32) -> Self {
193        use risingwave_common::hash::VirtualNode;
194
195        Self {
196            table_id: TracedTableId { table_id },
197            fragment_id: TracedFragmentId { fragment_id: 0 },
198            op_consistency_level: TracedOpConsistencyLevel::Inconsistent,
199            table_option: TracedTableOption {
200                retention_seconds: None,
201            },
202            is_replicated: false,
203            vnodes: TracedBitmap::from(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
204            upload_on_flush: true,
205        }
206    }
207}
208
209pub type TracedHummockEpoch = u64;
210
211#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
212pub enum TracedHummockReadEpoch {
213    Committed(TracedHummockEpoch),
214    BatchQueryReadCommitted(TracedHummockEpoch, u64),
215    NoWait(TracedHummockEpoch),
216    Backup(TracedHummockEpoch),
217    TimeTravel(TracedHummockEpoch),
218}
219
220impl From<HummockReadEpoch> for TracedHummockReadEpoch {
221    fn from(value: HummockReadEpoch) -> Self {
222        match value {
223            HummockReadEpoch::Committed(epoch) => Self::Committed(epoch),
224            HummockReadEpoch::BatchQueryCommitted(epoch, version_id) => {
225                Self::BatchQueryReadCommitted(epoch, version_id.as_raw_id())
226            }
227            HummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch),
228            HummockReadEpoch::Backup(epoch) => Self::Backup(epoch),
229            HummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch),
230        }
231    }
232}
233
234impl From<TracedHummockReadEpoch> for HummockReadEpoch {
235    fn from(value: TracedHummockReadEpoch) -> Self {
236        match value {
237            TracedHummockReadEpoch::Committed(epoch) => Self::Committed(epoch),
238            TracedHummockReadEpoch::BatchQueryReadCommitted(epoch, version_id) => {
239                Self::BatchQueryCommitted(epoch, HummockVersionId::new(version_id))
240            }
241            TracedHummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch),
242            TracedHummockReadEpoch::Backup(epoch) => Self::Backup(epoch),
243            TracedHummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch),
244        }
245    }
246}
247
248#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
249pub struct TracedEpochPair {
250    pub curr: TracedHummockEpoch,
251    pub prev: TracedHummockEpoch,
252}
253
254impl From<EpochPair> for TracedEpochPair {
255    fn from(value: EpochPair) -> Self {
256        TracedEpochPair {
257            curr: value.curr,
258            prev: value.prev,
259        }
260    }
261}
262
263impl From<TracedEpochPair> for EpochPair {
264    fn from(value: TracedEpochPair) -> Self {
265        EpochPair {
266            curr: value.curr,
267            prev: value.prev,
268        }
269    }
270}
271
272#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
273pub struct TracedInitOptions {
274    pub epoch: TracedEpochPair,
275}
276
277#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
278pub struct TracedSealCurrentEpochOptions {
279    // The watermark is serialized into protobuf
280    pub table_watermarks: Option<(bool, Vec<Vec<u8>>, i32)>,
281    pub switch_op_consistency_level: Option<bool>,
282}
283
284#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
285pub struct TracedBitmap {
286    pub compression: i32,
287    pub body: Vec<u8>,
288}
289
290impl From<Bitmap> for TracedBitmap {
291    fn from(value: Bitmap) -> Self {
292        let pb = value.to_protobuf();
293        Self {
294            compression: pb.compression,
295            body: pb.body,
296        }
297    }
298}
299
300impl From<TracedBitmap> for Bitmap {
301    fn from(value: TracedBitmap) -> Self {
302        let pb = PbBuffer {
303            compression: value.compression,
304            body: value.body,
305        };
306        Bitmap::from(&pb)
307    }
308}