risingwave_common/field_generator/
timestamp.rs
1use anyhow::Result;
16use chrono::prelude::*;
17use chrono::{Duration, DurationRound};
18use humantime::parse_duration;
19use rand::rngs::StdRng;
20use rand::{Rng, SeedableRng};
21use serde_json::Value;
22use tracing::debug;
23
24use super::DEFAULT_MAX_PAST;
25use crate::types::{Datum, Scalar, Timestamp, Timestamptz};
26
27pub struct ChronoField<T: ChronoFieldInner> {
28 max_past: Duration,
29 absolute_base: Option<T>,
30 seed: u64,
31}
32
33impl<T: ChronoFieldInner> ChronoField<T> {
34 pub fn new(
35 base: Option<DateTime<FixedOffset>>,
36 max_past_option: Option<String>,
37 max_past_mode: Option<String>,
38 seed: u64,
39 ) -> Result<Self> {
40 let local_now = match max_past_mode.as_deref() {
41 Some("relative") => None,
42 _ => Some(T::from_now()),
43 };
44
45 let max_past = if let Some(max_past_option) = max_past_option {
46 parse_duration(&max_past_option)?
47 } else {
48 DEFAULT_MAX_PAST
50 };
51 debug!(?local_now, ?max_past, "parse timestamp field option");
52 Ok(Self {
53 max_past: chrono::Duration::from_std(max_past)?,
55 absolute_base: base.map(T::from_base).or(local_now),
56 seed,
57 })
58 }
59
60 fn generate_data(&mut self, offset: u64) -> T {
61 let milliseconds = self.max_past.num_milliseconds();
62 let mut rng = StdRng::seed_from_u64(offset ^ self.seed);
63 let max_milliseconds = rng.random_range(0..=milliseconds);
64 let base = match self.absolute_base {
65 Some(base) => base,
66 None => T::from_now(),
67 };
68 base.minus(Duration::milliseconds(max_milliseconds))
69 }
70
71 pub fn generate(&mut self, offset: u64) -> Value {
72 self.generate_data(offset).to_json()
73 }
74
75 pub fn generate_datum(&mut self, offset: u64) -> Datum {
76 Some(self.generate_data(offset).to_scalar_value())
77 }
78}
79
80pub trait ChronoFieldInner: std::fmt::Debug + Copy + Scalar {
81 fn from_now() -> Self;
82 fn from_base(base: DateTime<FixedOffset>) -> Self;
83 fn minus(&self, duration: Duration) -> Self;
84 fn to_json(&self) -> Value;
85}
86
87impl ChronoFieldInner for Timestamp {
88 fn from_now() -> Self {
89 Timestamp::new(
90 Local::now()
91 .naive_local()
92 .duration_round(Duration::microseconds(1))
93 .unwrap(),
94 )
95 }
96
97 fn from_base(base: DateTime<FixedOffset>) -> Self {
98 Timestamp::new(base.naive_local())
99 }
100
101 fn minus(&self, duration: Duration) -> Self {
102 Timestamp::new(self.0 - duration)
103 }
104
105 fn to_json(&self) -> Value {
106 Value::String(self.0.to_string())
107 }
108}
109
110impl ChronoFieldInner for Timestamptz {
111 fn from_now() -> Self {
112 Timestamptz::from(
113 Utc::now()
114 .duration_round(Duration::microseconds(1))
115 .unwrap(),
116 )
117 }
118
119 fn from_base(base: DateTime<FixedOffset>) -> Self {
120 Timestamptz::from(base)
121 }
122
123 fn minus(&self, duration: Duration) -> Self {
124 Timestamptz::from(self.to_datetime_utc() - duration)
125 }
126
127 fn to_json(&self) -> Value {
128 Value::String(self.to_string())
129 }
130}