risingwave_connector/source/nexmark/
mod.rs1pub mod enumerator;
16pub mod source;
17pub mod split;
18
19use std::collections::HashMap;
20
21pub use enumerator::*;
22use nexmark::config::{NexmarkConfig, RateShape};
23use nexmark::event::EventType;
24use serde::Deserialize;
25use serde_with::{DisplayFromStr, serde_as};
26pub use split::*;
27use with_options::WithOptions;
28
29use crate::enforce_secret::EnforceSecret;
30use crate::source::SourceProperties;
31use crate::source::nexmark::source::reader::NexmarkSplitReader;
32pub const NEXMARK_CONNECTOR: &str = "nexmark";
33
/// Returns the const generic argument `V` unchanged.
///
/// Lets a literal be named as a serde default, e.g.
/// `#[serde(default = "identity_i32::<1>")]`.
const fn identity_i32<const V: i32>() -> i32 { V }
37
/// Returns the const generic argument `V` unchanged.
///
/// Lets a literal be named as a serde default, e.g.
/// `#[serde(default = "identity_u64::<1024>")]`.
const fn identity_u64<const V: u64>() -> u64 { V }
41
/// Always yields `None`; used as a named serde default for optional fields.
const fn none<T>() -> Option<T> { Option::None }
45
46impl EnforceSecret for NexmarkProperties {}
47
48#[serde_as]
49#[derive(Clone, Debug, Deserialize, WithOptions)]
50pub struct NexmarkProperties {
51 #[serde_as(as = "DisplayFromStr")]
52 #[serde(rename = "nexmark.split.num", default = "identity_i32::<1>")]
53 pub split_num: i32,
54
55 #[serde_as(as = "DisplayFromStr")]
57 #[serde(rename = "nexmark.event.num", default = "default_event_num")]
58 pub event_num: u64,
59
60 #[serde(rename = "nexmark.table.type", default = "none")]
61 pub table_type: Option<EventType>,
62
63 #[serde_as(as = "DisplayFromStr")]
64 #[serde(rename = "nexmark.max.chunk.size", default = "identity_u64::<1024>")]
65 pub max_chunk_size: u64,
66
67 #[serde_as(as = "DisplayFromStr")]
68 #[serde(rename = "nexmark.use.real.time", default)]
70 pub use_real_time: bool,
71
72 #[serde_as(as = "DisplayFromStr")]
73 #[serde(
75 rename = "nexmark.min.event.gap.in.ns",
76 default = "identity_u64::<100_000>"
77 )]
78 pub min_event_gap_in_ns: u64,
79
80 #[serde_as(as = "Option<DisplayFromStr>")]
81 #[serde(rename = "nexmark.active.people", default = "none")]
82 pub active_people: Option<usize>,
83
84 #[serde_as(as = "Option<DisplayFromStr>")]
85 #[serde(rename = "nexmark.in.flight.auctions", default = "none")]
86 pub in_flight_auctions: Option<usize>,
87
88 #[serde_as(as = "Option<DisplayFromStr>")]
89 #[serde(rename = "nexmark.out.of.order.group.size", default = "none")]
90 pub out_of_order_group_size: Option<usize>,
91
92 #[serde_as(as = "Option<DisplayFromStr>")]
93 #[serde(rename = "nexmark.avg.person.byte.size", default = "none")]
94 pub avg_person_byte_size: Option<usize>,
95
96 #[serde_as(as = "Option<DisplayFromStr>")]
97 #[serde(rename = "nexmark.avg.auction.byte.size", default = "none")]
98 pub avg_auction_byte_size: Option<usize>,
99
100 #[serde_as(as = "Option<DisplayFromStr>")]
101 #[serde(rename = "nexmark.avg.bid.byte.size", default = "none")]
102 pub avg_bid_byte_size: Option<usize>,
103
104 #[serde_as(as = "Option<DisplayFromStr>")]
105 #[serde(rename = "nexmark.hot.seller.ratio", default = "none")]
106 pub hot_seller_ratio: Option<usize>,
107
108 #[serde_as(as = "Option<DisplayFromStr>")]
109 #[serde(rename = "nexmark.hot.auction.ratio", default = "none")]
110 pub hot_auction_ratio: Option<usize>,
111
112 #[serde_as(as = "Option<DisplayFromStr>")]
113 #[serde(rename = "nexmark.hot.bidder.ratio", default = "none")]
114 pub hot_bidder_ratio: Option<usize>,
115
116 #[serde_as(as = "Option<DisplayFromStr>")]
117 #[serde(rename = "nexmark.hot.channel.ratio", default = "none")]
118 pub hot_channel_ratio: Option<usize>,
119
120 #[serde_as(as = "Option<DisplayFromStr>")]
121 #[serde(rename = "nexmark.first.event.id", default = "none")]
122 pub first_event_id: Option<usize>,
123
124 #[serde_as(as = "Option<DisplayFromStr>")]
125 #[serde(rename = "nexmark.first.event.number", default = "none")]
126 pub first_event_number: Option<usize>,
127
128 #[serde_as(as = "Option<DisplayFromStr>")]
129 #[serde(rename = "nexmark.num.categories", default = "none")]
130 pub num_categories: Option<usize>,
131
132 #[serde_as(as = "Option<DisplayFromStr>")]
133 #[serde(rename = "nexmark.auction.id.lead", default = "none")]
134 pub auction_id_lead: Option<usize>,
135
136 #[serde_as(as = "Option<DisplayFromStr>")]
137 #[serde(rename = "nexmark.hot.seller.ratio.2", default = "none")]
138 pub hot_seller_ratio_2: Option<usize>,
139
140 #[serde_as(as = "Option<DisplayFromStr>")]
141 #[serde(rename = "nexmark.hot.auction.ratio.2", default = "none")]
142 pub hot_auction_ratio_2: Option<usize>,
143
144 #[serde_as(as = "Option<DisplayFromStr>")]
145 #[serde(rename = "nexmark.hot.bidder.ratio.2", default = "none")]
146 pub hot_bidder_ratio_2: Option<usize>,
147
148 #[serde_as(as = "Option<DisplayFromStr>")]
149 #[serde(rename = "nexmark.person.proportion", default = "none")]
150 pub person_proportion: Option<usize>,
151
152 #[serde_as(as = "Option<DisplayFromStr>")]
153 #[serde(rename = "nexmark.auction.proportion", default = "none")]
154 pub auction_proportion: Option<usize>,
155
156 #[serde_as(as = "Option<DisplayFromStr>")]
157 #[serde(rename = "nexmark.bid.proportion", default = "none")]
158 pub bid_proportion: Option<usize>,
159
160 #[serde_as(as = "Option<DisplayFromStr>")]
161 #[serde(rename = "nexmark.first.auction.id", default = "none")]
162 pub first_auction_id: Option<usize>,
163
164 #[serde_as(as = "Option<DisplayFromStr>")]
165 #[serde(rename = "nexmark.first.person.id", default = "none")]
166 pub first_person_id: Option<usize>,
167
168 #[serde_as(as = "Option<DisplayFromStr>")]
169 #[serde(rename = "nexmark.first.category.id", default = "none")]
170 pub first_category_id: Option<usize>,
171
172 #[serde_as(as = "Option<DisplayFromStr>")]
173 #[serde(rename = "nexmark.person.id.lead", default = "none")]
174 pub person_id_lead: Option<usize>,
175
176 #[serde_as(as = "Option<DisplayFromStr>")]
177 #[serde(rename = "nexmark.sine.approx.steps", default = "none")]
178 pub sine_approx_steps: Option<usize>,
179
180 #[serde_as(as = "Option<DisplayFromStr>")]
181 #[serde(rename = "nexmark.base.time", default = "none")]
182 pub base_time: Option<u64>,
183
184 #[serde(rename = "nexmark.us.states")]
185 pub us_states: Option<String>,
186
187 #[serde(rename = "nexmark.us.cities")]
188 pub us_cities: Option<String>,
189
190 #[serde(rename = "nexmark.first.names")]
191 pub first_names: Option<String>,
192
193 #[serde(rename = "nexmark.last.names")]
194 pub last_names: Option<String>,
195
196 #[serde(rename = "nexmark.rate.shape")]
197 pub rate_shape: Option<RateShape>,
198
199 #[serde_as(as = "Option<DisplayFromStr>")]
200 #[serde(rename = "nexmark.rate.period", default = "none")]
201 pub rate_period: Option<usize>,
202
203 #[serde_as(as = "Option<DisplayFromStr>")]
204 #[serde(rename = "nexmark.first.event.rate", default = "none")]
205 pub first_event_rate: Option<usize>,
206
207 #[serde_as(as = "Option<DisplayFromStr>")]
208 #[serde(rename = "nexmark.events.per.sec", default = "none")]
209 pub events_per_sec: Option<usize>,
210
211 #[serde_as(as = "Option<DisplayFromStr>")]
212 #[serde(rename = "nexmark.next.event.rate", default = "none")]
213 pub next_event_rate: Option<usize>,
214
215 #[serde_as(as = "Option<DisplayFromStr>")]
216 #[serde(rename = "nexmark.us.per.unit", default = "none")]
217 pub us_per_unit: Option<usize>,
218
219 #[serde_as(as = "Option<DisplayFromStr>")]
220 #[serde(rename = "nexmark.threads", default = "none")]
221 pub threads: Option<usize>,
222
223 #[serde(flatten)]
224 pub unknown_fields: HashMap<String, String>,
225}
226
227impl SourceProperties for NexmarkProperties {
228 type Split = NexmarkSplit;
229 type SplitEnumerator = NexmarkSplitEnumerator;
230 type SplitReader = NexmarkSplitReader;
231
232 const SOURCE_NAME: &'static str = NEXMARK_CONNECTOR;
233}
234
235impl crate::source::UnknownFields for NexmarkProperties {
236 fn unknown_fields(&self) -> HashMap<String, String> {
237 self.unknown_fields.clone()
238 }
239}
240
/// Serde default for `nexmark.event.num`: the largest `u64`.
fn default_event_num() -> u64 { u64::MAX }
244
245impl Default for NexmarkProperties {
246 fn default() -> Self {
247 let v = serde_json::to_value(HashMap::<String, String>::new()).unwrap();
248 NexmarkProperties::deserialize(v).unwrap()
249 }
250}
251
252impl From<&NexmarkProperties> for NexmarkConfig {
253 fn from(value: &NexmarkProperties) -> Self {
254 pub const BASE_TIME: u64 = 1_436_918_400_000;
256
257 let mut cfg = match value.table_type {
258 Some(_) => NexmarkConfig {
260 base_time: BASE_TIME,
261 ..Default::default()
262 },
263 None => NexmarkConfig::default(),
265 };
266
267 macro_rules! set {
268 ($name:ident) => {
269 set!($name, $name);
270 };
271 ($cfg_name:ident, $prop_name:ident) => {
272 if let Some(v) = value.$prop_name {
273 cfg.$cfg_name = v;
274 }
275 };
276 ($name:ident @ $map:ident) => {
277 if let Some(v) = &value.$name {
278 cfg.$name = $map(v);
279 }
280 };
281 }
282 set!(active_people);
283 set!(in_flight_auctions);
284 set!(out_of_order_group_size);
285 set!(avg_person_byte_size);
286 set!(avg_auction_byte_size);
287 set!(avg_bid_byte_size);
288 set!(hot_seller_ratio);
289 set!(hot_auction_ratio);
290 set!(hot_bidder_ratio);
291 set!(hot_channel_ratio);
292 set!(first_event_id);
293 set!(first_event_number);
294 set!(base_time);
295 set!(num_categories);
296 set!(auction_id_lead);
297 set!(person_proportion);
298 set!(auction_proportion);
299 set!(bid_proportion);
300 set!(first_auction_id);
301 set!(first_person_id);
302 set!(first_category_id);
303 set!(person_id_lead);
304 set!(sine_approx_steps);
305 set!(us_states @ split_str);
306 set!(us_cities @ split_str);
307 set!(first_names @ split_str);
308 set!(last_names @ split_str);
309 set!(num_event_generators, threads);
310 set!(rate_shape);
311 set!(rate_period);
312 set!(first_rate, first_event_rate);
313 set!(next_rate, first_event_rate);
314 set!(us_per_unit);
315 cfg
316 }
317}
318
/// Splits `string` on every `,` and returns the pieces as owned strings.
///
/// Pieces are not trimmed and empty pieces are kept, so an empty input
/// yields a single empty string.
fn split_str(string: &str) -> Vec<String> {
    let mut pieces = Vec::new();
    for piece in string.split(',') {
        pieces.push(piece.to_owned());
    }
    pieces
}