risingwave_connector/source/nexmark/
mod.rs1pub mod enumerator;
16pub mod source;
17pub mod split;
18
19use std::collections::HashMap;
20
21pub use enumerator::*;
22use nexmark::config::{NexmarkConfig, RateShape};
23use nexmark::event::EventType;
24use serde::Deserialize;
25use serde_with::{DisplayFromStr, serde_as};
26pub use split::*;
27use with_options::WithOptions;
28
29use crate::source::SourceProperties;
30use crate::source::nexmark::source::reader::NexmarkSplitReader;
31
32pub const NEXMARK_CONNECTOR: &str = "nexmark";
33
/// Returns the const generic argument `V` unchanged.
///
/// Serde's `default = "..."` attribute takes a function path rather than a literal, so
/// `identity_i32::<N>` is used in this file to express "default to `N`" for `i32` fields.
const fn identity_i32<const V: i32>() -> i32 {
    V
}
37
/// Returns the const generic argument `V` unchanged.
///
/// Counterpart of the `i32` helper for `u64` fields: `identity_u64::<N>` can be named in a
/// serde `default = "..."` attribute to mean "default to `N`".
const fn identity_u64<const V: u64>() -> u64 {
    V
}
41
/// Returns `None` for any `T`.
///
/// Named in serde `default = "none"` attributes so that an absent option deserializes to
/// `None`.
const fn none<T>() -> Option<T> {
    Option::<T>::None
}
45
46#[serde_as]
47#[derive(Clone, Debug, Deserialize, WithOptions)]
48pub struct NexmarkProperties {
49 #[serde_as(as = "DisplayFromStr")]
50 #[serde(rename = "nexmark.split.num", default = "identity_i32::<1>")]
51 pub split_num: i32,
52
53 #[serde_as(as = "DisplayFromStr")]
55 #[serde(rename = "nexmark.event.num", default = "default_event_num")]
56 pub event_num: u64,
57
58 #[serde(rename = "nexmark.table.type", default = "none")]
59 pub table_type: Option<EventType>,
60
61 #[serde_as(as = "DisplayFromStr")]
62 #[serde(rename = "nexmark.max.chunk.size", default = "identity_u64::<1024>")]
63 pub max_chunk_size: u64,
64
65 #[serde_as(as = "DisplayFromStr")]
66 #[serde(rename = "nexmark.use.real.time", default)]
68 pub use_real_time: bool,
69
70 #[serde_as(as = "DisplayFromStr")]
71 #[serde(
73 rename = "nexmark.min.event.gap.in.ns",
74 default = "identity_u64::<100_000>"
75 )]
76 pub min_event_gap_in_ns: u64,
77
78 #[serde_as(as = "Option<DisplayFromStr>")]
79 #[serde(rename = "nexmark.active.people", default = "none")]
80 pub active_people: Option<usize>,
81
82 #[serde_as(as = "Option<DisplayFromStr>")]
83 #[serde(rename = "nexmark.in.flight.auctions", default = "none")]
84 pub in_flight_auctions: Option<usize>,
85
86 #[serde_as(as = "Option<DisplayFromStr>")]
87 #[serde(rename = "nexmark.out.of.order.group.size", default = "none")]
88 pub out_of_order_group_size: Option<usize>,
89
90 #[serde_as(as = "Option<DisplayFromStr>")]
91 #[serde(rename = "nexmark.avg.person.byte.size", default = "none")]
92 pub avg_person_byte_size: Option<usize>,
93
94 #[serde_as(as = "Option<DisplayFromStr>")]
95 #[serde(rename = "nexmark.avg.auction.byte.size", default = "none")]
96 pub avg_auction_byte_size: Option<usize>,
97
98 #[serde_as(as = "Option<DisplayFromStr>")]
99 #[serde(rename = "nexmark.avg.bid.byte.size", default = "none")]
100 pub avg_bid_byte_size: Option<usize>,
101
102 #[serde_as(as = "Option<DisplayFromStr>")]
103 #[serde(rename = "nexmark.hot.seller.ratio", default = "none")]
104 pub hot_seller_ratio: Option<usize>,
105
106 #[serde_as(as = "Option<DisplayFromStr>")]
107 #[serde(rename = "nexmark.hot.auction.ratio", default = "none")]
108 pub hot_auction_ratio: Option<usize>,
109
110 #[serde_as(as = "Option<DisplayFromStr>")]
111 #[serde(rename = "nexmark.hot.bidder.ratio", default = "none")]
112 pub hot_bidder_ratio: Option<usize>,
113
114 #[serde_as(as = "Option<DisplayFromStr>")]
115 #[serde(rename = "nexmark.hot.channel.ratio", default = "none")]
116 pub hot_channel_ratio: Option<usize>,
117
118 #[serde_as(as = "Option<DisplayFromStr>")]
119 #[serde(rename = "nexmark.first.event.id", default = "none")]
120 pub first_event_id: Option<usize>,
121
122 #[serde_as(as = "Option<DisplayFromStr>")]
123 #[serde(rename = "nexmark.first.event.number", default = "none")]
124 pub first_event_number: Option<usize>,
125
126 #[serde_as(as = "Option<DisplayFromStr>")]
127 #[serde(rename = "nexmark.num.categories", default = "none")]
128 pub num_categories: Option<usize>,
129
130 #[serde_as(as = "Option<DisplayFromStr>")]
131 #[serde(rename = "nexmark.auction.id.lead", default = "none")]
132 pub auction_id_lead: Option<usize>,
133
134 #[serde_as(as = "Option<DisplayFromStr>")]
135 #[serde(rename = "nexmark.hot.seller.ratio.2", default = "none")]
136 pub hot_seller_ratio_2: Option<usize>,
137
138 #[serde_as(as = "Option<DisplayFromStr>")]
139 #[serde(rename = "nexmark.hot.auction.ratio.2", default = "none")]
140 pub hot_auction_ratio_2: Option<usize>,
141
142 #[serde_as(as = "Option<DisplayFromStr>")]
143 #[serde(rename = "nexmark.hot.bidder.ratio.2", default = "none")]
144 pub hot_bidder_ratio_2: Option<usize>,
145
146 #[serde_as(as = "Option<DisplayFromStr>")]
147 #[serde(rename = "nexmark.person.proportion", default = "none")]
148 pub person_proportion: Option<usize>,
149
150 #[serde_as(as = "Option<DisplayFromStr>")]
151 #[serde(rename = "nexmark.auction.proportion", default = "none")]
152 pub auction_proportion: Option<usize>,
153
154 #[serde_as(as = "Option<DisplayFromStr>")]
155 #[serde(rename = "nexmark.bid.proportion", default = "none")]
156 pub bid_proportion: Option<usize>,
157
158 #[serde_as(as = "Option<DisplayFromStr>")]
159 #[serde(rename = "nexmark.first.auction.id", default = "none")]
160 pub first_auction_id: Option<usize>,
161
162 #[serde_as(as = "Option<DisplayFromStr>")]
163 #[serde(rename = "nexmark.first.person.id", default = "none")]
164 pub first_person_id: Option<usize>,
165
166 #[serde_as(as = "Option<DisplayFromStr>")]
167 #[serde(rename = "nexmark.first.category.id", default = "none")]
168 pub first_category_id: Option<usize>,
169
170 #[serde_as(as = "Option<DisplayFromStr>")]
171 #[serde(rename = "nexmark.person.id.lead", default = "none")]
172 pub person_id_lead: Option<usize>,
173
174 #[serde_as(as = "Option<DisplayFromStr>")]
175 #[serde(rename = "nexmark.sine.approx.steps", default = "none")]
176 pub sine_approx_steps: Option<usize>,
177
178 #[serde_as(as = "Option<DisplayFromStr>")]
179 #[serde(rename = "nexmark.base.time", default = "none")]
180 pub base_time: Option<u64>,
181
182 #[serde(rename = "nexmark.us.states")]
183 pub us_states: Option<String>,
184
185 #[serde(rename = "nexmark.us.cities")]
186 pub us_cities: Option<String>,
187
188 #[serde(rename = "nexmark.first.names")]
189 pub first_names: Option<String>,
190
191 #[serde(rename = "nexmark.last.names")]
192 pub last_names: Option<String>,
193
194 #[serde(rename = "nexmark.rate.shape")]
195 pub rate_shape: Option<RateShape>,
196
197 #[serde_as(as = "Option<DisplayFromStr>")]
198 #[serde(rename = "nexmark.rate.period", default = "none")]
199 pub rate_period: Option<usize>,
200
201 #[serde_as(as = "Option<DisplayFromStr>")]
202 #[serde(rename = "nexmark.first.event.rate", default = "none")]
203 pub first_event_rate: Option<usize>,
204
205 #[serde_as(as = "Option<DisplayFromStr>")]
206 #[serde(rename = "nexmark.events.per.sec", default = "none")]
207 pub events_per_sec: Option<usize>,
208
209 #[serde_as(as = "Option<DisplayFromStr>")]
210 #[serde(rename = "nexmark.next.event.rate", default = "none")]
211 pub next_event_rate: Option<usize>,
212
213 #[serde_as(as = "Option<DisplayFromStr>")]
214 #[serde(rename = "nexmark.us.per.unit", default = "none")]
215 pub us_per_unit: Option<usize>,
216
217 #[serde_as(as = "Option<DisplayFromStr>")]
218 #[serde(rename = "nexmark.threads", default = "none")]
219 pub threads: Option<usize>,
220
221 #[serde(flatten)]
222 pub unknown_fields: HashMap<String, String>,
223}
224
225impl SourceProperties for NexmarkProperties {
226 type Split = NexmarkSplit;
227 type SplitEnumerator = NexmarkSplitEnumerator;
228 type SplitReader = NexmarkSplitReader;
229
230 const SOURCE_NAME: &'static str = NEXMARK_CONNECTOR;
231}
232
233impl crate::source::UnknownFields for NexmarkProperties {
234 fn unknown_fields(&self) -> HashMap<String, String> {
235 self.unknown_fields.clone()
236 }
237}
238
/// Serde default for `nexmark.event.num`: the largest `u64`.
fn default_event_num() -> u64 {
    const LARGEST: u64 = u64::MAX;
    LARGEST
}
242
243impl Default for NexmarkProperties {
244 fn default() -> Self {
245 let v = serde_json::to_value(HashMap::<String, String>::new()).unwrap();
246 NexmarkProperties::deserialize(v).unwrap()
247 }
248}
249
250impl From<&NexmarkProperties> for NexmarkConfig {
251 fn from(value: &NexmarkProperties) -> Self {
252 pub const BASE_TIME: u64 = 1_436_918_400_000;
254
255 let mut cfg = match value.table_type {
256 Some(_) => NexmarkConfig {
258 base_time: BASE_TIME,
259 ..Default::default()
260 },
261 None => NexmarkConfig::default(),
263 };
264
265 macro_rules! set {
266 ($name:ident) => {
267 set!($name, $name);
268 };
269 ($cfg_name:ident, $prop_name:ident) => {
270 if let Some(v) = value.$prop_name {
271 cfg.$cfg_name = v;
272 }
273 };
274 ($name:ident @ $map:ident) => {
275 if let Some(v) = &value.$name {
276 cfg.$name = $map(v);
277 }
278 };
279 }
280 set!(active_people);
281 set!(in_flight_auctions);
282 set!(out_of_order_group_size);
283 set!(avg_person_byte_size);
284 set!(avg_auction_byte_size);
285 set!(avg_bid_byte_size);
286 set!(hot_seller_ratio);
287 set!(hot_auction_ratio);
288 set!(hot_bidder_ratio);
289 set!(hot_channel_ratio);
290 set!(first_event_id);
291 set!(first_event_number);
292 set!(base_time);
293 set!(num_categories);
294 set!(auction_id_lead);
295 set!(person_proportion);
296 set!(auction_proportion);
297 set!(bid_proportion);
298 set!(first_auction_id);
299 set!(first_person_id);
300 set!(first_category_id);
301 set!(person_id_lead);
302 set!(sine_approx_steps);
303 set!(us_states @ split_str);
304 set!(us_cities @ split_str);
305 set!(first_names @ split_str);
306 set!(last_names @ split_str);
307 set!(num_event_generators, threads);
308 set!(rate_shape);
309 set!(rate_period);
310 set!(first_rate, first_event_rate);
311 set!(next_rate, first_event_rate);
312 set!(us_per_unit);
313 cfg
314 }
315}
316
/// Splits `string` on every `,` and returns the pieces as owned strings.
///
/// Pieces are kept verbatim: surrounding whitespace is not trimmed and empty pieces are
/// retained, so an empty input yields a single empty string.
fn split_str(string: &str) -> Vec<String> {
    let mut pieces = Vec::new();
    for piece in string.split(',') {
        pieces.push(piece.to_owned());
    }
    pieces
}