risingwave_hummock_trace/
opts.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use bincode::{Decode, Encode};
16use foyer::CacheHint;
17use risingwave_common::bitmap::Bitmap;
18use risingwave_common::cache::CachePriority;
19use risingwave_common::catalog::{TableId, TableOption};
20use risingwave_common::util::epoch::EpochPair;
21use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
22use risingwave_pb::common::PbBuffer;
23
24use crate::TracedBytes;
25
26#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
27pub struct TracedPrefetchOptions {
28    pub prefetch: bool,
29    pub for_large_query: bool,
30}
31
32#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
33pub enum TracedCachePolicy {
34    Disable,
35    Fill(TracedCachePriority),
36    NotFill,
37}
38
39#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
40pub enum TracedCachePriority {
41    High,
42    Low,
43}
44
45impl From<CachePriority> for TracedCachePriority {
46    fn from(value: CachePriority) -> Self {
47        match value {
48            CachePriority::High => Self::High,
49            CachePriority::Low => Self::Low,
50        }
51    }
52}
53
54impl From<TracedCachePriority> for CachePriority {
55    fn from(value: TracedCachePriority) -> Self {
56        match value {
57            TracedCachePriority::High => Self::High,
58            TracedCachePriority::Low => Self::Low,
59        }
60    }
61}
62
63impl From<CacheHint> for TracedCachePriority {
64    fn from(value: CacheHint) -> Self {
65        match value {
66            CacheHint::Normal => Self::High,
67            CacheHint::Low => Self::Low,
68        }
69    }
70}
71
72impl From<TracedCachePriority> for CacheHint {
73    fn from(value: TracedCachePriority) -> Self {
74        match value {
75            TracedCachePriority::High => Self::Normal,
76            TracedCachePriority::Low => Self::Low,
77        }
78    }
79}
80
81#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
82pub struct TracedTableId {
83    pub table_id: u32,
84}
85
86impl From<TableId> for TracedTableId {
87    fn from(value: TableId) -> Self {
88        Self {
89            table_id: value.table_id,
90        }
91    }
92}
93
94impl From<TracedTableId> for TableId {
95    fn from(value: TracedTableId) -> Self {
96        Self {
97            table_id: value.table_id,
98        }
99    }
100}
101
102#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
103pub struct TracedReadOptions {
104    pub prefix_hint: Option<TracedBytes>,
105    pub prefetch_options: TracedPrefetchOptions,
106    pub cache_policy: TracedCachePolicy,
107
108    pub retention_seconds: Option<u32>,
109    pub table_id: TracedTableId,
110    pub read_version_from_backup: bool,
111    pub read_committed: bool,
112}
113
114impl TracedReadOptions {
115    pub fn for_test(table_id: u32) -> Self {
116        Self {
117            prefix_hint: Some(TracedBytes::from(vec![0])),
118            prefetch_options: TracedPrefetchOptions {
119                prefetch: true,
120                for_large_query: true,
121            },
122            cache_policy: TracedCachePolicy::Disable,
123            retention_seconds: None,
124            table_id: TracedTableId { table_id },
125            read_version_from_backup: false,
126            read_committed: false,
127        }
128    }
129}
130
131#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
132pub struct TracedWriteOptions {
133    pub epoch: u64,
134    pub table_id: TracedTableId,
135}
136
137#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
138pub struct TracedTableOption {
139    pub retention_seconds: Option<u32>,
140}
141
142impl From<TableOption> for TracedTableOption {
143    fn from(value: TableOption) -> Self {
144        Self {
145            retention_seconds: value.retention_seconds,
146        }
147    }
148}
149
150impl From<TracedTableOption> for TableOption {
151    fn from(value: TracedTableOption) -> Self {
152        Self {
153            retention_seconds: value.retention_seconds,
154        }
155    }
156}
157
158#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
159pub enum TracedOpConsistencyLevel {
160    Inconsistent,
161    ConsistentOldValue,
162}
163
164#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
165pub struct TracedNewLocalOptions {
166    pub table_id: TracedTableId,
167    pub op_consistency_level: TracedOpConsistencyLevel,
168    pub table_option: TracedTableOption,
169    pub is_replicated: bool,
170    pub vnodes: TracedBitmap,
171}
172
173#[derive(Encode, Decode, PartialEq, Debug, Clone)]
174pub struct TracedTryWaitEpochOptions {
175    pub table_id: TracedTableId,
176}
177
#[cfg(test)]
impl TracedNewLocalOptions {
    /// Builds a fixed set of new-local options for unit tests, targeting `table_id`.
    ///
    /// Uses the `Inconsistent` level, no retention, `is_replicated = false`, and
    /// an all-ones vnode bitmap of `VirtualNode::COUNT_FOR_TEST` bits.
    pub(crate) fn for_test(table_id: u32) -> Self {
        use risingwave_common::hash::VirtualNode;

        let all_vnodes = Bitmap::ones(VirtualNode::COUNT_FOR_TEST);
        let table_option = TracedTableOption {
            retention_seconds: None,
        };

        Self {
            table_id: TracedTableId { table_id },
            op_consistency_level: TracedOpConsistencyLevel::Inconsistent,
            table_option,
            is_replicated: false,
            vnodes: TracedBitmap::from(all_vnodes),
        }
    }
}
194
195pub type TracedHummockEpoch = u64;
196
197#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
198pub enum TracedHummockReadEpoch {
199    Committed(TracedHummockEpoch),
200    BatchQueryReadCommitted(TracedHummockEpoch, u64),
201    NoWait(TracedHummockEpoch),
202    Backup(TracedHummockEpoch),
203    TimeTravel(TracedHummockEpoch),
204}
205
206impl From<HummockReadEpoch> for TracedHummockReadEpoch {
207    fn from(value: HummockReadEpoch) -> Self {
208        match value {
209            HummockReadEpoch::Committed(epoch) => Self::Committed(epoch),
210            HummockReadEpoch::BatchQueryCommitted(epoch, version_id) => {
211                Self::BatchQueryReadCommitted(epoch, version_id.to_u64())
212            }
213            HummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch),
214            HummockReadEpoch::Backup(epoch) => Self::Backup(epoch),
215            HummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch),
216        }
217    }
218}
219
220impl From<TracedHummockReadEpoch> for HummockReadEpoch {
221    fn from(value: TracedHummockReadEpoch) -> Self {
222        match value {
223            TracedHummockReadEpoch::Committed(epoch) => Self::Committed(epoch),
224            TracedHummockReadEpoch::BatchQueryReadCommitted(epoch, version_id) => {
225                Self::BatchQueryCommitted(epoch, HummockVersionId::new(version_id))
226            }
227            TracedHummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch),
228            TracedHummockReadEpoch::Backup(epoch) => Self::Backup(epoch),
229            TracedHummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch),
230        }
231    }
232}
233
234#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
235pub struct TracedEpochPair {
236    pub curr: TracedHummockEpoch,
237    pub prev: TracedHummockEpoch,
238}
239
240impl From<EpochPair> for TracedEpochPair {
241    fn from(value: EpochPair) -> Self {
242        TracedEpochPair {
243            curr: value.curr,
244            prev: value.prev,
245        }
246    }
247}
248
249impl From<TracedEpochPair> for EpochPair {
250    fn from(value: TracedEpochPair) -> Self {
251        EpochPair {
252            curr: value.curr,
253            prev: value.prev,
254        }
255    }
256}
257
258#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
259pub struct TracedInitOptions {
260    pub epoch: TracedEpochPair,
261}
262
263#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
264pub struct TracedSealCurrentEpochOptions {
265    // The watermark is serialized into protobuf
266    pub table_watermarks: Option<(bool, Vec<Vec<u8>>, bool)>,
267    pub switch_op_consistency_level: Option<bool>,
268}
269
270#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
271pub struct TracedBitmap {
272    pub compression: i32,
273    pub body: Vec<u8>,
274}
275
276impl From<Bitmap> for TracedBitmap {
277    fn from(value: Bitmap) -> Self {
278        let pb = value.to_protobuf();
279        Self {
280            compression: pb.compression,
281            body: pb.body,
282        }
283    }
284}
285
286impl From<TracedBitmap> for Bitmap {
287    fn from(value: TracedBitmap) -> Self {
288        let pb = PbBuffer {
289            compression: value.compression,
290            body: value.body,
291        };
292        Bitmap::from(&pb)
293    }
294}