risingwave_connector/source/nexmark/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod enumerator;
16pub mod source;
17pub mod split;
18
19use std::collections::HashMap;
20
21pub use enumerator::*;
22use nexmark::config::{NexmarkConfig, RateShape};
23use nexmark::event::EventType;
24use serde::Deserialize;
25use serde_with::{DisplayFromStr, serde_as};
26pub use split::*;
27use with_options::WithOptions;
28
29use crate::source::SourceProperties;
30use crate::source::nexmark::source::reader::NexmarkSplitReader;
31
32pub const NEXMARK_CONNECTOR: &str = "nexmark";
33
/// Returns the `i32` supplied as the const generic argument.
///
/// Lets a literal be used as a serde `default = "..."` function path,
/// e.g. `identity_i32::<1>`.
const fn identity_i32<const VALUE: i32>() -> i32 {
    VALUE
}
37
/// Returns the `u64` supplied as the const generic argument.
///
/// Lets a literal be used as a serde `default = "..."` function path,
/// e.g. `identity_u64::<1024>`.
const fn identity_u64<const VALUE: u64>() -> u64 {
    VALUE
}
41
/// Returns `None` for any `T`; used as a serde `default = "none"` function
/// for optional fields.
const fn none<T>() -> Option<T> {
    Option::<T>::None
}
45
/// Connector options for the `nexmark` source, deserialized from `nexmark.*` keys.
///
/// Fields annotated with `DisplayFromStr` are parsed from their string representation.
/// Most optional fields are overrides that are copied into a `NexmarkConfig` by the
/// `From<&NexmarkProperties>` conversion in this file; when an option is absent, the
/// generator keeps its own default for that setting.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct NexmarkProperties {
    // Defaults to 1 when the key is absent.
    #[serde_as(as = "DisplayFromStr")]
    #[serde(rename = "nexmark.split.num", default = "identity_i32::<1>")]
    pub split_num: i32,

    /// The total event count of Bid + Auction + Person
    // Defaults to `u64::MAX` (see `default_event_num`).
    #[serde_as(as = "DisplayFromStr")]
    #[serde(rename = "nexmark.event.num", default = "default_event_num")]
    pub event_num: u64,

    // When set, the `NexmarkConfig` conversion in this file starts from a fixed `base_time`
    // instead of the plain `NexmarkConfig::default()`.
    #[serde(rename = "nexmark.table.type", default = "none")]
    pub table_type: Option<EventType>,

    // Defaults to 1024 when the key is absent.
    #[serde_as(as = "DisplayFromStr")]
    #[serde(rename = "nexmark.max.chunk.size", default = "identity_u64::<1024>")]
    pub max_chunk_size: u64,

    #[serde_as(as = "DisplayFromStr")]
    /// The event time gap will be like the time gap in the generated data, default false
    #[serde(rename = "nexmark.use.real.time", default)]
    pub use_real_time: bool,

    #[serde_as(as = "DisplayFromStr")]
    /// Minimal gap between two events, default 100000, so that the default max throughput is 10000
    #[serde(
        rename = "nexmark.min.event.gap.in.ns",
        default = "identity_u64::<100_000>"
    )]
    pub min_event_gap_in_ns: u64,

    // ---- Optional generator overrides ----
    // Each of the following is `None` unless the key is supplied.
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.active.people", default = "none")]
    pub active_people: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.in.flight.auctions", default = "none")]
    pub in_flight_auctions: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.out.of.order.group.size", default = "none")]
    pub out_of_order_group_size: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.avg.person.byte.size", default = "none")]
    pub avg_person_byte_size: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.avg.auction.byte.size", default = "none")]
    pub avg_auction_byte_size: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.avg.bid.byte.size", default = "none")]
    pub avg_bid_byte_size: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.hot.seller.ratio", default = "none")]
    pub hot_seller_ratio: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.hot.auction.ratio", default = "none")]
    pub hot_auction_ratio: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.hot.bidder.ratio", default = "none")]
    pub hot_bidder_ratio: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.hot.channel.ratio", default = "none")]
    pub hot_channel_ratio: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.first.event.id", default = "none")]
    pub first_event_id: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.first.event.number", default = "none")]
    pub first_event_number: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.num.categories", default = "none")]
    pub num_categories: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.auction.id.lead", default = "none")]
    pub auction_id_lead: Option<usize>,

    // NOTE(review): the three `*_ratio_2` options below are parsed but are not read by the
    // `NexmarkConfig` conversion in this file — confirm whether they are used elsewhere.
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.hot.seller.ratio.2", default = "none")]
    pub hot_seller_ratio_2: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.hot.auction.ratio.2", default = "none")]
    pub hot_auction_ratio_2: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.hot.bidder.ratio.2", default = "none")]
    pub hot_bidder_ratio_2: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.person.proportion", default = "none")]
    pub person_proportion: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.auction.proportion", default = "none")]
    pub auction_proportion: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.bid.proportion", default = "none")]
    pub bid_proportion: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.first.auction.id", default = "none")]
    pub first_auction_id: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.first.person.id", default = "none")]
    pub first_person_id: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.first.category.id", default = "none")]
    pub first_category_id: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.person.id.lead", default = "none")]
    pub person_id_lead: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.sine.approx.steps", default = "none")]
    pub sine_approx_steps: Option<usize>,

    // Overrides the `base_time` chosen from `table_type` when present.
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.base.time", default = "none")]
    pub base_time: Option<u64>,

    // The four list options below are single strings that `split_str` breaks on ','
    // (no whitespace trimming) before they are stored in the `NexmarkConfig`.
    #[serde(rename = "nexmark.us.states")]
    pub us_states: Option<String>,

    #[serde(rename = "nexmark.us.cities")]
    pub us_cities: Option<String>,

    #[serde(rename = "nexmark.first.names")]
    pub first_names: Option<String>,

    #[serde(rename = "nexmark.last.names")]
    pub last_names: Option<String>,

    #[serde(rename = "nexmark.rate.shape")]
    pub rate_shape: Option<RateShape>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.rate.period", default = "none")]
    pub rate_period: Option<usize>,

    // Forwarded to `NexmarkConfig::first_rate`.
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.first.event.rate", default = "none")]
    pub first_event_rate: Option<usize>,

    // NOTE(review): parsed but not read by the `NexmarkConfig` conversion in this file.
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.events.per.sec", default = "none")]
    pub events_per_sec: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.next.event.rate", default = "none")]
    pub next_event_rate: Option<usize>,

    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.us.per.unit", default = "none")]
    pub us_per_unit: Option<usize>,

    // Forwarded to `NexmarkConfig::num_event_generators`.
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[serde(rename = "nexmark.threads", default = "none")]
    pub threads: Option<usize>,

    // Flattened catch-all: receives the key/value pairs not consumed by the fields above.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}
224
225impl SourceProperties for NexmarkProperties {
226    type Split = NexmarkSplit;
227    type SplitEnumerator = NexmarkSplitEnumerator;
228    type SplitReader = NexmarkSplitReader;
229
230    const SOURCE_NAME: &'static str = NEXMARK_CONNECTOR;
231}
232
233impl crate::source::UnknownFields for NexmarkProperties {
234    fn unknown_fields(&self) -> HashMap<String, String> {
235        self.unknown_fields.clone()
236    }
237}
238
/// Serde default for `nexmark.event.num`: the largest `u64`.
fn default_event_num() -> u64 {
    const NO_LIMIT: u64 = u64::MAX;
    NO_LIMIT
}
242
243impl Default for NexmarkProperties {
244    fn default() -> Self {
245        let v = serde_json::to_value(HashMap::<String, String>::new()).unwrap();
246        NexmarkProperties::deserialize(v).unwrap()
247    }
248}
249
250impl From<&NexmarkProperties> for NexmarkConfig {
251    fn from(value: &NexmarkProperties) -> Self {
252        // 2015-07-15 00:00:00
253        pub const BASE_TIME: u64 = 1_436_918_400_000;
254
255        let mut cfg = match value.table_type {
256            // This is the old way
257            Some(_) => NexmarkConfig {
258                base_time: BASE_TIME,
259                ..Default::default()
260            },
261            // By using default, it will choose the default proportion of three different events.
262            None => NexmarkConfig::default(),
263        };
264
265        macro_rules! set {
266            ($name:ident) => {
267                set!($name, $name);
268            };
269            ($cfg_name:ident, $prop_name:ident) => {
270                if let Some(v) = value.$prop_name {
271                    cfg.$cfg_name = v;
272                }
273            };
274            ($name:ident @ $map:ident) => {
275                if let Some(v) = &value.$name {
276                    cfg.$name = $map(v);
277                }
278            };
279        }
280        set!(active_people);
281        set!(in_flight_auctions);
282        set!(out_of_order_group_size);
283        set!(avg_person_byte_size);
284        set!(avg_auction_byte_size);
285        set!(avg_bid_byte_size);
286        set!(hot_seller_ratio);
287        set!(hot_auction_ratio);
288        set!(hot_bidder_ratio);
289        set!(hot_channel_ratio);
290        set!(first_event_id);
291        set!(first_event_number);
292        set!(base_time);
293        set!(num_categories);
294        set!(auction_id_lead);
295        set!(person_proportion);
296        set!(auction_proportion);
297        set!(bid_proportion);
298        set!(first_auction_id);
299        set!(first_person_id);
300        set!(first_category_id);
301        set!(person_id_lead);
302        set!(sine_approx_steps);
303        set!(us_states @ split_str);
304        set!(us_cities @ split_str);
305        set!(first_names @ split_str);
306        set!(last_names @ split_str);
307        set!(num_event_generators, threads);
308        set!(rate_shape);
309        set!(rate_period);
310        set!(first_rate, first_event_rate);
311        set!(next_rate, first_event_rate);
312        set!(us_per_unit);
313        cfg
314    }
315}
316
/// Splits `string` on every `,` and returns the pieces as owned strings.
///
/// Pieces are not trimmed and empty pieces are kept, so an empty input yields a
/// single empty string.
fn split_str(string: &str) -> Vec<String> {
    let mut pieces = Vec::new();
    for piece in string.split(',') {
        pieces.push(piece.to_owned());
    }
    pieces
}