risingwave_common/util/
epoch.rs1use std::sync::LazyLock;
16use std::time::{Duration, SystemTime};
17
18use easy_ext::ext;
19use parse_display::Display;
20
21use crate::types::{ScalarImpl, Timestamptz};
22
/// Offset, in seconds since the Unix epoch, of the origin used for epoch physical time
/// (1_617_235_200 s, i.e. 2021-04-01 00:00:00 UTC).
static UNIX_RISINGWAVE_DATE_SEC: u64 = 1_617_235_200;

/// The same origin expressed as a [`SystemTime`], computed once on first access.
pub static UNIX_RISINGWAVE_DATE_EPOCH: LazyLock<SystemTime> = LazyLock::new(|| {
    let offset_from_unix = Duration::from_secs(UNIX_RISINGWAVE_DATE_SEC);
    SystemTime::UNIX_EPOCH + offset_from_unix
});
29
/// A 64-bit epoch value.
///
/// The physical time (see [`Epoch::physical_time`]) occupies the bits above the lowest
/// `EPOCH_PHYSICAL_SHIFT_BITS` bits of the wrapped `u64`.
#[derive(Clone, Copy, Debug, Display, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Epoch(pub u64);
32
/// Raw epoch value `0`; [`Epoch::subtract_ms`] returns it (wrapped in [`Epoch`]) when the
/// subtraction would go below zero.
pub const INVALID_EPOCH: u64 = 0;

/// Number of bits by which the physical time is shifted left inside a raw epoch value.
const EPOCH_PHYSICAL_SHIFT_BITS: u8 = 16;
37
38impl Epoch {
39 pub fn now() -> Self {
40 Self(Self::physical_now() << EPOCH_PHYSICAL_SHIFT_BITS)
41 }
42
43 #[must_use]
44 pub fn next(self) -> Self {
45 let mut physical_now = Epoch::physical_now();
46 let prev_physical_time = self.physical_time();
47
48 loop {
49 if physical_now > prev_physical_time {
50 break;
51 }
52 physical_now = Epoch::physical_now();
53
54 #[cfg(madsim)]
55 tokio::time::advance(std::time::Duration::from_micros(10));
56 #[cfg(not(madsim))]
57 std::hint::spin_loop();
58 }
59 let next_epoch = Self::from_physical_time(physical_now);
62
63 assert!(next_epoch.0 > self.0);
64 next_epoch
65 }
66
67 pub fn physical_time(&self) -> u64 {
69 self.0 >> EPOCH_PHYSICAL_SHIFT_BITS
70 }
71
72 pub fn from_physical_time(time: u64) -> Self {
73 Epoch(time << EPOCH_PHYSICAL_SHIFT_BITS)
74 }
75
76 pub fn from_unix_millis(mi: u64) -> Self {
77 Epoch((mi - UNIX_RISINGWAVE_DATE_SEC * 1000) << EPOCH_PHYSICAL_SHIFT_BITS)
78 }
79
80 pub fn from_unix_millis_or_earliest(mi: u64) -> Self {
81 Epoch((mi.saturating_sub(UNIX_RISINGWAVE_DATE_SEC * 1000)) << EPOCH_PHYSICAL_SHIFT_BITS)
82 }
83
84 pub fn physical_now() -> u64 {
85 UNIX_RISINGWAVE_DATE_EPOCH
86 .elapsed()
87 .expect("system clock set earlier than risingwave date!")
88 .as_millis() as u64
89 }
90
91 pub fn as_unix_millis(&self) -> u64 {
92 UNIX_RISINGWAVE_DATE_SEC * 1000 + self.physical_time()
93 }
94
95 pub fn as_unix_secs(&self) -> u64 {
96 UNIX_RISINGWAVE_DATE_SEC + self.physical_time() / 1000
97 }
98
99 pub fn as_timestamptz(&self) -> Timestamptz {
101 Timestamptz::from_millis(self.as_unix_millis() as i64).expect("epoch is out of range")
102 }
103
104 pub fn as_scalar(&self) -> ScalarImpl {
106 self.as_timestamptz().into()
107 }
108
109 pub fn as_system_time(&self) -> SystemTime {
111 *UNIX_RISINGWAVE_DATE_EPOCH + Duration::from_millis(self.physical_time())
112 }
113
114 pub fn subtract_ms(&self, relative_time_ms: u64) -> Self {
117 let physical_time = self.physical_time();
118
119 if physical_time < relative_time_ms {
120 Epoch(INVALID_EPOCH)
121 } else {
122 Epoch((physical_time - relative_time_ms) << EPOCH_PHYSICAL_SHIFT_BITS)
123 }
124 }
125}
126
/// Width, in bits, of the low part of a raw epoch that lies below the physical time.
pub const EPOCH_AVAILABLE_BITS: u64 = 16;
/// Largest value representable in the low `EPOCH_AVAILABLE_BITS` bits (65535).
// NOTE(review): presumably the maximum number of spills per epoch, judging by the name — confirm.
pub const MAX_SPILL_TIMES: u16 = EPOCH_SPILL_TIME_MASK as u16;
/// Bit mask selecting the low `EPOCH_AVAILABLE_BITS` bits of a raw epoch.
pub const EPOCH_SPILL_TIME_MASK: u64 = (1 << EPOCH_AVAILABLE_BITS) - 1;
/// Bit mask selecting everything above the low `EPOCH_AVAILABLE_BITS` bits.
const EPOCH_MASK: u64 = !EPOCH_SPILL_TIME_MASK;
/// Largest raw epoch whose low `EPOCH_AVAILABLE_BITS` bits are all zero.
pub const MAX_EPOCH: u64 = u64::MAX & EPOCH_MASK;
134
/// Raw-epoch step equal to `test_epoch(1)`; used by `EpochPair::inc_for_test`,
/// `EpochPair::new_test_epoch` and the `EpochExt` helpers.
const EPOCH_INC_MIN_STEP_FOR_TEST: u64 = test_epoch(1);
139
140pub fn is_max_epoch(epoch: u64) -> bool {
141 epoch >= MAX_EPOCH
145}
146pub fn is_compatibility_max_epoch(epoch: u64) -> bool {
147 epoch == MAX_EPOCH
149}
150impl From<u64> for Epoch {
151 fn from(epoch: u64) -> Self {
152 Self(epoch)
153 }
154}
155
/// A pair of raw epoch values: the current epoch and the one preceding it.
///
/// `EpochPair::new` asserts `curr > prev`. The fields are public, so constructing the struct
/// directly bypasses that check.
///
/// Both fields are `u64`, so equality is total and the type derives `Eq` alongside
/// `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EpochPair {
    /// Raw value of the current epoch.
    pub curr: u64,
    /// Raw value of the previous epoch.
    pub prev: u64,
}
161
162impl EpochPair {
163 pub fn new(curr: u64, prev: u64) -> Self {
164 assert!(curr > prev);
165 Self { curr, prev }
166 }
167
168 pub fn inc_for_test(&mut self) {
169 self.prev = self.curr;
170 self.curr += EPOCH_INC_MIN_STEP_FOR_TEST;
171 }
172
173 pub fn new_test_epoch(curr: u64) -> Self {
174 if !is_max_epoch(curr) {
175 assert!(curr >= EPOCH_INC_MIN_STEP_FOR_TEST);
176 assert!((curr & EPOCH_SPILL_TIME_MASK) == 0);
177 }
178 Self::new(curr, curr - EPOCH_INC_MIN_STEP_FOR_TEST)
179 }
180}
181
/// Builds a raw epoch value by shifting `value_millis` left by `EPOCH_AVAILABLE_BITS`,
/// leaving the low bits zero.
pub const fn test_epoch(value_millis: u64) -> u64 {
    value_millis << EPOCH_AVAILABLE_BITS
}
187
188#[ext(EpochExt)]
191pub impl u64 {
192 fn inc_epoch(&mut self) {
193 *self += EPOCH_INC_MIN_STEP_FOR_TEST;
194 }
195
196 fn dec_epoch(&mut self) {
197 *self -= EPOCH_INC_MIN_STEP_FOR_TEST;
198 }
199
200 fn next_epoch(self) -> u64 {
201 self + EPOCH_INC_MIN_STEP_FOR_TEST
202 }
203
204 fn prev_epoch(self) -> u64 {
205 self - EPOCH_INC_MIN_STEP_FOR_TEST
206 }
207}
208
209pub mod task_local {
211 use futures::Future;
212 use tokio::task_local;
213
214 use super::{Epoch, EpochPair};
215
216 task_local! {
217 static TASK_LOCAL_EPOCH_PAIR: EpochPair;
218 }
219
220 pub fn curr_epoch() -> Option<Epoch> {
225 TASK_LOCAL_EPOCH_PAIR.try_with(|e| Epoch(e.curr)).ok()
226 }
227
228 pub fn prev_epoch() -> Option<Epoch> {
233 TASK_LOCAL_EPOCH_PAIR.try_with(|e| Epoch(e.prev)).ok()
234 }
235
236 pub fn epoch() -> Option<EpochPair> {
241 TASK_LOCAL_EPOCH_PAIR.try_with(|e| *e).ok()
242 }
243
244 pub async fn scope<F>(epoch: EpochPair, f: F) -> F::Output
246 where
247 F: Future,
248 {
249 TASK_LOCAL_EPOCH_PAIR.scope(epoch, f).await
250 }
251}
252
#[cfg(test)]
mod tests {
    use chrono::{Local, TimeZone, Utc};

    use super::*;

    /// The origin must be 2021-04-01 00:00:00 UTC.
    #[test]
    fn test_risingwave_system_time() {
        let origin_utc = Utc.with_ymd_and_hms(2021, 4, 1, 0, 0, 0).unwrap();
        let origin_local = Local.from_utc_datetime(&origin_utc.naive_utc());
        assert_eq!(SystemTime::from(origin_local), *UNIX_RISINGWAVE_DATE_EPOCH);
    }

    /// Each call to `next` must yield a strictly larger epoch.
    #[tokio::test]
    async fn test_epoch_generate() {
        let mut last = Epoch::now();
        for _ in 0..1000 {
            let fresh = last.next();
            assert!(fresh > last);
            last = fresh;
        }
    }

    /// `subtract_ms` yields the invalid epoch on underflow and otherwise moves the physical time back.
    #[test]
    fn test_subtract_ms() {
        // Physical time is zero here, so any positive subtraction underflows.
        {
            let tiny = Epoch(10);
            assert_eq!(0, tiny.physical_time());
            assert_eq!(0, tiny.subtract_ms(20).0);
        }

        // A current epoch is far enough from the origin to go back 10 ms.
        {
            let current = Epoch::now();
            let before = current.physical_time();
            let step = 10;

            assert_ne!(0, before);
            assert_eq!(before - step, current.subtract_ms(step).physical_time());
        }
    }
}