risingwave_sqlsmith/test_runners/
fuzzing.rs1use tokio_postgres::Client;
18
19use crate::test_runners::utils::{
20 create_base_tables, create_mviews, drop_mview_table, drop_tables, format_drop_mview,
21 generate_rng, populate_tables, run_query, set_variable, test_batch_queries,
22 test_session_variable, test_sqlsmith, test_stream_queries, update_base_tables,
23};
24use crate::utils::read_file_contents;
25use crate::{mview_sql_gen, parse_sql, sql_gen};
26
27pub async fn run_pre_generated(client: &Client, outdir: &str) {
29 let timeout_duration = 12; let queries_path = format!("{}/queries.sql", outdir);
31 let queries = read_file_contents(queries_path).unwrap();
32 for statement in parse_sql(&queries) {
33 let sql = statement.to_string();
34 tracing::info!("[EXECUTING STATEMENT]: {}", sql);
35 run_query(timeout_duration, client, &sql).await.unwrap();
36 }
37 tracing::info!("[EXECUTION SUCCESS]");
38}
39
40pub async fn generate(
47 client: &Client,
48 testdata: &str,
49 count: usize,
50 _outdir: &str,
51 seed: Option<u64>,
52) {
53 let timeout_duration = 5;
54
55 set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
56 set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
57 tracing::info!("Set session variables");
58
59 let mut rng = generate_rng(seed);
60 let base_tables = create_base_tables(testdata, client).await.unwrap();
61
62 let rows_per_table = 50;
63 let max_rows_inserted = rows_per_table * base_tables.len();
64 let inserts = populate_tables(client, &mut rng, base_tables.clone(), rows_per_table).await;
65 tracing::info!("Populated base tables");
66
67 let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client)
68 .await
69 .unwrap();
70
71 update_base_tables(client, &mut rng, &base_tables, &inserts).await;
73
74 test_sqlsmith(
75 client,
76 &mut rng,
77 tables.clone(),
78 base_tables.clone(),
79 max_rows_inserted,
80 )
81 .await;
82 tracing::info!("Passed sqlsmith tests");
83
84 tracing::info!("Ran updates");
85
86 let mut generated_queries = 0;
87 for _ in 0..count {
88 test_session_variable(client, &mut rng).await;
89 let sql = sql_gen(&mut rng, tables.clone());
90 tracing::info!("[EXECUTING TEST_BATCH]: {}", sql);
91 let result = run_query(timeout_duration, client, sql.as_str()).await;
92 match result {
93 Err(_e) => {
94 generated_queries += 1;
95 tracing::info!("Generated {} batch queries", generated_queries);
96 tracing::error!("Unrecoverable error encountered.");
97 return;
98 }
99 Ok(0) => {
100 generated_queries += 1;
101 }
102 _ => {}
103 }
104 }
105 tracing::info!("Generated {} batch queries", generated_queries);
106
107 let mut generated_queries = 0;
108 for _ in 0..count {
109 test_session_variable(client, &mut rng).await;
110 let (sql, table) = mview_sql_gen(&mut rng, tables.clone(), "stream_query");
111 tracing::info!("[EXECUTING TEST_STREAM]: {}", sql);
112 let result = run_query(timeout_duration, client, sql.as_str()).await;
113 match result {
114 Err(_e) => {
115 generated_queries += 1;
116 tracing::info!("Generated {} stream queries", generated_queries);
117 tracing::error!("Unrecoverable error encountered.");
118 return;
119 }
120 Ok(0) => {
121 generated_queries += 1;
122 }
123 _ => {}
124 }
125 tracing::info!("[EXECUTING DROP MVIEW]: {}", &format_drop_mview(&table));
126 drop_mview_table(&table, client).await;
127 }
128 tracing::info!("Generated {} stream queries", generated_queries);
129
130 drop_tables(&mviews, testdata, client).await;
131}
132
133pub async fn run(client: &Client, testdata: &str, count: usize, seed: Option<u64>) {
135 let mut rng = generate_rng(seed);
136
137 set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
138 set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
139 tracing::info!("Set session variables");
140
141 let base_tables = create_base_tables(testdata, client).await.unwrap();
142
143 let rows_per_table = 50;
144 let inserts = populate_tables(client, &mut rng, base_tables.clone(), rows_per_table).await;
145 tracing::info!("Populated base tables");
146
147 let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client)
148 .await
149 .unwrap();
150 tracing::info!("Created tables");
151
152 update_base_tables(client, &mut rng, &base_tables, &inserts).await;
154 tracing::info!("Ran updates");
155
156 let max_rows_inserted = rows_per_table * base_tables.len();
157 test_sqlsmith(
158 client,
159 &mut rng,
160 tables.clone(),
161 base_tables.clone(),
162 max_rows_inserted,
163 )
164 .await;
165 tracing::info!("Passed sqlsmith tests");
166
167 test_batch_queries(client, &mut rng, tables.clone(), count)
168 .await
169 .unwrap();
170 tracing::info!("Passed batch queries");
171 test_stream_queries(client, &mut rng, tables.clone(), count)
172 .await
173 .unwrap();
174 tracing::info!("Passed stream queries");
175
176 drop_tables(&mviews, testdata, client).await;
177 tracing::info!("[EXECUTION SUCCESS]");
178}