// risingwave_common/util/epoch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::LazyLock;
16use std::time::{Duration, SystemTime};
17
18use easy_ext::ext;
19use parse_display::Display;
20
21use crate::types::{ScalarImpl, Timestamptz};
22
/// Seconds from the UNIX epoch to the RisingWave date, 2021-04-01T00:00:00Z.
///
/// A `const` rather than a `static`: it is a plain value that never needs a fixed address.
const UNIX_RISINGWAVE_DATE_SEC: u64 = 1_617_235_200;

/// [`UNIX_RISINGWAVE_DATE_EPOCH`] is the RisingWave date, 2021-04-01T00:00:00Z, expressed as a
/// [`SystemTime`]. The physical part of an epoch counts milliseconds elapsed since this instant.
///
/// Built lazily because `SystemTime` arithmetic cannot be evaluated in a `const` context.
pub static UNIX_RISINGWAVE_DATE_EPOCH: LazyLock<SystemTime> =
    LazyLock::new(|| SystemTime::UNIX_EPOCH + Duration::from_secs(UNIX_RISINGWAVE_DATE_SEC));
29
30#[derive(Clone, Copy, Debug, Display, PartialEq, Eq, PartialOrd, Ord, Hash)]
31pub struct Epoch(pub u64);
32
/// Sentinel `u64` value marking an invalid epoch.
pub const INVALID_EPOCH: u64 = 0;

/// How far the physical time (in milliseconds) is shifted left inside an epoch.
/// `epoch >> EPOCH_PHYSICAL_SHIFT_BITS` recovers the physical time.
const EPOCH_PHYSICAL_SHIFT_BITS: u8 = 16;
37
38impl Epoch {
39    pub fn now() -> Self {
40        Self(Self::physical_now() << EPOCH_PHYSICAL_SHIFT_BITS)
41    }
42
43    #[must_use]
44    pub fn next(self) -> Self {
45        let mut physical_now = Epoch::physical_now();
46        let prev_physical_time = self.physical_time();
47
48        loop {
49            if physical_now > prev_physical_time {
50                break;
51            }
52            physical_now = Epoch::physical_now();
53
54            #[cfg(madsim)]
55            tokio::time::advance(std::time::Duration::from_micros(10));
56            #[cfg(not(madsim))]
57            std::hint::spin_loop();
58        }
59        // The last 16 bits of the previous epoch ((prev_epoch + 1, prev_epoch + 65536)) will be
60        // used as the gap epoch when the mem table spill occurs.
61        let next_epoch = Self::from_physical_time(physical_now);
62
63        assert!(next_epoch.0 > self.0);
64        next_epoch
65    }
66
67    /// milliseconds since the RisingWave epoch
68    pub fn physical_time(&self) -> u64 {
69        self.0 >> EPOCH_PHYSICAL_SHIFT_BITS
70    }
71
72    pub fn from_physical_time(time: u64) -> Self {
73        Epoch(time << EPOCH_PHYSICAL_SHIFT_BITS)
74    }
75
76    pub fn from_unix_millis(mi: u64) -> Self {
77        Epoch((mi - UNIX_RISINGWAVE_DATE_SEC * 1000) << EPOCH_PHYSICAL_SHIFT_BITS)
78    }
79
80    pub fn from_unix_millis_or_earliest(mi: u64) -> Self {
81        Epoch((mi.saturating_sub(UNIX_RISINGWAVE_DATE_SEC * 1000)) << EPOCH_PHYSICAL_SHIFT_BITS)
82    }
83
84    pub fn physical_now() -> u64 {
85        UNIX_RISINGWAVE_DATE_EPOCH
86            .elapsed()
87            .expect("system clock set earlier than risingwave date!")
88            .as_millis() as u64
89    }
90
91    pub fn as_unix_millis(&self) -> u64 {
92        UNIX_RISINGWAVE_DATE_SEC * 1000 + self.physical_time()
93    }
94
95    pub fn as_unix_secs(&self) -> u64 {
96        UNIX_RISINGWAVE_DATE_SEC + self.physical_time() / 1000
97    }
98
99    /// Returns the epoch in a Timestamptz.
100    pub fn as_timestamptz(&self) -> Timestamptz {
101        Timestamptz::from_millis(self.as_unix_millis() as i64).expect("epoch is out of range")
102    }
103
104    /// Returns the epoch in a Timestamptz scalar.
105    pub fn as_scalar(&self) -> ScalarImpl {
106        self.as_timestamptz().into()
107    }
108
109    /// Returns the epoch in real system time.
110    pub fn as_system_time(&self) -> SystemTime {
111        *UNIX_RISINGWAVE_DATE_EPOCH + Duration::from_millis(self.physical_time())
112    }
113
114    /// Returns the epoch subtract `relative_time_ms`, which used for ttl to get epoch corresponding
115    /// to the lowerbound timepoint (`src/storage/src/hummock/iterator/forward_user.rs`)
116    pub fn subtract_ms(&self, relative_time_ms: u64) -> Self {
117        let physical_time = self.physical_time();
118
119        if physical_time < relative_time_ms {
120            Epoch(INVALID_EPOCH)
121        } else {
122            Epoch((physical_time - relative_time_ms) << EPOCH_PHYSICAL_SHIFT_BITS)
123        }
124    }
125}
126
/// Number of low bits of an epoch reserved for the spill counter.
pub const EPOCH_AVAILABLE_BITS: u64 = 16;
/// Mask with the low `EPOCH_AVAILABLE_BITS` bits set.
pub const EPOCH_SPILL_TIME_MASK: u64 = (1 << EPOCH_AVAILABLE_BITS) - 1;
/// Largest spill counter that fits into the reserved low bits.
pub const MAX_SPILL_TIMES: u16 = EPOCH_SPILL_TIME_MASK as u16;
/// Mask with the high `64 - EPOCH_AVAILABLE_BITS` bits set.
const EPOCH_MASK: u64 = !EPOCH_SPILL_TIME_MASK;
/// Largest epoch whose spill bits are all zero.
pub const MAX_EPOCH: u64 = EPOCH_MASK;
134
135// EPOCH_INC_MIN_STEP_FOR_TEST is the minimum increment step for epoch in unit tests.
136// We need to keep the lower 16 bits of the epoch unchanged during each increment,
137// and only increase the upper 48 bits.
138const EPOCH_INC_MIN_STEP_FOR_TEST: u64 = test_epoch(1);
139
140pub fn is_max_epoch(epoch: u64) -> bool {
141    // Since we have write `MAX_EPOCH` as max epoch to sstable in some previous version,
142    // it means that there may be two value in our system which represent infinite. We must check
143    // both of them for compatibility. See bug description in https://github.com/risingwavelabs/risingwave/issues/13717
144    epoch >= MAX_EPOCH
145}
146pub fn is_compatibility_max_epoch(epoch: u64) -> bool {
147    // See bug description in https://github.com/risingwavelabs/risingwave/issues/13717
148    epoch == MAX_EPOCH
149}
150impl From<u64> for Epoch {
151    fn from(epoch: u64) -> Self {
152        Self(epoch)
153    }
154}
155
156#[derive(Debug, Clone, Copy, PartialEq)]
157pub struct EpochPair {
158    pub curr: u64,
159    pub prev: u64,
160}
161
162impl EpochPair {
163    pub fn new(curr: u64, prev: u64) -> Self {
164        assert!(curr > prev);
165        Self { curr, prev }
166    }
167
168    pub fn inc_for_test(&mut self) {
169        self.prev = self.curr;
170        self.curr += EPOCH_INC_MIN_STEP_FOR_TEST;
171    }
172
173    pub fn new_test_epoch(curr: u64) -> Self {
174        if !is_max_epoch(curr) {
175            assert!(curr >= EPOCH_INC_MIN_STEP_FOR_TEST);
176            assert!((curr & EPOCH_SPILL_TIME_MASK) == 0);
177        }
178        Self::new(curr, curr - EPOCH_INC_MIN_STEP_FOR_TEST)
179    }
180}
181
182/// As most unit tests initialize a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0.
183/// This method is to turn a a random epoch into a well shifted value.
184pub const fn test_epoch(value_millis: u64) -> u64 {
185    value_millis << EPOCH_AVAILABLE_BITS
186}
187
188/// There are numerous operations in our system's unit tests that involve incrementing or decrementing the epoch.
189/// These extensions for u64 type are specifically used within the unit tests.
190#[ext(EpochExt)]
191pub impl u64 {
192    fn inc_epoch(&mut self) {
193        *self += EPOCH_INC_MIN_STEP_FOR_TEST;
194    }
195
196    fn dec_epoch(&mut self) {
197        *self -= EPOCH_INC_MIN_STEP_FOR_TEST;
198    }
199
200    fn next_epoch(self) -> u64 {
201        self + EPOCH_INC_MIN_STEP_FOR_TEST
202    }
203
204    fn prev_epoch(self) -> u64 {
205        self - EPOCH_INC_MIN_STEP_FOR_TEST
206    }
207}
208
209/// Task-local storage for the epoch pair.
210pub mod task_local {
211    use futures::Future;
212    use tokio::task_local;
213
214    use super::{Epoch, EpochPair};
215
216    task_local! {
217        static TASK_LOCAL_EPOCH_PAIR: EpochPair;
218    }
219
220    /// Retrieve the current epoch from the task local storage.
221    ///
222    /// This value is updated after every yield of the barrier message. Returns `None` if the first
223    /// barrier message is not yielded.
224    pub fn curr_epoch() -> Option<Epoch> {
225        TASK_LOCAL_EPOCH_PAIR.try_with(|e| Epoch(e.curr)).ok()
226    }
227
228    /// Retrieve the previous epoch from the task local storage.
229    ///
230    /// This value is updated after every yield of the barrier message. Returns `None` if the first
231    /// barrier message is not yielded.
232    pub fn prev_epoch() -> Option<Epoch> {
233        TASK_LOCAL_EPOCH_PAIR.try_with(|e| Epoch(e.prev)).ok()
234    }
235
236    /// Retrieve the epoch pair from the task local storage.
237    ///
238    /// This value is updated after every yield of the barrier message. Returns `None` if the first
239    /// barrier message is not yielded.
240    pub fn epoch() -> Option<EpochPair> {
241        TASK_LOCAL_EPOCH_PAIR.try_with(|e| *e).ok()
242    }
243
244    /// Provides the given epoch pair in the task local storage for the scope of the given future.
245    pub async fn scope<F>(epoch: EpochPair, f: F) -> F::Output
246    where
247        F: Future,
248    {
249        TASK_LOCAL_EPOCH_PAIR.scope(epoch, f).await
250    }
251}
252
#[cfg(test)]
mod tests {
    use chrono::{Local, TimeZone, Utc};

    use super::*;

    /// `UNIX_RISINGWAVE_DATE_EPOCH` must be exactly 2021-04-01T00:00:00Z.
    #[test]
    fn test_risingwave_system_time() {
        let expected_utc = Utc.with_ymd_and_hms(2021, 4, 1, 0, 0, 0).unwrap();
        let expected_local = Local.from_utc_datetime(&expected_utc.naive_utc());
        assert_eq!(SystemTime::from(expected_local), *UNIX_RISINGWAVE_DATE_EPOCH);
    }

    /// Successive calls to `Epoch::next` must yield strictly increasing epochs.
    #[tokio::test]
    async fn test_epoch_generate() {
        let mut last = Epoch::now();
        for _ in 0..1000 {
            let current = last.next();
            assert!(current > last);
            last = current;
        }
    }

    /// `subtract_ms` returns `INVALID_EPOCH` on underflow and otherwise moves the physical time back.
    #[test]
    fn test_subtract_ms() {
        // Only low bits set: physical time is 0, so any subtraction underflows.
        let tiny = Epoch(10);
        assert_eq!(0, tiny.physical_time());
        assert_eq!(0, tiny.subtract_ms(20).0);

        let current = Epoch::now();
        let physical_time = current.physical_time();
        let interval = 10;
        assert_ne!(0, physical_time);
        assert_eq!(
            physical_time - interval,
            current.subtract_ms(interval).physical_time()
        );
    }
}