risingwave_simulation/
nexmark.rs1use std::fmt::Write;
16use std::ops::{Deref, DerefMut};
17use std::time::Duration;
18
19use anyhow::Result;
20
21use crate::cluster::{Cluster, Configuration};
22
/// Rate constant used to pace the Nexmark source.
///
/// `NexmarkCluster` divides one second (expressed in nanoseconds) by this value to obtain the
/// `nexmark.min.event.gap.in.ns` source option, i.e. 200_000 ns at the current setting.
/// NOTE(review): presumably the aggregate target number of events per second — confirm against
/// the Nexmark connector.
pub const THROUGHPUT: usize = 5_000;
25
26pub struct NexmarkCluster {
28 pub cluster: Cluster,
29}
30
31impl NexmarkCluster {
32 pub async fn new(
37 conf: Configuration,
38 split_num: usize,
39 event_num: Option<usize>,
40 watermark: bool,
41 ) -> Result<Self> {
42 let mut cluster = Self {
43 cluster: Cluster::start(conf).await?,
44 };
45 cluster
46 .create_nexmark_source(split_num, event_num, watermark)
47 .await?;
48 Ok(cluster)
49 }
50
51 async fn create_nexmark_source(
53 &mut self,
54 split_num: usize,
55 event_num: Option<usize>,
56 watermark: bool,
57 ) -> Result<()> {
58 let watermark_column = if watermark {
59 ", WATERMARK FOR date_time AS date_time - INTERVAL '4' SECOND"
60 } else {
61 ""
62 };
63
64 let extra_args = {
65 let mut output = String::new();
66 write!(
67 output,
68 ", nexmark.min.event.gap.in.ns = '{}'",
69 Duration::from_secs(1).as_nanos() / THROUGHPUT as u128
70 )?;
71 write!(output, ", nexmark.split.num = '{split_num}'")?;
72 if let Some(event_num) = event_num {
73 write!(output, ", nexmark.event.num = '{event_num}'")?;
74 }
75 write!(output, ", nexmark.max.chunk.size = 256")?;
76 output
77 };
78
79 self.run(format!(
80 include_str!("nexmark/create_source.sql"),
81 watermark_column = watermark_column,
82 extra_args = extra_args
83 ))
84 .await?;
85
86 Ok(())
87 }
88}
89
90impl Deref for NexmarkCluster {
91 type Target = Cluster;
92
93 fn deref(&self) -> &Self::Target {
94 &self.cluster
95 }
96}
97
98impl DerefMut for NexmarkCluster {
99 fn deref_mut(&mut self) -> &mut Self::Target {
100 &mut self.cluster
101 }
102}
103
104pub mod queries {
106 use std::time::Duration;
107
108 const DEFAULT_INITIAL_INTERVAL: Duration = Duration::from_secs(1);
109 const DEFAULT_INITIAL_TIMEOUT: Duration = Duration::from_secs(20);
110
111 pub mod q3 {
112 use super::*;
113 pub const CREATE: &str = include_str!("nexmark/q3.sql");
114 pub const SELECT: &str = "SELECT * FROM nexmark_q3 ORDER BY id;";
115 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q3;";
116 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
117 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
118 }
119
120 pub mod q4 {
121 use super::*;
122 pub const CREATE: &str = include_str!("nexmark/q4.sql");
123 pub const SELECT: &str = "SELECT * FROM nexmark_q4 ORDER BY category;";
124 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q4;";
125 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
126 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
127 }
128
129 pub mod q5 {
130 use super::*;
131 pub const CREATE: &str = include_str!("nexmark/q5.sql");
132 pub const SELECT: &str = "SELECT * FROM nexmark_q5 ORDER BY auction;";
133 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q5;";
134 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
135 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
136 }
137
138 pub mod q7 {
139 use super::*;
140 pub const CREATE: &str = include_str!("nexmark/q7.sql");
141 pub const SELECT: &str = "SELECT * FROM nexmark_q7 ORDER BY date_time;";
142 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q7;";
143 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
144 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
145 }
146
147 pub mod q8 {
148 use super::*;
149 pub const CREATE: &str = include_str!("nexmark/q8.sql");
150 pub const SELECT: &str = "SELECT * FROM nexmark_q8 ORDER BY id;";
151 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q8;";
152 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
153 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
154 }
155
156 pub mod q9 {
157 use super::*;
158 pub const CREATE: &str = include_str!("nexmark/q9.sql");
159 pub const SELECT: &str = "SELECT * FROM nexmark_q9 ORDER BY id;";
160 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q9;";
161 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
162 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
163 }
164
165 pub mod q15 {
166 use super::*;
167 pub const CREATE: &str = include_str!("nexmark/q15.sql");
168 pub const SELECT: &str = "SELECT * FROM nexmark_q15 ORDER BY day ASC, total_bids DESC;";
169 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q15;";
170 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
171 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
172 }
173
174 pub mod q18 {
175 use super::*;
176 pub const CREATE: &str = include_str!("nexmark/q18.sql");
177 pub const SELECT: &str = "SELECT * FROM nexmark_q18 ORDER BY auction, bidder, price DESC;";
178 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q18;";
179 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
180 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
181 }
182
183 pub mod q101 {
184 use super::*;
185 pub const CREATE: &str = include_str!("nexmark/q101.sql");
186 pub const SELECT: &str = "SELECT * FROM nexmark_q101 ORDER BY auction_id;";
187 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q101;";
188 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
189 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
190 }
191
192 pub mod q102 {
193 use super::*;
194 pub const CREATE: &str = include_str!("nexmark/q102.sql");
195 pub const SELECT: &str = "SELECT * FROM nexmark_q102 ORDER BY auction_id;";
196 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q102;";
197 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
198 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
199 }
200
201 pub mod q103 {
202 use super::*;
203 pub const CREATE: &str = include_str!("nexmark/q103.sql");
204 pub const SELECT: &str = "SELECT * FROM nexmark_q103 ORDER BY auction_id;";
205 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q103;";
206 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
207 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
208 }
209
210 pub mod q104 {
211 use super::*;
212 pub const CREATE: &str = include_str!("nexmark/q104.sql");
213 pub const SELECT: &str = "SELECT * FROM nexmark_q104 ORDER BY auction_id;";
214 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q104;";
215 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
216 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
217 }
218
219 pub mod q105 {
220 use super::*;
221 pub const CREATE: &str = include_str!("nexmark/q105.sql");
222 pub const SELECT: &str = "SELECT * FROM nexmark_q105 ORDER BY 1,2,3;";
223 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q105;";
224 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
225 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
226 }
227
228 pub mod q106 {
229 use super::*;
230 pub const CREATE: &str = include_str!("nexmark/q106.sql");
231 pub const SELECT: &str = "SELECT * FROM nexmark_q106;";
232 pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q106;";
233 pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
234 pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
235 }
236}