1use bincode::{Decode, Encode};
16use foyer::CacheHint;
17use risingwave_common::bitmap::Bitmap;
18use risingwave_common::cache::CachePriority;
19use risingwave_common::catalog::{TableId, TableOption};
20use risingwave_common::util::epoch::EpochPair;
21use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
22use risingwave_pb::common::PbBuffer;
23
24use crate::TracedBytes;
25
26#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
27pub struct TracedPrefetchOptions {
28 pub prefetch: bool,
29 pub for_large_query: bool,
30}
31
32#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
33pub enum TracedCachePolicy {
34 Disable,
35 Fill(TracedCachePriority),
36 NotFill,
37}
38
39#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
40pub enum TracedCachePriority {
41 High,
42 Low,
43}
44
45impl From<CachePriority> for TracedCachePriority {
46 fn from(value: CachePriority) -> Self {
47 match value {
48 CachePriority::High => Self::High,
49 CachePriority::Low => Self::Low,
50 }
51 }
52}
53
54impl From<TracedCachePriority> for CachePriority {
55 fn from(value: TracedCachePriority) -> Self {
56 match value {
57 TracedCachePriority::High => Self::High,
58 TracedCachePriority::Low => Self::Low,
59 }
60 }
61}
62
63impl From<CacheHint> for TracedCachePriority {
64 fn from(value: CacheHint) -> Self {
65 match value {
66 CacheHint::Normal => Self::High,
67 CacheHint::Low => Self::Low,
68 }
69 }
70}
71
72impl From<TracedCachePriority> for CacheHint {
73 fn from(value: TracedCachePriority) -> Self {
74 match value {
75 TracedCachePriority::High => Self::Normal,
76 TracedCachePriority::Low => Self::Low,
77 }
78 }
79}
80
81#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
82pub struct TracedTableId {
83 pub table_id: u32,
84}
85
86impl From<TableId> for TracedTableId {
87 fn from(value: TableId) -> Self {
88 Self {
89 table_id: value.table_id,
90 }
91 }
92}
93
94impl From<TracedTableId> for TableId {
95 fn from(value: TracedTableId) -> Self {
96 Self {
97 table_id: value.table_id,
98 }
99 }
100}
101
102#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
103pub struct TracedReadOptions {
104 pub prefix_hint: Option<TracedBytes>,
105 pub prefetch_options: TracedPrefetchOptions,
106 pub cache_policy: TracedCachePolicy,
107
108 pub retention_seconds: Option<u32>,
109 pub table_id: TracedTableId,
110 pub read_version_from_backup: bool,
111 pub read_committed: bool,
112}
113
114impl TracedReadOptions {
115 pub fn for_test(table_id: u32) -> Self {
116 Self {
117 prefix_hint: Some(TracedBytes::from(vec![0])),
118 prefetch_options: TracedPrefetchOptions {
119 prefetch: true,
120 for_large_query: true,
121 },
122 cache_policy: TracedCachePolicy::Disable,
123 retention_seconds: None,
124 table_id: TracedTableId { table_id },
125 read_version_from_backup: false,
126 read_committed: false,
127 }
128 }
129}
130
131#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
132pub struct TracedWriteOptions {
133 pub epoch: u64,
134 pub table_id: TracedTableId,
135}
136
137#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
138pub struct TracedTableOption {
139 pub retention_seconds: Option<u32>,
140}
141
142impl From<TableOption> for TracedTableOption {
143 fn from(value: TableOption) -> Self {
144 Self {
145 retention_seconds: value.retention_seconds,
146 }
147 }
148}
149
150impl From<TracedTableOption> for TableOption {
151 fn from(value: TracedTableOption) -> Self {
152 Self {
153 retention_seconds: value.retention_seconds,
154 }
155 }
156}
157
158#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
159pub enum TracedOpConsistencyLevel {
160 Inconsistent,
161 ConsistentOldValue,
162}
163
164#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
165pub struct TracedNewLocalOptions {
166 pub table_id: TracedTableId,
167 pub op_consistency_level: TracedOpConsistencyLevel,
168 pub table_option: TracedTableOption,
169 pub is_replicated: bool,
170 pub vnodes: TracedBitmap,
171}
172
173#[derive(Encode, Decode, PartialEq, Debug, Clone)]
174pub struct TracedTryWaitEpochOptions {
175 pub table_id: TracedTableId,
176}
177
178#[cfg(test)]
179impl TracedNewLocalOptions {
180 pub(crate) fn for_test(table_id: u32) -> Self {
181 use risingwave_common::hash::VirtualNode;
182
183 Self {
184 table_id: TracedTableId { table_id },
185 op_consistency_level: TracedOpConsistencyLevel::Inconsistent,
186 table_option: TracedTableOption {
187 retention_seconds: None,
188 },
189 is_replicated: false,
190 vnodes: TracedBitmap::from(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
191 }
192 }
193}
194
195pub type TracedHummockEpoch = u64;
196
197#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
198pub enum TracedHummockReadEpoch {
199 Committed(TracedHummockEpoch),
200 BatchQueryReadCommitted(TracedHummockEpoch, u64),
201 NoWait(TracedHummockEpoch),
202 Backup(TracedHummockEpoch),
203 TimeTravel(TracedHummockEpoch),
204}
205
206impl From<HummockReadEpoch> for TracedHummockReadEpoch {
207 fn from(value: HummockReadEpoch) -> Self {
208 match value {
209 HummockReadEpoch::Committed(epoch) => Self::Committed(epoch),
210 HummockReadEpoch::BatchQueryCommitted(epoch, version_id) => {
211 Self::BatchQueryReadCommitted(epoch, version_id.to_u64())
212 }
213 HummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch),
214 HummockReadEpoch::Backup(epoch) => Self::Backup(epoch),
215 HummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch),
216 }
217 }
218}
219
220impl From<TracedHummockReadEpoch> for HummockReadEpoch {
221 fn from(value: TracedHummockReadEpoch) -> Self {
222 match value {
223 TracedHummockReadEpoch::Committed(epoch) => Self::Committed(epoch),
224 TracedHummockReadEpoch::BatchQueryReadCommitted(epoch, version_id) => {
225 Self::BatchQueryCommitted(epoch, HummockVersionId::new(version_id))
226 }
227 TracedHummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch),
228 TracedHummockReadEpoch::Backup(epoch) => Self::Backup(epoch),
229 TracedHummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch),
230 }
231 }
232}
233
234#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
235pub struct TracedEpochPair {
236 pub curr: TracedHummockEpoch,
237 pub prev: TracedHummockEpoch,
238}
239
240impl From<EpochPair> for TracedEpochPair {
241 fn from(value: EpochPair) -> Self {
242 TracedEpochPair {
243 curr: value.curr,
244 prev: value.prev,
245 }
246 }
247}
248
249impl From<TracedEpochPair> for EpochPair {
250 fn from(value: TracedEpochPair) -> Self {
251 EpochPair {
252 curr: value.curr,
253 prev: value.prev,
254 }
255 }
256}
257
258#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
259pub struct TracedInitOptions {
260 pub epoch: TracedEpochPair,
261}
262
263#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
264pub struct TracedSealCurrentEpochOptions {
265 pub table_watermarks: Option<(bool, Vec<Vec<u8>>, bool)>,
267 pub switch_op_consistency_level: Option<bool>,
268}
269
270#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
271pub struct TracedBitmap {
272 pub compression: i32,
273 pub body: Vec<u8>,
274}
275
276impl From<Bitmap> for TracedBitmap {
277 fn from(value: Bitmap) -> Self {
278 let pb = value.to_protobuf();
279 Self {
280 compression: pb.compression,
281 body: pb.body,
282 }
283 }
284}
285
286impl From<TracedBitmap> for Bitmap {
287 fn from(value: TracedBitmap) -> Self {
288 let pb = PbBuffer {
289 compression: value.compression,
290 body: value.body,
291 };
292 Bitmap::from(&pb)
293 }
294}