risingwave_common/field_generator/
timestamp.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Result;
16use chrono::prelude::*;
17use chrono::{Duration, DurationRound};
18use humantime::parse_duration;
19use rand::rngs::StdRng;
20use rand::{Rng, SeedableRng};
21use serde_json::Value;
22use tracing::debug;
23
24use super::DEFAULT_MAX_PAST;
25use crate::types::{Datum, Scalar, Timestamp, Timestamptz};
26
27pub struct ChronoField<T: ChronoFieldInner> {
28    max_past: Duration,
29    absolute_base: Option<T>,
30    seed: u64,
31}
32
33impl<T: ChronoFieldInner> ChronoField<T> {
34    pub fn new(
35        base: Option<DateTime<FixedOffset>>,
36        max_past_option: Option<String>,
37        max_past_mode: Option<String>,
38        seed: u64,
39    ) -> Result<Self> {
40        let local_now = match max_past_mode.as_deref() {
41            Some("relative") => None,
42            _ => Some(T::from_now()),
43        };
44
45        let max_past = if let Some(max_past_option) = max_past_option {
46            parse_duration(&max_past_option)?
47        } else {
48            // default max_past = 1 day
49            DEFAULT_MAX_PAST
50        };
51        debug!(?local_now, ?max_past, "parse timestamp field option");
52        Ok(Self {
53            // convert to chrono::Duration
54            max_past: chrono::Duration::from_std(max_past)?,
55            absolute_base: base.map(T::from_base).or(local_now),
56            seed,
57        })
58    }
59
60    fn generate_data(&mut self, offset: u64) -> T {
61        let milliseconds = self.max_past.num_milliseconds();
62        let mut rng = StdRng::seed_from_u64(offset ^ self.seed);
63        let max_milliseconds = rng.random_range(0..=milliseconds);
64        let base = match self.absolute_base {
65            Some(base) => base,
66            None => T::from_now(),
67        };
68        base.minus(Duration::milliseconds(max_milliseconds))
69    }
70
71    pub fn generate(&mut self, offset: u64) -> Value {
72        self.generate_data(offset).to_json()
73    }
74
75    pub fn generate_datum(&mut self, offset: u64) -> Datum {
76        Some(self.generate_data(offset).to_scalar_value())
77    }
78}
79
80pub trait ChronoFieldInner: std::fmt::Debug + Copy + Scalar {
81    fn from_now() -> Self;
82    fn from_base(base: DateTime<FixedOffset>) -> Self;
83    fn minus(&self, duration: Duration) -> Self;
84    fn to_json(&self) -> Value;
85}
86
87impl ChronoFieldInner for Timestamp {
88    fn from_now() -> Self {
89        Timestamp::new(
90            Local::now()
91                .naive_local()
92                .duration_round(Duration::microseconds(1))
93                .unwrap(),
94        )
95    }
96
97    fn from_base(base: DateTime<FixedOffset>) -> Self {
98        Timestamp::new(base.naive_local())
99    }
100
101    fn minus(&self, duration: Duration) -> Self {
102        Timestamp::new(self.0 - duration)
103    }
104
105    fn to_json(&self) -> Value {
106        Value::String(self.0.to_string())
107    }
108}
109
110impl ChronoFieldInner for Timestamptz {
111    fn from_now() -> Self {
112        Timestamptz::from(
113            Utc::now()
114                .duration_round(Duration::microseconds(1))
115                .unwrap(),
116        )
117    }
118
119    fn from_base(base: DateTime<FixedOffset>) -> Self {
120        Timestamptz::from(base)
121    }
122
123    fn minus(&self, duration: Duration) -> Self {
124        Timestamptz::from(self.to_datetime_utc() - duration)
125    }
126
127    fn to_json(&self) -> Value {
128        Value::String(self.to_string())
129    }
130}