risingwave_sqlsmith/test_runners/
fuzzing.rs1use risingwave_sqlparser::ast::Statement;
18use tokio_postgres::Client;
19
20use crate::config::Configuration;
21use crate::test_runners::utils::{
22 create_base_tables, create_mviews, drop_mview_table, drop_tables, format_drop_mview,
23 generate_rng, populate_tables, run_query, set_variable, test_batch_queries,
24 test_session_variable, test_sqlsmith, test_stream_queries, update_base_tables,
25};
26use crate::utils::read_file_contents;
27use crate::{mview_sql_gen, parse_sql, sql_gen};
28
29pub async fn run_pre_generated(client: &Client, outdir: &str) {
31 let timeout_duration = 12; let queries_path = format!("{}/queries.sql", outdir);
33 let queries = read_file_contents(queries_path).unwrap();
34 for statement in parse_sql(&queries) {
35 let sql = statement.to_string();
36 tracing::info!("[EXECUTING STATEMENT]: {}", sql);
37 run_query(timeout_duration, client, &sql).await.unwrap();
38 }
39 tracing::info!("[EXECUTION SUCCESS]");
40}
41
42pub async fn generate(
49 client: &Client,
50 testdata: &str,
51 count: usize,
52 _outdir: &str,
53 config: &Configuration,
54 seed: Option<u64>,
55) {
56 let timeout_duration = 5;
57
58 set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
59 set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
60 tracing::info!("Set session variables");
61
62 let mut rng = generate_rng(seed);
63 let base_tables = create_base_tables(testdata, client).await.unwrap();
64
65 let rows_per_table = 50;
66 let max_rows_inserted = rows_per_table * base_tables.len();
67 let inserts = populate_tables(
68 client,
69 &mut rng,
70 base_tables.clone(),
71 rows_per_table,
72 config,
73 )
74 .await;
75 tracing::info!("Populated base tables");
76
77 let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client, config)
78 .await
79 .unwrap();
80
81 update_base_tables(client, &mut rng, &base_tables, &inserts, config).await;
83
84 test_sqlsmith(
85 client,
86 &mut rng,
87 tables.clone(),
88 base_tables.clone(),
89 max_rows_inserted,
90 config,
91 )
92 .await;
93 tracing::info!("Passed sqlsmith tests");
94
95 tracing::info!("Ran updates");
96
97 let mut generated_queries = 0;
98 for _ in 0..count {
99 test_session_variable(client, &mut rng).await;
100 let sql = sql_gen(&mut rng, tables.clone(), config);
101 tracing::info!("[EXECUTING TEST_BATCH]: {}", sql);
102 let result = run_query(timeout_duration, client, sql.as_str()).await;
103 match result {
104 Err(_e) => {
105 generated_queries += 1;
106 tracing::info!("Generated {} batch queries", generated_queries);
107 tracing::error!("Unrecoverable error encountered.");
108 return;
109 }
110 Ok(0) => {
111 generated_queries += 1;
112 }
113 _ => {}
114 }
115 }
116 tracing::info!("Generated {} batch queries", generated_queries);
117
118 let mut generated_queries = 0;
119 for _ in 0..count {
120 test_session_variable(client, &mut rng).await;
121 let (sql, table) = mview_sql_gen(&mut rng, tables.clone(), "stream_query", config);
122 tracing::info!("[EXECUTING TEST_STREAM]: {}", sql);
123 let result = run_query(timeout_duration, client, sql.as_str()).await;
124 match result {
125 Err(_e) => {
126 generated_queries += 1;
127 tracing::info!("Generated {} stream queries", generated_queries);
128 tracing::error!("Unrecoverable error encountered.");
129 return;
130 }
131 Ok(0) => {
132 generated_queries += 1;
133 }
134 _ => {}
135 }
136 tracing::info!("[EXECUTING DROP MVIEW]: {}", &format_drop_mview(&table));
137 drop_mview_table(&table, client).await;
138 }
139 tracing::info!("Generated {} stream queries", generated_queries);
140
141 drop_tables(&mviews, testdata, client).await;
142}
143
144pub async fn run(
146 client: &Client,
147 testdata: &str,
148 count: usize,
149 config: &Configuration,
150 seed: Option<u64>,
151) {
152 let mut rng = generate_rng(seed);
153
154 set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
155 set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
156 tracing::info!("Set session variables");
157
158 let base_tables = create_base_tables(testdata, client).await.unwrap();
159
160 let rows_per_table = 50;
161 let inserts = populate_tables(
162 client,
163 &mut rng,
164 base_tables.clone(),
165 rows_per_table,
166 config,
167 )
168 .await;
169 tracing::info!("Populated base tables");
170
171 let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client, config)
172 .await
173 .unwrap();
174 tracing::info!("Created tables");
175
176 let updatable_base_tables: Vec<_> = base_tables
178 .iter()
179 .filter(|table| !table.is_append_only)
180 .cloned()
181 .collect();
182
183 let updatable_inserts: Vec<_> = inserts
185 .iter()
186 .filter(|stmt| {
187 if let Statement::Insert { table_name, .. } = stmt {
188 updatable_base_tables
189 .iter()
190 .any(|table| table.name == table_name.base_name())
191 } else {
192 false
193 }
194 })
195 .cloned()
196 .collect();
197
198 update_base_tables(
200 client,
201 &mut rng,
202 &updatable_base_tables,
203 &updatable_inserts,
204 config,
205 )
206 .await;
207 tracing::info!("Ran updates");
208
209 let max_rows_inserted = rows_per_table * base_tables.len();
210 test_sqlsmith(
211 client,
212 &mut rng,
213 tables.clone(),
214 base_tables.clone(),
215 max_rows_inserted,
216 config,
217 )
218 .await;
219 tracing::info!("Passed sqlsmith tests");
220
221 test_batch_queries(client, &mut rng, tables.clone(), count, config)
222 .await
223 .unwrap();
224 tracing::info!("Passed batch queries");
225 test_stream_queries(client, &mut rng, tables.clone(), count, config)
226 .await
227 .unwrap();
228 tracing::info!("Passed stream queries");
229
230 drop_tables(&mviews, testdata, client).await;
231 tracing::info!("[EXECUTION SUCCESS]");
232}