risingwave_hummock_trace/
opts.rs1use bincode::{Decode, Encode};
16use foyer::Hint;
17use risingwave_common::bitmap::Bitmap;
18use risingwave_common::cache::CachePriority;
19use risingwave_common::catalog::{TableId, TableOption};
20use risingwave_common::util::epoch::EpochPair;
21use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
22use risingwave_pb::common::PbBuffer;
23
24use crate::TracedBytes;
25
26#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
27pub struct TracedPrefetchOptions {
28 pub prefetch: bool,
29 pub for_large_query: bool,
30}
31
32#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
33pub enum TracedCachePolicy {
34 Disable,
35 Fill(TracedCachePriority),
36 NotFill,
37}
38
39#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
40pub enum TracedCachePriority {
41 High,
42 Low,
43}
44
45impl From<CachePriority> for TracedCachePriority {
46 fn from(value: CachePriority) -> Self {
47 match value {
48 CachePriority::High => Self::High,
49 CachePriority::Low => Self::Low,
50 }
51 }
52}
53
54impl From<TracedCachePriority> for CachePriority {
55 fn from(value: TracedCachePriority) -> Self {
56 match value {
57 TracedCachePriority::High => Self::High,
58 TracedCachePriority::Low => Self::Low,
59 }
60 }
61}
62
63impl From<Hint> for TracedCachePriority {
64 fn from(value: Hint) -> Self {
65 match value {
66 Hint::Normal => Self::High,
67 Hint::Low => Self::Low,
68 }
69 }
70}
71
72impl From<TracedCachePriority> for Hint {
73 fn from(value: TracedCachePriority) -> Self {
74 match value {
75 TracedCachePriority::High => Self::Normal,
76 TracedCachePriority::Low => Self::Low,
77 }
78 }
79}
80
81#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
82pub struct TracedTableId {
83 pub table_id: u32,
84}
85
86impl From<TableId> for TracedTableId {
87 fn from(value: TableId) -> Self {
88 Self {
89 table_id: value.as_raw_id(),
90 }
91 }
92}
93
94impl From<TracedTableId> for TableId {
95 fn from(value: TracedTableId) -> Self {
96 Self::new(value.table_id)
97 }
98}
99
100#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
101pub struct TracedReadOptions {
102 pub prefix_hint: Option<TracedBytes>,
103 pub prefetch_options: TracedPrefetchOptions,
104 pub cache_policy: TracedCachePolicy,
105
106 pub retention_seconds: Option<u32>,
107 pub table_id: TracedTableId,
108 pub read_version_from_backup: bool,
109 pub read_committed: bool,
110}
111
112impl TracedReadOptions {
113 pub fn for_test(table_id: u32) -> Self {
114 Self {
115 prefix_hint: Some(TracedBytes::from(vec![0])),
116 prefetch_options: TracedPrefetchOptions {
117 prefetch: true,
118 for_large_query: true,
119 },
120 cache_policy: TracedCachePolicy::Disable,
121 retention_seconds: None,
122 table_id: TracedTableId { table_id },
123 read_version_from_backup: false,
124 read_committed: false,
125 }
126 }
127}
128
129#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
130pub struct TracedTableOption {
131 pub retention_seconds: Option<u32>,
132}
133
134impl From<TableOption> for TracedTableOption {
135 fn from(value: TableOption) -> Self {
136 Self {
137 retention_seconds: value.retention_seconds,
138 }
139 }
140}
141
142impl From<TracedTableOption> for TableOption {
143 fn from(value: TracedTableOption) -> Self {
144 Self {
145 retention_seconds: value.retention_seconds,
146 }
147 }
148}
149
150#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
151pub enum TracedOpConsistencyLevel {
152 Inconsistent,
153 ConsistentOldValue,
154}
155
156#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
157pub struct TracedNewLocalOptions {
158 pub table_id: TracedTableId,
159 pub op_consistency_level: TracedOpConsistencyLevel,
160 pub table_option: TracedTableOption,
161 pub is_replicated: bool,
162 pub vnodes: TracedBitmap,
163 pub upload_on_flush: bool,
164}
165
166#[derive(Encode, Decode, PartialEq, Debug, Clone)]
167pub struct TracedTryWaitEpochOptions {
168 pub table_id: TracedTableId,
169}
170
#[cfg(test)]
impl TracedNewLocalOptions {
    /// Builds a fixed option set for the given raw table id; compiled only for tests.
    ///
    /// The result uses the `Inconsistent` level, no retention period, `is_replicated`
    /// set to `false`, an all-ones bitmap of `VirtualNode::COUNT_FOR_TEST` bits, and
    /// `upload_on_flush` set to `true`.
    pub(crate) fn for_test(table_id: u32) -> Self {
        use risingwave_common::hash::VirtualNode;

        let table_option = TracedTableOption {
            retention_seconds: None,
        };
        let all_vnodes = Bitmap::ones(VirtualNode::COUNT_FOR_TEST);

        Self {
            table_id: TracedTableId { table_id },
            op_consistency_level: TracedOpConsistencyLevel::Inconsistent,
            table_option,
            is_replicated: false,
            vnodes: TracedBitmap::from(all_vnodes),
            upload_on_flush: true,
        }
    }
}
188
189pub type TracedHummockEpoch = u64;
190
191#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
192pub enum TracedHummockReadEpoch {
193 Committed(TracedHummockEpoch),
194 BatchQueryReadCommitted(TracedHummockEpoch, u64),
195 NoWait(TracedHummockEpoch),
196 Backup(TracedHummockEpoch),
197 TimeTravel(TracedHummockEpoch),
198}
199
200impl From<HummockReadEpoch> for TracedHummockReadEpoch {
201 fn from(value: HummockReadEpoch) -> Self {
202 match value {
203 HummockReadEpoch::Committed(epoch) => Self::Committed(epoch),
204 HummockReadEpoch::BatchQueryCommitted(epoch, version_id) => {
205 Self::BatchQueryReadCommitted(epoch, version_id.to_u64())
206 }
207 HummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch),
208 HummockReadEpoch::Backup(epoch) => Self::Backup(epoch),
209 HummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch),
210 }
211 }
212}
213
214impl From<TracedHummockReadEpoch> for HummockReadEpoch {
215 fn from(value: TracedHummockReadEpoch) -> Self {
216 match value {
217 TracedHummockReadEpoch::Committed(epoch) => Self::Committed(epoch),
218 TracedHummockReadEpoch::BatchQueryReadCommitted(epoch, version_id) => {
219 Self::BatchQueryCommitted(epoch, HummockVersionId::new(version_id))
220 }
221 TracedHummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch),
222 TracedHummockReadEpoch::Backup(epoch) => Self::Backup(epoch),
223 TracedHummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch),
224 }
225 }
226}
227
228#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
229pub struct TracedEpochPair {
230 pub curr: TracedHummockEpoch,
231 pub prev: TracedHummockEpoch,
232}
233
234impl From<EpochPair> for TracedEpochPair {
235 fn from(value: EpochPair) -> Self {
236 TracedEpochPair {
237 curr: value.curr,
238 prev: value.prev,
239 }
240 }
241}
242
243impl From<TracedEpochPair> for EpochPair {
244 fn from(value: TracedEpochPair) -> Self {
245 EpochPair {
246 curr: value.curr,
247 prev: value.prev,
248 }
249 }
250}
251
252#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
253pub struct TracedInitOptions {
254 pub epoch: TracedEpochPair,
255}
256
257#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
258pub struct TracedSealCurrentEpochOptions {
259 pub table_watermarks: Option<(bool, Vec<Vec<u8>>, bool)>,
261 pub switch_op_consistency_level: Option<bool>,
262}
263
264#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
265pub struct TracedBitmap {
266 pub compression: i32,
267 pub body: Vec<u8>,
268}
269
270impl From<Bitmap> for TracedBitmap {
271 fn from(value: Bitmap) -> Self {
272 let pb = value.to_protobuf();
273 Self {
274 compression: pb.compression,
275 body: pb.body,
276 }
277 }
278}
279
280impl From<TracedBitmap> for Bitmap {
281 fn from(value: TracedBitmap) -> Self {
282 let pb = PbBuffer {
283 compression: value.compression,
284 body: value.body,
285 };
286 Bitmap::from(&pb)
287 }
288}