risingwave_common/field_generator/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod numeric;
16mod timestamp;
17mod varchar;
18
19use std::time::Duration;
20
21// TODO(error-handling): use a new error type
22use anyhow::{Result, anyhow};
23use chrono::{DateTime, FixedOffset};
24pub use numeric::*;
25use serde_json::Value;
26pub use timestamp::*;
27pub use varchar::*;
28
29use crate::array::{ListValue, StructValue};
30use crate::types::{DataType, Datum, ScalarImpl, Timestamp, Timestamptz};
31
/// Default lower bound (`min`) for numeric generators: `i16::MIN`.
/// Presumably applied by the `numeric` module when no `min` is supplied — confirm there.
pub const DEFAULT_MIN: i16 = i16::MIN;
/// Default upper bound (`max`) for numeric generators: `i16::MAX`.
pub const DEFAULT_MAX: i16 = i16::MAX;
/// Default first value (`start`) for sequence generators: `0`.
pub const DEFAULT_START: i16 = 0;
/// Default last value (`end`) for sequence generators: `i16::MAX`.
pub const DEFAULT_END: i16 = i16::MAX;

/// Default look-back window (`max_past`) for the timestamp generators: one day.
pub const DEFAULT_MAX_PAST: Duration = Duration::from_secs(24 * 60 * 60);

/// Default length (10) for fixed-length random varchar fields; also the default
/// number of elements used by `FieldGeneratorImpl::with_list`.
pub const DEFAULT_LENGTH: usize = 10;
42
43/// fields that can be randomly generated impl this trait
44pub trait NumericFieldRandomGenerator {
45    fn new(min: Option<String>, max: Option<String>, seed: u64) -> Result<Self>
46    where
47        Self: Sized;
48
49    fn generate(&mut self, offset: u64) -> Value;
50
51    fn generate_datum(&mut self, offset: u64) -> Datum;
52}
53
54/// fields that can be continuously generated impl this trait
55pub trait NumericFieldSequenceGenerator {
56    fn new(
57        start: Option<String>,
58        end: Option<String>,
59        offset: u64,
60        step: u64,
61        event_offset: u64,
62    ) -> Result<Self>
63    where
64        Self: Sized;
65
66    fn generate(&mut self) -> Value;
67
68    fn generate_datum(&mut self) -> Datum;
69}
70
/// How datagen produces a field's values: as a `Sequence` or as `Random`
/// values (the default).
#[derive(Default)]
pub enum FieldKind {
    /// Values are taken from a sequence (see `NumericFieldSequenceGenerator`).
    Sequence,
    /// Values are pseudo-random (see `NumericFieldRandomGenerator`); the default kind.
    #[default]
    Random,
}
78
/// Selects which varchar generator `FieldGeneratorImpl::with_varchar` builds.
pub enum VarcharProperty {
    /// Random strings of varying length (`VarcharRandomVariableLengthField`).
    RandomVariableLength,
    /// Random strings of one fixed length (`VarcharRandomFixedLengthField`).
    /// `None` leaves the length to that generator (presumably `DEFAULT_LENGTH` — confirm).
    RandomFixedLength(Option<usize>),
    /// A constant string (`VarcharConstant`).
    Constant,
}
84
85pub enum FieldGeneratorImpl {
86    I16Sequence(I16SequenceField),
87    I32Sequence(I32SequenceField),
88    I64Sequence(I64SequenceField),
89    F32Sequence(F32SequenceField),
90    F64Sequence(F64SequenceField),
91    I16Random(I16RandomField),
92    I32Random(I32RandomField),
93    I64Random(I64RandomField),
94    F32Random(F32RandomField),
95    F64Random(F64RandomField),
96    VarcharRandomVariableLength(VarcharRandomVariableLengthField),
97    VarcharRandomFixedLength(VarcharRandomFixedLengthField),
98    VarcharConstant,
99    Timestamp(ChronoField<Timestamp>),
100    Timestamptz(ChronoField<Timestamptz>),
101    Struct(Vec<(String, FieldGeneratorImpl)>),
102    List(Box<FieldGeneratorImpl>, usize),
103}
104
105impl FieldGeneratorImpl {
106    pub fn with_number_sequence(
107        data_type: DataType,
108        start: Option<String>,
109        end: Option<String>,
110        split_index: u64,
111        split_num: u64,
112        offset: u64,
113    ) -> Result<Self> {
114        match data_type {
115            DataType::Int16 => Ok(FieldGeneratorImpl::I16Sequence(I16SequenceField::new(
116                start,
117                end,
118                split_index,
119                split_num,
120                offset,
121            )?)),
122            DataType::Int32 => Ok(FieldGeneratorImpl::I32Sequence(I32SequenceField::new(
123                start,
124                end,
125                split_index,
126                split_num,
127                offset,
128            )?)),
129            DataType::Int64 => Ok(FieldGeneratorImpl::I64Sequence(I64SequenceField::new(
130                start,
131                end,
132                split_index,
133                split_num,
134                offset,
135            )?)),
136            DataType::Float32 => Ok(FieldGeneratorImpl::F32Sequence(F32SequenceField::new(
137                start,
138                end,
139                split_index,
140                split_num,
141                offset,
142            )?)),
143            DataType::Float64 => Ok(FieldGeneratorImpl::F64Sequence(F64SequenceField::new(
144                start,
145                end,
146                split_index,
147                split_num,
148                offset,
149            )?)),
150            _ => Err(anyhow!("unimplemented field generator {}", data_type)),
151        }
152    }
153
154    pub fn with_number_random(
155        data_type: DataType,
156        min: Option<String>,
157        max: Option<String>,
158        seed: u64,
159    ) -> Result<Self> {
160        match data_type {
161            DataType::Int16 => Ok(FieldGeneratorImpl::I16Random(I16RandomField::new(
162                min, max, seed,
163            )?)),
164            DataType::Int32 => Ok(FieldGeneratorImpl::I32Random(I32RandomField::new(
165                min, max, seed,
166            )?)),
167            DataType::Int64 => Ok(FieldGeneratorImpl::I64Random(I64RandomField::new(
168                min, max, seed,
169            )?)),
170            DataType::Float32 => Ok(FieldGeneratorImpl::F32Random(F32RandomField::new(
171                min, max, seed,
172            )?)),
173            DataType::Float64 => Ok(FieldGeneratorImpl::F64Random(F64RandomField::new(
174                min, max, seed,
175            )?)),
176            _ => Err(anyhow!("unimplemented field generator {}", data_type)),
177        }
178    }
179
180    pub fn with_timestamp(
181        base: Option<DateTime<FixedOffset>>,
182        max_past: Option<String>,
183        max_past_mode: Option<String>,
184        seed: u64,
185    ) -> Result<Self> {
186        Ok(FieldGeneratorImpl::Timestamp(ChronoField::new(
187            base,
188            max_past,
189            max_past_mode,
190            seed,
191        )?))
192    }
193
194    pub fn with_timestamptz(
195        base: Option<DateTime<FixedOffset>>,
196        max_past: Option<String>,
197        max_past_mode: Option<String>,
198        seed: u64,
199    ) -> Result<Self> {
200        Ok(FieldGeneratorImpl::Timestamptz(ChronoField::new(
201            base,
202            max_past,
203            max_past_mode,
204            seed,
205        )?))
206    }
207
208    pub fn with_varchar(varchar_property: &VarcharProperty, seed: u64) -> Self {
209        match varchar_property {
210            VarcharProperty::RandomFixedLength(length_option) => {
211                FieldGeneratorImpl::VarcharRandomFixedLength(VarcharRandomFixedLengthField::new(
212                    length_option,
213                    seed,
214                ))
215            }
216            VarcharProperty::RandomVariableLength => {
217                FieldGeneratorImpl::VarcharRandomVariableLength(
218                    VarcharRandomVariableLengthField::new(seed),
219                )
220            }
221            VarcharProperty::Constant => FieldGeneratorImpl::VarcharConstant,
222        }
223    }
224
225    pub fn with_struct_fields(fields: Vec<(String, FieldGeneratorImpl)>) -> Result<Self> {
226        Ok(FieldGeneratorImpl::Struct(fields))
227    }
228
229    pub fn with_list(field: FieldGeneratorImpl, length_option: Option<String>) -> Result<Self> {
230        let list_length = if let Some(length_option) = length_option {
231            length_option.parse::<usize>()?
232        } else {
233            DEFAULT_LENGTH
234        };
235        Ok(FieldGeneratorImpl::List(Box::new(field), list_length))
236    }
237
238    pub fn generate_json(&mut self, offset: u64) -> Value {
239        match self {
240            FieldGeneratorImpl::I16Sequence(f) => f.generate(),
241            FieldGeneratorImpl::I32Sequence(f) => f.generate(),
242            FieldGeneratorImpl::I64Sequence(f) => f.generate(),
243            FieldGeneratorImpl::F32Sequence(f) => f.generate(),
244            FieldGeneratorImpl::F64Sequence(f) => f.generate(),
245            FieldGeneratorImpl::I16Random(f) => f.generate(offset),
246            FieldGeneratorImpl::I32Random(f) => f.generate(offset),
247            FieldGeneratorImpl::I64Random(f) => f.generate(offset),
248            FieldGeneratorImpl::F32Random(f) => f.generate(offset),
249            FieldGeneratorImpl::F64Random(f) => f.generate(offset),
250            FieldGeneratorImpl::VarcharRandomFixedLength(f) => f.generate(offset),
251            FieldGeneratorImpl::VarcharRandomVariableLength(f) => f.generate(offset),
252            FieldGeneratorImpl::VarcharConstant => VarcharConstant::generate_json(),
253            FieldGeneratorImpl::Timestamp(f) => f.generate(offset),
254            FieldGeneratorImpl::Timestamptz(f) => f.generate(offset),
255            FieldGeneratorImpl::Struct(fields) => {
256                let map = fields
257                    .iter_mut()
258                    .map(|(name, r#gen)| (name.clone(), r#gen.generate_json(offset)))
259                    .collect();
260                Value::Object(map)
261            }
262            FieldGeneratorImpl::List(field, list_length) => {
263                let vec = (0..*list_length)
264                    .map(|_| field.generate_json(offset))
265                    .collect::<Vec<_>>();
266                Value::Array(vec)
267            }
268        }
269    }
270
271    pub fn generate_datum(&mut self, offset: u64) -> Datum {
272        match self {
273            FieldGeneratorImpl::I16Sequence(f) => f.generate_datum(),
274            FieldGeneratorImpl::I32Sequence(f) => f.generate_datum(),
275            FieldGeneratorImpl::I64Sequence(f) => f.generate_datum(),
276            FieldGeneratorImpl::F32Sequence(f) => f.generate_datum(),
277            FieldGeneratorImpl::F64Sequence(f) => f.generate_datum(),
278            FieldGeneratorImpl::I16Random(f) => f.generate_datum(offset),
279            FieldGeneratorImpl::I32Random(f) => f.generate_datum(offset),
280            FieldGeneratorImpl::I64Random(f) => f.generate_datum(offset),
281            FieldGeneratorImpl::F32Random(f) => f.generate_datum(offset),
282            FieldGeneratorImpl::F64Random(f) => f.generate_datum(offset),
283            FieldGeneratorImpl::VarcharRandomFixedLength(f) => f.generate_datum(offset),
284            FieldGeneratorImpl::VarcharRandomVariableLength(f) => f.generate_datum(offset),
285            FieldGeneratorImpl::VarcharConstant => VarcharConstant::generate_datum(),
286            FieldGeneratorImpl::Timestamp(f) => f.generate_datum(offset),
287            FieldGeneratorImpl::Timestamptz(f) => f.generate_datum(offset),
288            FieldGeneratorImpl::Struct(fields) => {
289                let data = fields
290                    .iter_mut()
291                    .map(|(_, r#gen)| r#gen.generate_datum(offset))
292                    .collect();
293                Some(ScalarImpl::Struct(StructValue::new(data)))
294            }
295            FieldGeneratorImpl::List(field, list_length) => {
296                Some(ScalarImpl::List(ListValue::from_datum_iter(
297                    &field.data_type(),
298                    std::iter::repeat_with(|| field.generate_datum(offset)).take(*list_length),
299                )))
300            }
301        }
302    }
303
304    fn data_type(&self) -> DataType {
305        match self {
306            Self::I16Sequence(_) => DataType::Int16,
307            Self::I32Sequence(_) => DataType::Int32,
308            Self::I64Sequence(_) => DataType::Int64,
309            Self::F32Sequence(_) => DataType::Float32,
310            Self::F64Sequence(_) => DataType::Float64,
311            Self::I16Random(_) => DataType::Int16,
312            Self::I32Random(_) => DataType::Int32,
313            Self::I64Random(_) => DataType::Int64,
314            Self::F32Random(_) => DataType::Float32,
315            Self::F64Random(_) => DataType::Float64,
316            Self::VarcharRandomFixedLength(_) => DataType::Varchar,
317            Self::VarcharRandomVariableLength(_) => DataType::Varchar,
318            Self::VarcharConstant => DataType::Varchar,
319            Self::Timestamp(_) => DataType::Timestamp,
320            Self::Timestamptz(_) => DataType::Timestamptz,
321            Self::Struct(_) => todo!("data_type for struct"),
322            Self::List(inner, _) => DataType::List(Box::new(inner.data_type())),
323        }
324    }
325}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the random generator exercised for `data_type` by `test_random_generate`.
    fn random_generator_for(data_type: DataType, seed: u64) -> FieldGeneratorImpl {
        match data_type {
            DataType::Varchar => {
                FieldGeneratorImpl::with_varchar(&VarcharProperty::RandomFixedLength(None), seed)
            }
            DataType::Timestamp => {
                FieldGeneratorImpl::with_timestamp(None, None, None, seed).unwrap()
            }
            DataType::Timestamptz => {
                FieldGeneratorImpl::with_timestamptz(None, None, None, seed).unwrap()
            }
            other => FieldGeneratorImpl::with_number_random(other, None, None, seed).unwrap(),
        }
    }

    #[test]
    fn test_partition_sequence() {
        // Four splits share the range 1..=20. Split `i` must emit
        // 1 + i, 1 + i + 4, 1 + i + 8, ... on successive calls.
        let split_num: u64 = 4;
        let mut generators: Vec<FieldGeneratorImpl> = (0..split_num)
            .map(|split_index| {
                FieldGeneratorImpl::with_number_sequence(
                    DataType::Int32,
                    Some("1".to_owned()),
                    Some("20".to_owned()),
                    split_index,
                    split_num,
                    0,
                )
                .unwrap()
            })
            .collect();

        for round in 0..5u64 {
            for (split, generator) in generators.iter_mut().enumerate() {
                let value = generator.generate_json(0);
                assert!(value.is_number());
                let expected = split_num * round + 1 + split as u64;
                assert_eq!(expected, value.as_u64().unwrap());
            }
        }
    }

    #[test]
    fn test_random_generate() {
        let seed: u64 = 1234;
        let data_types = [
            DataType::Int16,
            DataType::Int32,
            DataType::Int64,
            DataType::Float32,
            DataType::Float64,
            DataType::Varchar,
            DataType::Timestamp,
            DataType::Timestamptz,
        ];

        for data_type in data_types {
            let mut generator = random_generator_for(data_type, seed);

            // Distinct offsets give distinct JSON values...
            let (first, second) = (generator.generate_json(1), generator.generate_json(2));
            assert_ne!(first, second);
            // ...and repeating an offset reproduces the same value.
            assert_eq!(generator.generate_json(1), first);
            assert_eq!(generator.generate_json(2), second);

            // The same properties hold for datums.
            let (datum_a, datum_b) = (generator.generate_datum(5), generator.generate_datum(7));
            assert_ne!(datum_a, datum_b);
            assert_eq!(generator.generate_datum(5), datum_a);
            assert_eq!(generator.generate_datum(7), datum_b);
        }
    }

    #[test]
    fn test_deterministic_timestamp() {
        // With a fixed base time, the same offset must always yield the same timestamp.
        let base_time: DateTime<FixedOffset> =
            DateTime::parse_from_rfc3339("2020-01-01T00:00:00+00:00").unwrap();
        let mut generator =
            FieldGeneratorImpl::with_timestamp(Some(base_time), None, None, 1234).unwrap();
        let first = generator.generate_json(1);
        assert_eq!(first, generator.generate_json(1));
    }
}
422}