risingwave_common/util/
epoch.rs1use std::sync::LazyLock;
16use std::time::{Duration, SystemTime};
17
18use easy_ext::ext;
19use parse_display::Display;
20
21use crate::types::{ScalarImpl, Timestamptz};
22
23static UNIX_RISINGWAVE_DATE_SEC: u64 = 1_617_235_200;
24
25pub static UNIX_RISINGWAVE_DATE_EPOCH: LazyLock<SystemTime> =
28 LazyLock::new(|| SystemTime::UNIX_EPOCH + Duration::from_secs(UNIX_RISINGWAVE_DATE_SEC));
29
30#[derive(Clone, Copy, Debug, Display, PartialEq, Eq, PartialOrd, Ord, Hash)]
31pub struct Epoch(pub u64);
32
33pub const INVALID_EPOCH: u64 = 0;
35
36const EPOCH_PHYSICAL_SHIFT_BITS: u8 = 16;
37
38impl Epoch {
39 pub fn now() -> Self {
40 Self(Self::physical_now() << EPOCH_PHYSICAL_SHIFT_BITS)
41 }
42
43 #[must_use]
44 pub fn next(self) -> Self {
45 let mut physical_now = Epoch::physical_now();
46 let prev_physical_time = self.physical_time();
47
48 loop {
49 if physical_now > prev_physical_time {
50 break;
51 }
52 physical_now = Epoch::physical_now();
53
54 #[cfg(madsim)]
55 tokio::time::advance(std::time::Duration::from_micros(10));
56 #[cfg(not(madsim))]
57 std::hint::spin_loop();
58 }
59 let next_epoch = Self::from_physical_time(physical_now);
62
63 assert!(next_epoch.0 > self.0);
64 next_epoch
65 }
66
67 pub fn physical_time(&self) -> u64 {
69 self.0 >> EPOCH_PHYSICAL_SHIFT_BITS
70 }
71
72 pub fn from_physical_time(time: u64) -> Self {
73 Epoch(time << EPOCH_PHYSICAL_SHIFT_BITS)
74 }
75
76 pub fn from_unix_millis(mi: u64) -> Self {
77 Epoch((mi - UNIX_RISINGWAVE_DATE_SEC * 1000) << EPOCH_PHYSICAL_SHIFT_BITS)
78 }
79
80 pub fn from_unix_millis_or_earliest(mi: u64) -> Self {
81 Epoch((mi.saturating_sub(UNIX_RISINGWAVE_DATE_SEC * 1000)) << EPOCH_PHYSICAL_SHIFT_BITS)
82 }
83
84 pub fn physical_now() -> u64 {
85 UNIX_RISINGWAVE_DATE_EPOCH
86 .elapsed()
87 .expect("system clock set earlier than risingwave date!")
88 .as_millis() as u64
89 }
90
91 pub fn as_unix_millis(&self) -> u64 {
92 UNIX_RISINGWAVE_DATE_SEC * 1000 + self.physical_time()
93 }
94
95 pub fn as_unix_secs(&self) -> u64 {
96 UNIX_RISINGWAVE_DATE_SEC + self.physical_time() / 1000
97 }
98
99 pub fn as_timestamptz(&self) -> Timestamptz {
101 Timestamptz::from_millis(self.as_unix_millis() as i64).expect("epoch is out of range")
102 }
103
104 pub fn as_scalar(&self) -> ScalarImpl {
106 self.as_timestamptz().into()
107 }
108
109 pub fn as_system_time(&self) -> SystemTime {
111 *UNIX_RISINGWAVE_DATE_EPOCH + Duration::from_millis(self.physical_time())
112 }
113
114 pub fn subtract_ms(&self, relative_time_ms: u64) -> Self {
117 let physical_time = self.physical_time();
118
119 if physical_time < relative_time_ms {
120 Epoch(INVALID_EPOCH)
121 } else {
122 Epoch((physical_time - relative_time_ms) << EPOCH_PHYSICAL_SHIFT_BITS)
123 }
124 }
125}
126
127pub const EPOCH_AVAILABLE_BITS: u64 = 16;
128pub const MAX_SPILL_TIMES: u16 = ((1 << EPOCH_AVAILABLE_BITS) - 1) as u16;
129pub const EPOCH_SPILL_TIME_MASK: u64 = (1 << EPOCH_AVAILABLE_BITS) - 1;
131pub const MAX_EPOCH: u64 = !((1 << EPOCH_AVAILABLE_BITS) - 1);
133
134const EPOCH_INC_MIN_STEP_FOR_TEST: u64 = test_epoch(1);
138
139pub fn is_max_epoch(epoch: u64) -> bool {
140 epoch >= MAX_EPOCH
144}
145pub fn is_compatibility_max_epoch(epoch: u64) -> bool {
146 epoch == MAX_EPOCH
148}
149impl From<u64> for Epoch {
150 fn from(epoch: u64) -> Self {
151 Self(epoch)
152 }
153}
154
155#[derive(Debug, Clone, Copy, PartialEq)]
156pub struct EpochPair {
157 pub curr: u64,
158 pub prev: u64,
159}
160
161impl EpochPair {
162 pub fn new(curr: u64, prev: u64) -> Self {
163 assert!(curr > prev);
164 Self { curr, prev }
165 }
166
167 pub fn inc_for_test(&mut self) {
168 self.prev = self.curr;
169 self.curr += EPOCH_INC_MIN_STEP_FOR_TEST;
170 }
171
172 pub fn new_test_epoch(curr: u64) -> Self {
173 if !is_max_epoch(curr) {
174 assert!(curr >= EPOCH_INC_MIN_STEP_FOR_TEST);
175 assert!((curr & EPOCH_SPILL_TIME_MASK) == 0);
176 }
177 Self::new(curr, curr - EPOCH_INC_MIN_STEP_FOR_TEST)
178 }
179}
180
181pub const fn test_epoch(value_millis: u64) -> u64 {
184 value_millis << EPOCH_AVAILABLE_BITS
185}
186
187#[ext(EpochExt)]
190pub impl u64 {
191 fn inc_epoch(&mut self) {
192 *self += EPOCH_INC_MIN_STEP_FOR_TEST;
193 }
194
195 fn dec_epoch(&mut self) {
196 *self -= EPOCH_INC_MIN_STEP_FOR_TEST;
197 }
198
199 fn next_epoch(self) -> u64 {
200 self + EPOCH_INC_MIN_STEP_FOR_TEST
201 }
202
203 fn prev_epoch(self) -> u64 {
204 self - EPOCH_INC_MIN_STEP_FOR_TEST
205 }
206}
207
208pub mod task_local {
210 use futures::Future;
211 use tokio::task_local;
212
213 use super::{Epoch, EpochPair};
214
215 task_local! {
216 static TASK_LOCAL_EPOCH_PAIR: EpochPair;
217 }
218
219 pub fn curr_epoch() -> Option<Epoch> {
224 TASK_LOCAL_EPOCH_PAIR.try_with(|e| Epoch(e.curr)).ok()
225 }
226
227 pub fn prev_epoch() -> Option<Epoch> {
232 TASK_LOCAL_EPOCH_PAIR.try_with(|e| Epoch(e.prev)).ok()
233 }
234
235 pub fn epoch() -> Option<EpochPair> {
240 TASK_LOCAL_EPOCH_PAIR.try_with(|e| *e).ok()
241 }
242
243 pub async fn scope<F>(epoch: EpochPair, f: F) -> F::Output
245 where
246 F: Future,
247 {
248 TASK_LOCAL_EPOCH_PAIR.scope(epoch, f).await
249 }
250}
251
252#[cfg(test)]
253mod tests {
254 use chrono::{Local, TimeZone, Utc};
255
256 use super::*;
257
258 #[test]
259 fn test_risingwave_system_time() {
260 let utc = Utc.with_ymd_and_hms(2021, 4, 1, 0, 0, 0).unwrap();
261 let risingwave_dt = Local.from_utc_datetime(&utc.naive_utc());
262 let risingwave_st = SystemTime::from(risingwave_dt);
263 assert_eq!(risingwave_st, *UNIX_RISINGWAVE_DATE_EPOCH);
264 }
265
266 #[tokio::test]
267 async fn test_epoch_generate() {
268 let mut prev_epoch = Epoch::now();
269 for _ in 0..1000 {
270 let epoch = prev_epoch.next();
271 assert!(epoch > prev_epoch);
272 prev_epoch = epoch;
273 }
274 }
275
276 #[test]
277 fn test_subtract_ms() {
278 {
279 let epoch = Epoch(10);
280 assert_eq!(0, epoch.physical_time());
281 assert_eq!(0, epoch.subtract_ms(20).0);
282 }
283
284 {
285 let epoch = Epoch::now();
286 let physical_time = epoch.physical_time();
287 let interval = 10;
288
289 assert_ne!(0, physical_time);
290 assert_eq!(
291 physical_time - interval,
292 epoch.subtract_ms(interval).physical_time()
293 );
294 }
295 }
296}