risingwave_simulation/
nexmark.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Write;
16use std::ops::{Deref, DerefMut};
17use std::time::Duration;
18
19use anyhow::Result;
20
21use crate::cluster::{Cluster, Configuration};
22
23/// The target number of events of the three sources per second totally.
24pub const THROUGHPUT: usize = 5_000;
25
26/// Cluster for nexmark tests.
27pub struct NexmarkCluster {
28    pub cluster: Cluster,
29}
30
31impl NexmarkCluster {
32    /// Create a cluster with nexmark sources created.
33    ///
34    /// If `event_num` is specified, the sources should finish in `event_num / NEXMARK_THROUGHPUT`
35    /// seconds.
36    pub async fn new(
37        conf: Configuration,
38        split_num: usize,
39        event_num: Option<usize>,
40        watermark: bool,
41    ) -> Result<Self> {
42        let mut cluster = Self {
43            cluster: Cluster::start(conf).await?,
44        };
45        cluster
46            .create_nexmark_source(split_num, event_num, watermark)
47            .await?;
48        Ok(cluster)
49    }
50
51    /// Run statements to create the nexmark sources.
52    async fn create_nexmark_source(
53        &mut self,
54        split_num: usize,
55        event_num: Option<usize>,
56        watermark: bool,
57    ) -> Result<()> {
58        let watermark_column = if watermark {
59            ", WATERMARK FOR date_time AS date_time - INTERVAL '4' SECOND"
60        } else {
61            ""
62        };
63
64        let extra_args = {
65            let mut output = String::new();
66            write!(
67                output,
68                ", nexmark.min.event.gap.in.ns = '{}'",
69                Duration::from_secs(1).as_nanos() / THROUGHPUT as u128
70            )?;
71            write!(output, ", nexmark.split.num = '{split_num}'")?;
72            if let Some(event_num) = event_num {
73                write!(output, ", nexmark.event.num = '{event_num}'")?;
74            }
75            write!(output, ", nexmark.max.chunk.size = 256")?;
76            output
77        };
78
79        self.run(format!(
80            include_str!("nexmark/create_source.sql"),
81            watermark_column = watermark_column,
82            extra_args = extra_args
83        ))
84        .await?;
85
86        Ok(())
87    }
88}
89
90impl Deref for NexmarkCluster {
91    type Target = Cluster;
92
93    fn deref(&self) -> &Self::Target {
94        &self.cluster
95    }
96}
97
98impl DerefMut for NexmarkCluster {
99    fn deref_mut(&mut self) -> &mut Self::Target {
100        &mut self.cluster
101    }
102}
103
104/// Nexmark queries.
105pub mod queries {
106    use std::time::Duration;
107
108    const DEFAULT_INITIAL_INTERVAL: Duration = Duration::from_secs(1);
109    const DEFAULT_INITIAL_TIMEOUT: Duration = Duration::from_secs(20);
110
111    pub mod q3 {
112        use super::*;
113        pub const CREATE: &str = include_str!("nexmark/q3.sql");
114        pub const SELECT: &str = "SELECT * FROM nexmark_q3 ORDER BY id;";
115        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q3;";
116        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
117        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
118    }
119
120    pub mod q4 {
121        use super::*;
122        pub const CREATE: &str = include_str!("nexmark/q4.sql");
123        pub const SELECT: &str = "SELECT * FROM nexmark_q4 ORDER BY category;";
124        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q4;";
125        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
126        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
127    }
128
129    pub mod q5 {
130        use super::*;
131        pub const CREATE: &str = include_str!("nexmark/q5.sql");
132        pub const SELECT: &str = "SELECT * FROM nexmark_q5 ORDER BY auction;";
133        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q5;";
134        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
135        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
136    }
137
138    pub mod q7 {
139        use super::*;
140        pub const CREATE: &str = include_str!("nexmark/q7.sql");
141        pub const SELECT: &str = "SELECT * FROM nexmark_q7 ORDER BY date_time;";
142        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q7;";
143        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
144        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
145    }
146
147    pub mod q8 {
148        use super::*;
149        pub const CREATE: &str = include_str!("nexmark/q8.sql");
150        pub const SELECT: &str = "SELECT * FROM nexmark_q8 ORDER BY id;";
151        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q8;";
152        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
153        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
154    }
155
156    pub mod q9 {
157        use super::*;
158        pub const CREATE: &str = include_str!("nexmark/q9.sql");
159        pub const SELECT: &str = "SELECT * FROM nexmark_q9 ORDER BY id;";
160        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q9;";
161        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
162        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
163    }
164
165    pub mod q15 {
166        use super::*;
167        pub const CREATE: &str = include_str!("nexmark/q15.sql");
168        pub const SELECT: &str = "SELECT * FROM nexmark_q15 ORDER BY day ASC, total_bids DESC;";
169        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q15;";
170        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
171        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
172    }
173
174    pub mod q18 {
175        use super::*;
176        pub const CREATE: &str = include_str!("nexmark/q18.sql");
177        pub const SELECT: &str = "SELECT * FROM nexmark_q18 ORDER BY auction, bidder, price DESC;";
178        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q18;";
179        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
180        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
181    }
182
183    pub mod q101 {
184        use super::*;
185        pub const CREATE: &str = include_str!("nexmark/q101.sql");
186        pub const SELECT: &str = "SELECT * FROM nexmark_q101 ORDER BY auction_id;";
187        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q101;";
188        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
189        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
190    }
191
192    pub mod q102 {
193        use super::*;
194        pub const CREATE: &str = include_str!("nexmark/q102.sql");
195        pub const SELECT: &str = "SELECT * FROM nexmark_q102 ORDER BY auction_id;";
196        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q102;";
197        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
198        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
199    }
200
201    pub mod q103 {
202        use super::*;
203        pub const CREATE: &str = include_str!("nexmark/q103.sql");
204        pub const SELECT: &str = "SELECT * FROM nexmark_q103 ORDER BY auction_id;";
205        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q103;";
206        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
207        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
208    }
209
210    pub mod q104 {
211        use super::*;
212        pub const CREATE: &str = include_str!("nexmark/q104.sql");
213        pub const SELECT: &str = "SELECT * FROM nexmark_q104 ORDER BY auction_id;";
214        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q104;";
215        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
216        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
217    }
218
219    pub mod q105 {
220        use super::*;
221        pub const CREATE: &str = include_str!("nexmark/q105.sql");
222        pub const SELECT: &str = "SELECT * FROM nexmark_q105 ORDER BY 1,2,3;";
223        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q105;";
224        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
225        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
226    }
227
228    pub mod q106 {
229        use super::*;
230        pub const CREATE: &str = include_str!("nexmark/q106.sql");
231        pub const SELECT: &str = "SELECT * FROM nexmark_q106;";
232        pub const DROP: &str = "DROP MATERIALIZED VIEW nexmark_q106;";
233        pub const INITIAL_INTERVAL: Duration = DEFAULT_INITIAL_INTERVAL;
234        pub const INITIAL_TIMEOUT: Duration = DEFAULT_INITIAL_TIMEOUT;
235    }
236}