risingwave_sqlsmith/test_runners/
fuzzing.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Provides E2E Test runner functionality.
16
17use risingwave_sqlparser::ast::Statement;
18use tokio_postgres::Client;
19
20use crate::config::Configuration;
21use crate::test_runners::utils::{
22    create_base_tables, create_mviews, drop_mview_table, drop_tables, format_drop_mview,
23    generate_rng, populate_tables, run_query, set_variable, test_batch_queries,
24    test_session_variable, test_sqlsmith, test_stream_queries, update_base_tables,
25};
26use crate::utils::read_file_contents;
27use crate::{mview_sql_gen, parse_sql, sql_gen};
28
29/// e2e test runner for pre-generated queries from sqlsmith
30pub async fn run_pre_generated(client: &Client, outdir: &str) {
31    let timeout_duration = 12; // allow for some variance.
32    let queries_path = format!("{}/queries.sql", outdir);
33    let queries = read_file_contents(queries_path).unwrap();
34    for statement in parse_sql(&queries) {
35        let sql = statement.to_string();
36        tracing::info!("[EXECUTING STATEMENT]: {}", sql);
37        run_query(timeout_duration, client, &sql).await.unwrap();
38    }
39    tracing::info!("[EXECUTION SUCCESS]");
40}
41
42/// Query Generator
43/// If we encounter an expected error, just skip.
44/// If we encounter an unexpected error,
45/// Sqlsmith should stop execution, but writeout ddl and queries so far.
46/// If query takes too long -> cancel it, **mark it as error**.
47/// NOTE(noel): It will still fail if DDL creation fails.
48pub async fn generate(
49    client: &Client,
50    testdata: &str,
51    count: usize,
52    _outdir: &str,
53    config: &Configuration,
54    seed: Option<u64>,
55) {
56    let timeout_duration = 5;
57
58    set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
59    set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
60    tracing::info!("Set session variables");
61
62    let mut rng = generate_rng(seed);
63    let base_tables = create_base_tables(testdata, client).await.unwrap();
64
65    let rows_per_table = 50;
66    let max_rows_inserted = rows_per_table * base_tables.len();
67    let inserts = populate_tables(
68        client,
69        &mut rng,
70        base_tables.clone(),
71        rows_per_table,
72        config,
73    )
74    .await;
75    tracing::info!("Populated base tables");
76
77    let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client, config)
78        .await
79        .unwrap();
80
81    // Generate an update for some inserts, on the corresponding table.
82    update_base_tables(client, &mut rng, &base_tables, &inserts, config).await;
83
84    test_sqlsmith(
85        client,
86        &mut rng,
87        tables.clone(),
88        base_tables.clone(),
89        max_rows_inserted,
90        config,
91    )
92    .await;
93    tracing::info!("Passed sqlsmith tests");
94
95    tracing::info!("Ran updates");
96
97    let mut generated_queries = 0;
98    for _ in 0..count {
99        test_session_variable(client, &mut rng).await;
100        let sql = sql_gen(&mut rng, tables.clone(), config);
101        tracing::info!("[EXECUTING TEST_BATCH]: {}", sql);
102        let result = run_query(timeout_duration, client, sql.as_str()).await;
103        match result {
104            Err(_e) => {
105                generated_queries += 1;
106                tracing::info!("Generated {} batch queries", generated_queries);
107                tracing::error!("Unrecoverable error encountered.");
108                return;
109            }
110            Ok(0) => {
111                generated_queries += 1;
112            }
113            _ => {}
114        }
115    }
116    tracing::info!("Generated {} batch queries", generated_queries);
117
118    let mut generated_queries = 0;
119    for _ in 0..count {
120        test_session_variable(client, &mut rng).await;
121        let (sql, table) = mview_sql_gen(&mut rng, tables.clone(), "stream_query", config);
122        tracing::info!("[EXECUTING TEST_STREAM]: {}", sql);
123        let result = run_query(timeout_duration, client, sql.as_str()).await;
124        match result {
125            Err(_e) => {
126                generated_queries += 1;
127                tracing::info!("Generated {} stream queries", generated_queries);
128                tracing::error!("Unrecoverable error encountered.");
129                return;
130            }
131            Ok(0) => {
132                generated_queries += 1;
133            }
134            _ => {}
135        }
136        tracing::info!("[EXECUTING DROP MVIEW]: {}", &format_drop_mview(&table));
137        drop_mview_table(&table, client).await;
138    }
139    tracing::info!("Generated {} stream queries", generated_queries);
140
141    drop_tables(&mviews, testdata, client).await;
142}
143
144/// e2e test runner for sqlsmith
145pub async fn run(
146    client: &Client,
147    testdata: &str,
148    count: usize,
149    config: &Configuration,
150    seed: Option<u64>,
151) {
152    let mut rng = generate_rng(seed);
153
154    set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
155    set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
156    tracing::info!("Set session variables");
157
158    let base_tables = create_base_tables(testdata, client).await.unwrap();
159
160    let rows_per_table = 50;
161    let inserts = populate_tables(
162        client,
163        &mut rng,
164        base_tables.clone(),
165        rows_per_table,
166        config,
167    )
168    .await;
169    tracing::info!("Populated base tables");
170
171    let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client, config)
172        .await
173        .unwrap();
174    tracing::info!("Created tables");
175
176    // Filter out not-append-only tables.
177    let updatable_base_tables: Vec<_> = base_tables
178        .iter()
179        .filter(|table| !table.is_append_only)
180        .cloned()
181        .collect();
182
183    // Filter out inserts on not-append-only tables.
184    let updatable_inserts: Vec<_> = inserts
185        .iter()
186        .filter(|stmt| {
187            if let Statement::Insert { table_name, .. } = stmt {
188                updatable_base_tables
189                    .iter()
190                    .any(|table| table.name == table_name.base_name())
191            } else {
192                false
193            }
194        })
195        .cloned()
196        .collect();
197
198    // Generate an update for some inserts, on the corresponding table.
199    update_base_tables(
200        client,
201        &mut rng,
202        &updatable_base_tables,
203        &updatable_inserts,
204        config,
205    )
206    .await;
207    tracing::info!("Ran updates");
208
209    let max_rows_inserted = rows_per_table * base_tables.len();
210    test_sqlsmith(
211        client,
212        &mut rng,
213        tables.clone(),
214        base_tables.clone(),
215        max_rows_inserted,
216        config,
217    )
218    .await;
219    tracing::info!("Passed sqlsmith tests");
220
221    test_batch_queries(client, &mut rng, tables.clone(), count, config)
222        .await
223        .unwrap();
224    tracing::info!("Passed batch queries");
225    test_stream_queries(client, &mut rng, tables.clone(), count, config)
226        .await
227        .unwrap();
228    tracing::info!("Passed stream queries");
229
230    drop_tables(&mviews, testdata, client).await;
231    tracing::info!("[EXECUTION SUCCESS]");
232}