risingwave_simulation/
nexmark.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Write;
16use std::ops::{Deref, DerefMut};
17use std::time::Duration;
18
19use anyhow::Result;
20
21use crate::cluster::{Cluster, Configuration};
22
23/// The target number of events of the three sources per second totally.
24pub const THROUGHPUT: usize = 5_000;
25
26/// Cluster for nexmark tests.
27pub struct NexmarkCluster {
28    pub cluster: Cluster,
29}
30
31impl NexmarkCluster {
32    /// Create a cluster with nexmark sources created.
33    ///
34    /// If `event_num` is specified, the sources should finish in `event_num / NEXMARK_THROUGHPUT`
35    /// seconds.
36    ///
37    /// If `watermark` is true, there will be a watermark with delay of 4 seconds on `date_time`.
38    pub async fn new(
39        conf: Configuration,
40        split_num: usize,
41        event_num: Option<usize>,
42        watermark: bool,
43    ) -> Result<Self> {
44        let mut cluster = Self {
45            cluster: Cluster::start(conf).await?,
46        };
47        cluster
48            .create_nexmark_source(split_num, event_num, watermark)
49            .await?;
50        Ok(cluster)
51    }
52
53    /// Run statements to create the nexmark sources.
54    async fn create_nexmark_source(
55        &mut self,
56        split_num: usize,
57        event_num: Option<usize>,
58        watermark: bool,
59    ) -> Result<()> {
60        let watermark_column = if watermark {
61            ", WATERMARK FOR date_time AS date_time - INTERVAL '4' SECOND"
62        } else {
63            ""
64        };
65
66        let extra_args = {
67            let mut output = String::new();
68            write!(
69                output,
70                ", nexmark.min.event.gap.in.ns = '{}'",
71                Duration::from_secs(1).as_nanos() / THROUGHPUT as u128
72            )?;
73            write!(output, ", nexmark.split.num = '{split_num}'")?;
74            if let Some(event_num) = event_num {
75                write!(output, ", nexmark.event.num = '{event_num}'")?;
76            }
77            write!(output, ", nexmark.max.chunk.size = 256")?;
78            output
79        };
80
81        self.run(format!(
82            include_str!("nexmark/create_source.sql"),
83            watermark_column = watermark_column,
84            extra_args = extra_args
85        ))
86        .await?;
87
88        Ok(())
89    }
90}
91
92impl Deref for NexmarkCluster {
93    type Target = Cluster;
94
95    fn deref(&self) -> &Self::Target {
96        &self.cluster
97    }
98}
99
100impl DerefMut for NexmarkCluster {
101    fn deref_mut(&mut self) -> &mut Self::Target {
102        &mut self.cluster
103    }
104}
105
106/// Nexmark queries.
107pub mod queries {
108    use std::time::Duration;
109
110    const DEFAULT_INITIAL_INTERVAL: Duration = Duration::from_secs(1);
111    const DEFAULT_INITIAL_TIMEOUT: Duration = Duration::from_secs(20);
112    const DEFAULT_INITIAL_TIMEOUT_EOWC: Duration = Duration::from_secs(60);
113
114    pub mod q3 {
115        use super::*;
116        pub const CREATE: &str = include_str!("nexmark/q3.sql");
117        pub const SELECT: &str = "SELECT * FROM nexmark_q3 ORDER BY id;";
118        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q3;";
119        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
120        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
121        pub const WATERMARK: bool = false;
122    }
123
124    pub mod q4 {
125        use super::*;
126        pub const CREATE: &str = include_str!("nexmark/q4.sql");
127        pub const SELECT: &str = "SELECT * FROM nexmark_q4 ORDER BY category;";
128        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q4;";
129        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
130        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
131        pub const WATERMARK: bool = false;
132    }
133
134    pub mod q5 {
135        use super::*;
136        pub const CREATE: &str = include_str!("nexmark/q5.sql");
137        pub const SELECT: &str = "SELECT * FROM nexmark_q5 ORDER BY auction;";
138        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q5;";
139        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
140        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
141        pub const WATERMARK: bool = false;
142    }
143
144    pub mod q5_eowc {
145        use super::*;
146        pub const CREATE: &str = include_str!("nexmark/q5_eowc.sql");
147        pub const SELECT: &str = "SELECT * FROM nexmark_q5_eowc ORDER BY auction LIMIT 1000;";
148        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q5_eowc;";
149        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
150        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT_EOWC;
151        pub const WATERMARK: bool = true;
152    }
153
154    pub mod q7 {
155        use super::*;
156        pub const CREATE: &str = include_str!("nexmark/q7.sql");
157        pub const SELECT: &str = "SELECT * FROM nexmark_q7 ORDER BY date_time;";
158        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q7;";
159        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
160        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
161        pub const WATERMARK: bool = false;
162    }
163
164    pub mod q7_eowc {
165        use super::*;
166        pub const CREATE: &str = include_str!("nexmark/q7_eowc.sql");
167        pub const SELECT: &str = "SELECT * FROM nexmark_q7_eowc ORDER BY date_time;";
168        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q7_eowc;";
169        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
170        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT_EOWC;
171        pub const WATERMARK: bool = true;
172    }
173
174    pub mod q8 {
175        use super::*;
176        pub const CREATE: &str = include_str!("nexmark/q8.sql");
177        pub const SELECT: &str = "SELECT * FROM nexmark_q8 ORDER BY id;";
178        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q8;";
179        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
180        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
181        pub const WATERMARK: bool = false;
182    }
183
184    pub mod q9 {
185        use super::*;
186        pub const CREATE: &str = include_str!("nexmark/q9.sql");
187        pub const SELECT: &str = "SELECT * FROM nexmark_q9 ORDER BY id;";
188        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q9;";
189        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
190        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
191        pub const WATERMARK: bool = false;
192    }
193
194    pub mod q15 {
195        use super::*;
196        pub const CREATE: &str = include_str!("nexmark/q15.sql");
197        pub const SELECT: &str = "SELECT * FROM nexmark_q15 ORDER BY day ASC, total_bids DESC;";
198        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q15;";
199        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
200        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
201        pub const WATERMARK: bool = false;
202    }
203
204    pub mod q18 {
205        use super::*;
206        pub const CREATE: &str = include_str!("nexmark/q18.sql");
207        pub const SELECT: &str = "SELECT * FROM nexmark_q18 ORDER BY auction, bidder, price DESC;";
208        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q18;";
209        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
210        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
211        pub const WATERMARK: bool = false;
212    }
213
214    pub mod q101 {
215        use super::*;
216        pub const CREATE: &str = include_str!("nexmark/q101.sql");
217        pub const SELECT: &str = "SELECT * FROM nexmark_q101 ORDER BY auction_id;";
218        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q101;";
219        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
220        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
221        pub const WATERMARK: bool = false;
222    }
223
224    pub mod q102 {
225        use super::*;
226        pub const CREATE: &str = include_str!("nexmark/q102.sql");
227        pub const SELECT: &str = "SELECT * FROM nexmark_q102 ORDER BY auction_id;";
228        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q102;";
229        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
230        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
231        pub const WATERMARK: bool = false;
232    }
233
234    pub mod q103 {
235        use super::*;
236        pub const CREATE: &str = include_str!("nexmark/q103.sql");
237        pub const SELECT: &str = "SELECT * FROM nexmark_q103 ORDER BY auction_id;";
238        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q103;";
239        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
240        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
241        pub const WATERMARK: bool = false;
242    }
243
244    pub mod q104 {
245        use super::*;
246        pub const CREATE: &str = include_str!("nexmark/q104.sql");
247        pub const SELECT: &str = "SELECT * FROM nexmark_q104 ORDER BY auction_id;";
248        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q104;";
249        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
250        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
251        pub const WATERMARK: bool = false;
252    }
253
254    pub mod q105 {
255        use super::*;
256        pub const CREATE: &str = include_str!("nexmark/q105.sql");
257        pub const SELECT: &str = "SELECT * FROM nexmark_q105;";
258        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q105;";
259        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
260        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
261        pub const WATERMARK: bool = false;
262    }
263
264    pub mod q106 {
265        use super::*;
266        pub const CREATE: &str = include_str!("nexmark/q106.sql");
267        pub const SELECT: &str = "SELECT * FROM nexmark_q106;";
268        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q106;";
269        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
270        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
271        pub const WATERMARK: bool = false;
272    }
273
274    pub mod q107_eowc {
275        use super::*;
276        pub const CREATE: &str = include_str!("nexmark/q107_eowc.sql");
277        pub const SELECT: &str = "SELECT * FROM nexmark_q107_eowc ORDER BY date_time LIMIT 1000;";
278        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q107_eowc;";
279        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
280        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT_EOWC;
281        pub const WATERMARK: bool = true;
282    }
283}