1use bincode::{Decode, Encode};
16use foyer::Hint;
17use risingwave_common::bitmap::Bitmap;
18use risingwave_common::cache::CachePriority;
19use risingwave_common::catalog::{TableId, TableOption};
20use risingwave_common::id::FragmentId;
21use risingwave_common::util::epoch::EpochPair;
22use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
23use risingwave_pb::common::PbBuffer;
24
25use crate::TracedBytes;
26
27#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
28pub struct TracedPrefetchOptions {
29 pub prefetch: bool,
30 pub for_large_query: bool,
31}
32
33#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
34pub enum TracedCachePolicy {
35 Disable,
36 Fill(TracedCachePriority),
37 NotFill,
38}
39
40#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
41pub enum TracedCachePriority {
42 High,
43 Low,
44}
45
46impl From<CachePriority> for TracedCachePriority {
47 fn from(value: CachePriority) -> Self {
48 match value {
49 CachePriority::High => Self::High,
50 CachePriority::Low => Self::Low,
51 }
52 }
53}
54
55impl From<TracedCachePriority> for CachePriority {
56 fn from(value: TracedCachePriority) -> Self {
57 match value {
58 TracedCachePriority::High => Self::High,
59 TracedCachePriority::Low => Self::Low,
60 }
61 }
62}
63
64impl From<Hint> for TracedCachePriority {
65 fn from(value: Hint) -> Self {
66 match value {
67 Hint::Normal => Self::High,
68 Hint::Low => Self::Low,
69 }
70 }
71}
72
73impl From<TracedCachePriority> for Hint {
74 fn from(value: TracedCachePriority) -> Self {
75 match value {
76 TracedCachePriority::High => Self::Normal,
77 TracedCachePriority::Low => Self::Low,
78 }
79 }
80}
81
82#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
83pub struct TracedTableId {
84 pub table_id: u32,
85}
86
87impl From<TableId> for TracedTableId {
88 fn from(value: TableId) -> Self {
89 Self {
90 table_id: value.as_raw_id(),
91 }
92 }
93}
94
95impl From<TracedTableId> for TableId {
96 fn from(value: TracedTableId) -> Self {
97 Self::new(value.table_id)
98 }
99}
100
101#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
102pub struct TracedFragmentId {
103 pub fragment_id: u32,
104}
105impl From<FragmentId> for TracedFragmentId {
106 fn from(value: FragmentId) -> Self {
107 Self {
108 fragment_id: value.as_raw_id(),
109 }
110 }
111}
112impl From<TracedFragmentId> for FragmentId {
113 fn from(value: TracedFragmentId) -> Self {
114 FragmentId::new(value.fragment_id)
115 }
116}
117
118#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
119pub struct TracedReadOptions {
120 pub prefix_hint: Option<TracedBytes>,
121 pub prefetch_options: TracedPrefetchOptions,
122 pub cache_policy: TracedCachePolicy,
123
124 pub retention_seconds: Option<u32>,
125 pub table_id: TracedTableId,
126 pub read_version_from_backup: bool,
127 pub read_committed: bool,
128}
129
130impl TracedReadOptions {
131 pub fn for_test(table_id: u32) -> Self {
132 Self {
133 prefix_hint: Some(TracedBytes::from(vec![0])),
134 prefetch_options: TracedPrefetchOptions {
135 prefetch: true,
136 for_large_query: true,
137 },
138 cache_policy: TracedCachePolicy::Disable,
139 retention_seconds: None,
140 table_id: TracedTableId { table_id },
141 read_version_from_backup: false,
142 read_committed: false,
143 }
144 }
145}
146
147#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
148pub struct TracedTableOption {
149 pub retention_seconds: Option<u32>,
150}
151
152impl From<TableOption> for TracedTableOption {
153 fn from(value: TableOption) -> Self {
154 Self {
155 retention_seconds: value.retention_seconds,
156 }
157 }
158}
159
160impl From<TracedTableOption> for TableOption {
161 fn from(value: TracedTableOption) -> Self {
162 Self {
163 retention_seconds: value.retention_seconds,
164 }
165 }
166}
167
168#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
169pub enum TracedOpConsistencyLevel {
170 Inconsistent,
171 ConsistentOldValue,
172}
173
174#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
175pub struct TracedNewLocalOptions {
176 pub table_id: TracedTableId,
177 pub fragment_id: TracedFragmentId,
178 pub op_consistency_level: TracedOpConsistencyLevel,
179 pub table_option: TracedTableOption,
180 pub is_replicated: bool,
181 pub vnodes: TracedBitmap,
182 pub upload_on_flush: bool,
183}
184
185#[derive(Encode, Decode, PartialEq, Debug, Clone)]
186pub struct TracedTryWaitEpochOptions {
187 pub table_id: TracedTableId,
188}
189
#[cfg(test)]
impl TracedNewLocalOptions {
    /// Builds a fixed set of options for `table_id`, only available in test builds.
    ///
    /// The result uses fragment id 0, the `Inconsistent` level, no retention,
    /// no replication, an all-ones vnode bitmap of `VirtualNode::COUNT_FOR_TEST`
    /// bits, and upload-on-flush enabled.
    pub(crate) fn for_test(table_id: u32) -> Self {
        use risingwave_common::hash::VirtualNode;

        let all_vnodes = Bitmap::ones(VirtualNode::COUNT_FOR_TEST);
        let table_option = TracedTableOption {
            retention_seconds: None,
        };

        TracedNewLocalOptions {
            table_id: TracedTableId { table_id },
            fragment_id: TracedFragmentId { fragment_id: 0 },
            op_consistency_level: TracedOpConsistencyLevel::Inconsistent,
            table_option,
            is_replicated: false,
            vnodes: TracedBitmap::from(all_vnodes),
            upload_on_flush: true,
        }
    }
}
208
/// Epoch value as stored in the traced types of this file; a plain `u64`.
pub type TracedHummockEpoch = u64;
210
211#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
212pub enum TracedHummockReadEpoch {
213 Committed(TracedHummockEpoch),
214 BatchQueryReadCommitted(TracedHummockEpoch, u64),
215 NoWait(TracedHummockEpoch),
216 Backup(TracedHummockEpoch),
217 TimeTravel(TracedHummockEpoch),
218}
219
220impl From<HummockReadEpoch> for TracedHummockReadEpoch {
221 fn from(value: HummockReadEpoch) -> Self {
222 match value {
223 HummockReadEpoch::Committed(epoch) => Self::Committed(epoch),
224 HummockReadEpoch::BatchQueryCommitted(epoch, version_id) => {
225 Self::BatchQueryReadCommitted(epoch, version_id.as_raw_id())
226 }
227 HummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch),
228 HummockReadEpoch::Backup(epoch) => Self::Backup(epoch),
229 HummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch),
230 }
231 }
232}
233
234impl From<TracedHummockReadEpoch> for HummockReadEpoch {
235 fn from(value: TracedHummockReadEpoch) -> Self {
236 match value {
237 TracedHummockReadEpoch::Committed(epoch) => Self::Committed(epoch),
238 TracedHummockReadEpoch::BatchQueryReadCommitted(epoch, version_id) => {
239 Self::BatchQueryCommitted(epoch, HummockVersionId::new(version_id))
240 }
241 TracedHummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch),
242 TracedHummockReadEpoch::Backup(epoch) => Self::Backup(epoch),
243 TracedHummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch),
244 }
245 }
246}
247
248#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
249pub struct TracedEpochPair {
250 pub curr: TracedHummockEpoch,
251 pub prev: TracedHummockEpoch,
252}
253
254impl From<EpochPair> for TracedEpochPair {
255 fn from(value: EpochPair) -> Self {
256 TracedEpochPair {
257 curr: value.curr,
258 prev: value.prev,
259 }
260 }
261}
262
263impl From<TracedEpochPair> for EpochPair {
264 fn from(value: TracedEpochPair) -> Self {
265 EpochPair {
266 curr: value.curr,
267 prev: value.prev,
268 }
269 }
270}
271
272#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
273pub struct TracedInitOptions {
274 pub epoch: TracedEpochPair,
275}
276
277#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
278pub struct TracedSealCurrentEpochOptions {
279 pub table_watermarks: Option<(bool, Vec<Vec<u8>>, i32)>,
281 pub switch_op_consistency_level: Option<bool>,
282}
283
284#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)]
285pub struct TracedBitmap {
286 pub compression: i32,
287 pub body: Vec<u8>,
288}
289
290impl From<Bitmap> for TracedBitmap {
291 fn from(value: Bitmap) -> Self {
292 let pb = value.to_protobuf();
293 Self {
294 compression: pb.compression,
295 body: pb.body,
296 }
297 }
298}
299
300impl From<TracedBitmap> for Bitmap {
301 fn from(value: TracedBitmap) -> Self {
302 let pb = PbBuffer {
303 compression: value.compression,
304 body: value.body,
305 };
306 Bitmap::from(&pb)
307 }
308}