risingwave_sqlsmith/test_runners/
fuzzing.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Provides E2E Test runner functionality.
16
17use tokio_postgres::Client;
18
19use crate::test_runners::utils::{
20    create_base_tables, create_mviews, drop_mview_table, drop_tables, format_drop_mview,
21    generate_rng, populate_tables, run_query, set_variable, test_batch_queries,
22    test_session_variable, test_sqlsmith, test_stream_queries, update_base_tables,
23};
24use crate::utils::read_file_contents;
25use crate::{mview_sql_gen, parse_sql, sql_gen};
26
27/// e2e test runner for pre-generated queries from sqlsmith
28pub async fn run_pre_generated(client: &Client, outdir: &str) {
29    let timeout_duration = 12; // allow for some variance.
30    let queries_path = format!("{}/queries.sql", outdir);
31    let queries = read_file_contents(queries_path).unwrap();
32    for statement in parse_sql(&queries) {
33        let sql = statement.to_string();
34        tracing::info!("[EXECUTING STATEMENT]: {}", sql);
35        run_query(timeout_duration, client, &sql).await.unwrap();
36    }
37    tracing::info!("[EXECUTION SUCCESS]");
38}
39
40/// Query Generator
41/// If we encounter an expected error, just skip.
42/// If we encounter an unexpected error,
43/// Sqlsmith should stop execution, but writeout ddl and queries so far.
44/// If query takes too long -> cancel it, **mark it as error**.
45/// NOTE(noel): It will still fail if DDL creation fails.
46pub async fn generate(
47    client: &Client,
48    testdata: &str,
49    count: usize,
50    _outdir: &str,
51    seed: Option<u64>,
52) {
53    let timeout_duration = 5;
54
55    set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
56    set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
57    tracing::info!("Set session variables");
58
59    let mut rng = generate_rng(seed);
60    let base_tables = create_base_tables(testdata, client).await.unwrap();
61
62    let rows_per_table = 50;
63    let max_rows_inserted = rows_per_table * base_tables.len();
64    let inserts = populate_tables(client, &mut rng, base_tables.clone(), rows_per_table).await;
65    tracing::info!("Populated base tables");
66
67    let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client)
68        .await
69        .unwrap();
70
71    // Generate an update for some inserts, on the corresponding table.
72    update_base_tables(client, &mut rng, &base_tables, &inserts).await;
73
74    test_sqlsmith(
75        client,
76        &mut rng,
77        tables.clone(),
78        base_tables.clone(),
79        max_rows_inserted,
80    )
81    .await;
82    tracing::info!("Passed sqlsmith tests");
83
84    tracing::info!("Ran updates");
85
86    let mut generated_queries = 0;
87    for _ in 0..count {
88        test_session_variable(client, &mut rng).await;
89        let sql = sql_gen(&mut rng, tables.clone());
90        tracing::info!("[EXECUTING TEST_BATCH]: {}", sql);
91        let result = run_query(timeout_duration, client, sql.as_str()).await;
92        match result {
93            Err(_e) => {
94                generated_queries += 1;
95                tracing::info!("Generated {} batch queries", generated_queries);
96                tracing::error!("Unrecoverable error encountered.");
97                return;
98            }
99            Ok(0) => {
100                generated_queries += 1;
101            }
102            _ => {}
103        }
104    }
105    tracing::info!("Generated {} batch queries", generated_queries);
106
107    let mut generated_queries = 0;
108    for _ in 0..count {
109        test_session_variable(client, &mut rng).await;
110        let (sql, table) = mview_sql_gen(&mut rng, tables.clone(), "stream_query");
111        tracing::info!("[EXECUTING TEST_STREAM]: {}", sql);
112        let result = run_query(timeout_duration, client, sql.as_str()).await;
113        match result {
114            Err(_e) => {
115                generated_queries += 1;
116                tracing::info!("Generated {} stream queries", generated_queries);
117                tracing::error!("Unrecoverable error encountered.");
118                return;
119            }
120            Ok(0) => {
121                generated_queries += 1;
122            }
123            _ => {}
124        }
125        tracing::info!("[EXECUTING DROP MVIEW]: {}", &format_drop_mview(&table));
126        drop_mview_table(&table, client).await;
127    }
128    tracing::info!("Generated {} stream queries", generated_queries);
129
130    drop_tables(&mviews, testdata, client).await;
131}
132
133/// e2e test runner for sqlsmith
134pub async fn run(client: &Client, testdata: &str, count: usize, seed: Option<u64>) {
135    let mut rng = generate_rng(seed);
136
137    set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
138    set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
139    tracing::info!("Set session variables");
140
141    let base_tables = create_base_tables(testdata, client).await.unwrap();
142
143    let rows_per_table = 50;
144    let inserts = populate_tables(client, &mut rng, base_tables.clone(), rows_per_table).await;
145    tracing::info!("Populated base tables");
146
147    let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client)
148        .await
149        .unwrap();
150    tracing::info!("Created tables");
151
152    // Generate an update for some inserts, on the corresponding table.
153    update_base_tables(client, &mut rng, &base_tables, &inserts).await;
154    tracing::info!("Ran updates");
155
156    let max_rows_inserted = rows_per_table * base_tables.len();
157    test_sqlsmith(
158        client,
159        &mut rng,
160        tables.clone(),
161        base_tables.clone(),
162        max_rows_inserted,
163    )
164    .await;
165    tracing::info!("Passed sqlsmith tests");
166
167    test_batch_queries(client, &mut rng, tables.clone(), count)
168        .await
169        .unwrap();
170    tracing::info!("Passed batch queries");
171    test_stream_queries(client, &mut rng, tables.clone(), count)
172        .await
173        .unwrap();
174    tracing::info!("Passed stream queries");
175
176    drop_tables(&mviews, testdata, client).await;
177    tracing::info!("[EXECUTION SUCCESS]");
178}