risingwave_connector/source/nexmark/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod enumerator;
16pub mod source;
17pub mod split;
18
19use std::collections::HashMap;
20
21pub use enumerator::*;
22use nexmark::config::{NexmarkConfig, RateShape};
23use nexmark::event::EventType;
24use serde::Deserialize;
25use serde_with::{DisplayFromStr, serde_as};
26pub use split::*;
27use with_options::WithOptions;
28
29use crate::enforce_secret::EnforceSecret;
30use crate::source::SourceProperties;
31use crate::source::nexmark::source::reader::NexmarkSplitReader;
/// Connector name under which the Nexmark source is registered
/// (also used as `SourceProperties::SOURCE_NAME` for `NexmarkProperties`).
pub const NEXMARK_CONNECTOR: &str = "nexmark";
33
/// Returns the const-generic value `V` unchanged.
///
/// Lets a serde `default = "identity_i32::<N>"` attribute spell an `i32`
/// default inline, without a dedicated function per value.
const fn identity_i32<const V: i32>() -> i32 {
    V
}
37
/// Returns the const-generic value `V` unchanged.
///
/// Lets a serde `default = "identity_u64::<N>"` attribute spell a `u64`
/// default inline, without a dedicated function per value.
const fn identity_u64<const V: u64>() -> u64 {
    V
}
41
/// Always returns `None`.
///
/// Serves as the serde `default = "none"` provider for optional fields whose
/// deserializer is customised (e.g. through `serde_as`).
const fn none<T>() -> Option<T> {
    Option::None
}
45
46impl EnforceSecret for NexmarkProperties {}
47
48#[serde_as]
49#[derive(Clone, Debug, Deserialize, WithOptions)]
50pub struct NexmarkProperties {
51    #[serde_as(as = "DisplayFromStr")]
52    #[serde(rename = "nexmark.split.num", default = "identity_i32::<1>")]
53    pub split_num: i32,
54
55    /// The total event count of Bid + Auction + Person
56    #[serde_as(as = "DisplayFromStr")]
57    #[serde(rename = "nexmark.event.num", default = "default_event_num")]
58    pub event_num: u64,
59
60    #[serde(rename = "nexmark.table.type", default = "none")]
61    pub table_type: Option<EventType>,
62
63    #[serde_as(as = "DisplayFromStr")]
64    #[serde(rename = "nexmark.max.chunk.size", default = "identity_u64::<1024>")]
65    pub max_chunk_size: u64,
66
67    #[serde_as(as = "DisplayFromStr")]
68    /// The event time gap will be like the time gap in the generated data, default false
69    #[serde(rename = "nexmark.use.real.time", default)]
70    pub use_real_time: bool,
71
72    #[serde_as(as = "DisplayFromStr")]
73    /// Minimal gap between two events, default 100000, so that the default max throughput is 10000
74    #[serde(
75        rename = "nexmark.min.event.gap.in.ns",
76        default = "identity_u64::<100_000>"
77    )]
78    pub min_event_gap_in_ns: u64,
79
80    #[serde_as(as = "Option<DisplayFromStr>")]
81    #[serde(rename = "nexmark.active.people", default = "none")]
82    pub active_people: Option<usize>,
83
84    #[serde_as(as = "Option<DisplayFromStr>")]
85    #[serde(rename = "nexmark.in.flight.auctions", default = "none")]
86    pub in_flight_auctions: Option<usize>,
87
88    #[serde_as(as = "Option<DisplayFromStr>")]
89    #[serde(rename = "nexmark.out.of.order.group.size", default = "none")]
90    pub out_of_order_group_size: Option<usize>,
91
92    #[serde_as(as = "Option<DisplayFromStr>")]
93    #[serde(rename = "nexmark.avg.person.byte.size", default = "none")]
94    pub avg_person_byte_size: Option<usize>,
95
96    #[serde_as(as = "Option<DisplayFromStr>")]
97    #[serde(rename = "nexmark.avg.auction.byte.size", default = "none")]
98    pub avg_auction_byte_size: Option<usize>,
99
100    #[serde_as(as = "Option<DisplayFromStr>")]
101    #[serde(rename = "nexmark.avg.bid.byte.size", default = "none")]
102    pub avg_bid_byte_size: Option<usize>,
103
104    #[serde_as(as = "Option<DisplayFromStr>")]
105    #[serde(rename = "nexmark.hot.seller.ratio", default = "none")]
106    pub hot_seller_ratio: Option<usize>,
107
108    #[serde_as(as = "Option<DisplayFromStr>")]
109    #[serde(rename = "nexmark.hot.auction.ratio", default = "none")]
110    pub hot_auction_ratio: Option<usize>,
111
112    #[serde_as(as = "Option<DisplayFromStr>")]
113    #[serde(rename = "nexmark.hot.bidder.ratio", default = "none")]
114    pub hot_bidder_ratio: Option<usize>,
115
116    #[serde_as(as = "Option<DisplayFromStr>")]
117    #[serde(rename = "nexmark.hot.channel.ratio", default = "none")]
118    pub hot_channel_ratio: Option<usize>,
119
120    #[serde_as(as = "Option<DisplayFromStr>")]
121    #[serde(rename = "nexmark.first.event.id", default = "none")]
122    pub first_event_id: Option<usize>,
123
124    #[serde_as(as = "Option<DisplayFromStr>")]
125    #[serde(rename = "nexmark.first.event.number", default = "none")]
126    pub first_event_number: Option<usize>,
127
128    #[serde_as(as = "Option<DisplayFromStr>")]
129    #[serde(rename = "nexmark.num.categories", default = "none")]
130    pub num_categories: Option<usize>,
131
132    #[serde_as(as = "Option<DisplayFromStr>")]
133    #[serde(rename = "nexmark.auction.id.lead", default = "none")]
134    pub auction_id_lead: Option<usize>,
135
136    #[serde_as(as = "Option<DisplayFromStr>")]
137    #[serde(rename = "nexmark.hot.seller.ratio.2", default = "none")]
138    pub hot_seller_ratio_2: Option<usize>,
139
140    #[serde_as(as = "Option<DisplayFromStr>")]
141    #[serde(rename = "nexmark.hot.auction.ratio.2", default = "none")]
142    pub hot_auction_ratio_2: Option<usize>,
143
144    #[serde_as(as = "Option<DisplayFromStr>")]
145    #[serde(rename = "nexmark.hot.bidder.ratio.2", default = "none")]
146    pub hot_bidder_ratio_2: Option<usize>,
147
148    #[serde_as(as = "Option<DisplayFromStr>")]
149    #[serde(rename = "nexmark.person.proportion", default = "none")]
150    pub person_proportion: Option<usize>,
151
152    #[serde_as(as = "Option<DisplayFromStr>")]
153    #[serde(rename = "nexmark.auction.proportion", default = "none")]
154    pub auction_proportion: Option<usize>,
155
156    #[serde_as(as = "Option<DisplayFromStr>")]
157    #[serde(rename = "nexmark.bid.proportion", default = "none")]
158    pub bid_proportion: Option<usize>,
159
160    #[serde_as(as = "Option<DisplayFromStr>")]
161    #[serde(rename = "nexmark.first.auction.id", default = "none")]
162    pub first_auction_id: Option<usize>,
163
164    #[serde_as(as = "Option<DisplayFromStr>")]
165    #[serde(rename = "nexmark.first.person.id", default = "none")]
166    pub first_person_id: Option<usize>,
167
168    #[serde_as(as = "Option<DisplayFromStr>")]
169    #[serde(rename = "nexmark.first.category.id", default = "none")]
170    pub first_category_id: Option<usize>,
171
172    #[serde_as(as = "Option<DisplayFromStr>")]
173    #[serde(rename = "nexmark.person.id.lead", default = "none")]
174    pub person_id_lead: Option<usize>,
175
176    #[serde_as(as = "Option<DisplayFromStr>")]
177    #[serde(rename = "nexmark.sine.approx.steps", default = "none")]
178    pub sine_approx_steps: Option<usize>,
179
180    #[serde_as(as = "Option<DisplayFromStr>")]
181    #[serde(rename = "nexmark.base.time", default = "none")]
182    pub base_time: Option<u64>,
183
184    #[serde(rename = "nexmark.us.states")]
185    pub us_states: Option<String>,
186
187    #[serde(rename = "nexmark.us.cities")]
188    pub us_cities: Option<String>,
189
190    #[serde(rename = "nexmark.first.names")]
191    pub first_names: Option<String>,
192
193    #[serde(rename = "nexmark.last.names")]
194    pub last_names: Option<String>,
195
196    #[serde(rename = "nexmark.rate.shape")]
197    pub rate_shape: Option<RateShape>,
198
199    #[serde_as(as = "Option<DisplayFromStr>")]
200    #[serde(rename = "nexmark.rate.period", default = "none")]
201    pub rate_period: Option<usize>,
202
203    #[serde_as(as = "Option<DisplayFromStr>")]
204    #[serde(rename = "nexmark.first.event.rate", default = "none")]
205    pub first_event_rate: Option<usize>,
206
207    #[serde_as(as = "Option<DisplayFromStr>")]
208    #[serde(rename = "nexmark.events.per.sec", default = "none")]
209    pub events_per_sec: Option<usize>,
210
211    #[serde_as(as = "Option<DisplayFromStr>")]
212    #[serde(rename = "nexmark.next.event.rate", default = "none")]
213    pub next_event_rate: Option<usize>,
214
215    #[serde_as(as = "Option<DisplayFromStr>")]
216    #[serde(rename = "nexmark.us.per.unit", default = "none")]
217    pub us_per_unit: Option<usize>,
218
219    #[serde_as(as = "Option<DisplayFromStr>")]
220    #[serde(rename = "nexmark.threads", default = "none")]
221    pub threads: Option<usize>,
222
223    #[serde(flatten)]
224    pub unknown_fields: HashMap<String, String>,
225}
226
227impl SourceProperties for NexmarkProperties {
228    type Split = NexmarkSplit;
229    type SplitEnumerator = NexmarkSplitEnumerator;
230    type SplitReader = NexmarkSplitReader;
231
232    const SOURCE_NAME: &'static str = NEXMARK_CONNECTOR;
233}
234
235impl crate::source::UnknownFields for NexmarkProperties {
236    fn unknown_fields(&self) -> HashMap<String, String> {
237        self.unknown_fields.clone()
238    }
239}
240
/// Serde default for `nexmark.event.num`: the largest representable `u64`,
/// used when no explicit event count is configured.
const fn default_event_num() -> u64 {
    u64::MAX
}
244
245impl Default for NexmarkProperties {
246    fn default() -> Self {
247        let v = serde_json::to_value(HashMap::<String, String>::new()).unwrap();
248        NexmarkProperties::deserialize(v).unwrap()
249    }
250}
251
252impl From<&NexmarkProperties> for NexmarkConfig {
253    fn from(value: &NexmarkProperties) -> Self {
254        // 2015-07-15 00:00:00
255        pub const BASE_TIME: u64 = 1_436_918_400_000;
256
257        let mut cfg = match value.table_type {
258            // This is the old way
259            Some(_) => NexmarkConfig {
260                base_time: BASE_TIME,
261                ..Default::default()
262            },
263            // By using default, it will choose the default proportion of three different events.
264            None => NexmarkConfig::default(),
265        };
266
267        macro_rules! set {
268            ($name:ident) => {
269                set!($name, $name);
270            };
271            ($cfg_name:ident, $prop_name:ident) => {
272                if let Some(v) = value.$prop_name {
273                    cfg.$cfg_name = v;
274                }
275            };
276            ($name:ident @ $map:ident) => {
277                if let Some(v) = &value.$name {
278                    cfg.$name = $map(v);
279                }
280            };
281        }
282        set!(active_people);
283        set!(in_flight_auctions);
284        set!(out_of_order_group_size);
285        set!(avg_person_byte_size);
286        set!(avg_auction_byte_size);
287        set!(avg_bid_byte_size);
288        set!(hot_seller_ratio);
289        set!(hot_auction_ratio);
290        set!(hot_bidder_ratio);
291        set!(hot_channel_ratio);
292        set!(first_event_id);
293        set!(first_event_number);
294        set!(base_time);
295        set!(num_categories);
296        set!(auction_id_lead);
297        set!(person_proportion);
298        set!(auction_proportion);
299        set!(bid_proportion);
300        set!(first_auction_id);
301        set!(first_person_id);
302        set!(first_category_id);
303        set!(person_id_lead);
304        set!(sine_approx_steps);
305        set!(us_states @ split_str);
306        set!(us_cities @ split_str);
307        set!(first_names @ split_str);
308        set!(last_names @ split_str);
309        set!(num_event_generators, threads);
310        set!(rate_shape);
311        set!(rate_period);
312        set!(first_rate, first_event_rate);
313        set!(next_rate, first_event_rate);
314        set!(us_per_unit);
315        cfg
316    }
317}
318
/// Splits a comma-separated property value into its entries.
///
/// Whitespace around each entry is trimmed, so `"az, ca"` yields `["az", "ca"]`
/// instead of `["az", " ca"]`. Empty entries are kept, as with `str::split`;
/// in particular an empty input yields a single empty string.
fn split_str(string: &str) -> Vec<String> {
    string
        .split(',')
        .map(|entry| entry.trim().to_owned())
        .collect()
}