risingwave_simulation/
nexmark.rs1use std::fmt::Write;
16use std::ops::{Deref, DerefMut};
17use std::time::Duration;
18
19use anyhow::Result;
20
21use crate::cluster::{Cluster, Configuration};
22
23pub const THROUGHPUT: usize = 5_000;
25
26pub struct NexmarkCluster {
28 pub cluster: Cluster,
29}
30
31impl NexmarkCluster {
32 pub async fn new(
39 conf: Configuration,
40 split_num: usize,
41 event_num: Option<usize>,
42 watermark: bool,
43 ) -> Result<Self> {
44 let mut cluster = Self {
45 cluster: Cluster::start(conf).await?,
46 };
47 cluster
48 .create_nexmark_source(split_num, event_num, watermark)
49 .await?;
50 Ok(cluster)
51 }
52
53 async fn create_nexmark_source(
55 &mut self,
56 split_num: usize,
57 event_num: Option<usize>,
58 watermark: bool,
59 ) -> Result<()> {
60 let watermark_column = if watermark {
61 ", WATERMARK FOR date_time AS date_time - INTERVAL '4' SECOND"
62 } else {
63 ""
64 };
65
66 let extra_args = {
67 let mut output = String::new();
68 write!(
69 output,
70 ", nexmark.min.event.gap.in.ns = '{}'",
71 Duration::from_secs(1).as_nanos() / THROUGHPUT as u128
72 )?;
73 write!(output, ", nexmark.split.num = '{split_num}'")?;
74 if let Some(event_num) = event_num {
75 write!(output, ", nexmark.event.num = '{event_num}'")?;
76 }
77 write!(output, ", nexmark.max.chunk.size = 256")?;
78 output
79 };
80
81 self.run(format!(
82 include_str!("nexmark/create_source.sql"),
83 watermark_column = watermark_column,
84 extra_args = extra_args
85 ))
86 .await?;
87
88 Ok(())
89 }
90}
91
92impl Deref for NexmarkCluster {
93 type Target = Cluster;
94
95 fn deref(&self) -> &Self::Target {
96 &self.cluster
97 }
98}
99
100impl DerefMut for NexmarkCluster {
101 fn deref_mut(&mut self) -> &mut Self::Target {
102 &mut self.cluster
103 }
104}
105
106pub mod queries {
108 use std::time::Duration;
109
110 const DEFAULT_INITIAL_INTERVAL: Duration = Duration::from_secs(1);
111 const DEFAULT_INITIAL_TIMEOUT: Duration = Duration::from_secs(20);
112 const DEFAULT_INITIAL_TIMEOUT_EOWC: Duration = Duration::from_secs(60);
113
114 pub mod q3 {
115 use super::*;
116 pub const CREATE: &str = include_str!("nexmark/q3.sql");
117 pub const SELECT: &str = "SELECT * FROM nexmark_q3 ORDER BY id;";
118 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q3;";
119 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
120 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
121 pub const WATERMARK: bool = false;
122 }
123
124 pub mod q4 {
125 use super::*;
126 pub const CREATE: &str = include_str!("nexmark/q4.sql");
127 pub const SELECT: &str = "SELECT * FROM nexmark_q4 ORDER BY category;";
128 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q4;";
129 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
130 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
131 pub const WATERMARK: bool = false;
132 }
133
134 pub mod q5 {
135 use super::*;
136 pub const CREATE: &str = include_str!("nexmark/q5.sql");
137 pub const SELECT: &str = "SELECT * FROM nexmark_q5 ORDER BY auction;";
138 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q5;";
139 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
140 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
141 pub const WATERMARK: bool = false;
142 }
143
144 pub mod q5_eowc {
145 use super::*;
146 pub const CREATE: &str = include_str!("nexmark/q5_eowc.sql");
147 pub const SELECT: &str = "SELECT * FROM nexmark_q5_eowc ORDER BY auction LIMIT 1000;";
148 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q5_eowc;";
149 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
150 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT_EOWC;
151 pub const WATERMARK: bool = true;
152 }
153
154 pub mod q7 {
155 use super::*;
156 pub const CREATE: &str = include_str!("nexmark/q7.sql");
157 pub const SELECT: &str = "SELECT * FROM nexmark_q7 ORDER BY date_time;";
158 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q7;";
159 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
160 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
161 pub const WATERMARK: bool = false;
162 }
163
164 pub mod q7_eowc {
165 use super::*;
166 pub const CREATE: &str = include_str!("nexmark/q7_eowc.sql");
167 pub const SELECT: &str = "SELECT * FROM nexmark_q7_eowc ORDER BY date_time;";
168 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q7_eowc;";
169 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
170 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT_EOWC;
171 pub const WATERMARK: bool = true;
172 }
173
174 pub mod q8 {
175 use super::*;
176 pub const CREATE: &str = include_str!("nexmark/q8.sql");
177 pub const SELECT: &str = "SELECT * FROM nexmark_q8 ORDER BY id;";
178 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q8;";
179 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
180 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
181 pub const WATERMARK: bool = false;
182 }
183
184 pub mod q9 {
185 use super::*;
186 pub const CREATE: &str = include_str!("nexmark/q9.sql");
187 pub const SELECT: &str = "SELECT * FROM nexmark_q9 ORDER BY id;";
188 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q9;";
189 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
190 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
191 pub const WATERMARK: bool = false;
192 }
193
194 pub mod q15 {
195 use super::*;
196 pub const CREATE: &str = include_str!("nexmark/q15.sql");
197 pub const SELECT: &str = "SELECT * FROM nexmark_q15 ORDER BY day ASC, total_bids DESC;";
198 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q15;";
199 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
200 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
201 pub const WATERMARK: bool = false;
202 }
203
204 pub mod q18 {
205 use super::*;
206 pub const CREATE: &str = include_str!("nexmark/q18.sql");
207 pub const SELECT: &str = "SELECT * FROM nexmark_q18 ORDER BY auction, bidder, price DESC;";
208 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q18;";
209 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
210 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
211 pub const WATERMARK: bool = false;
212 }
213
214 pub mod q101 {
215 use super::*;
216 pub const CREATE: &str = include_str!("nexmark/q101.sql");
217 pub const SELECT: &str = "SELECT * FROM nexmark_q101 ORDER BY auction_id;";
218 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q101;";
219 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
220 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
221 pub const WATERMARK: bool = false;
222 }
223
224 pub mod q102 {
225 use super::*;
226 pub const CREATE: &str = include_str!("nexmark/q102.sql");
227 pub const SELECT: &str = "SELECT * FROM nexmark_q102 ORDER BY auction_id;";
228 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q102;";
229 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
230 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
231 pub const WATERMARK: bool = false;
232 }
233
234 pub mod q103 {
235 use super::*;
236 pub const CREATE: &str = include_str!("nexmark/q103.sql");
237 pub const SELECT: &str = "SELECT * FROM nexmark_q103 ORDER BY auction_id;";
238 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q103;";
239 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
240 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
241 pub const WATERMARK: bool = false;
242 }
243
244 pub mod q104 {
245 use super::*;
246 pub const CREATE: &str = include_str!("nexmark/q104.sql");
247 pub const SELECT: &str = "SELECT * FROM nexmark_q104 ORDER BY auction_id;";
248 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q104;";
249 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
250 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
251 pub const WATERMARK: bool = false;
252 }
253
254 pub mod q105 {
255 use super::*;
256 pub const CREATE: &str = include_str!("nexmark/q105.sql");
257 pub const SELECT: &str = "SELECT * FROM nexmark_q105;";
258 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q105;";
259 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
260 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
261 pub const WATERMARK: bool = false;
262 }
263
264 pub mod q106 {
265 use super::*;
266 pub const CREATE: &str = include_str!("nexmark/q106.sql");
267 pub const SELECT: &str = "SELECT * FROM nexmark_q106;";
268 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q106;";
269 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
270 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
271 pub const WATERMARK: bool = false;
272 }
273
274 pub mod q107_eowc {
275 use super::*;
276 pub const CREATE: &str = include_str!("nexmark/q107_eowc.sql");
277 pub const SELECT: &str = "SELECT * FROM nexmark_q107_eowc ORDER BY date_time LIMIT 1000;";
278 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q107_eowc;";
279 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
280 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT_EOWC;
281 pub const WATERMARK: bool = true;
282 }
283}