risingwave_sqlsmith/sql_gen/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Provides data structures for query generation,
16//! and the interface for generating
17//! stream (MATERIALIZED VIEW) and batch query statements.
18
19use std::collections::HashSet;
20use std::vec;
21
22use rand::Rng;
23use risingwave_common::types::DataType;
24use risingwave_frontend::bind_data_type;
25use risingwave_sqlparser::ast::{
26    ColumnDef, EmitMode, Expr, Ident, ObjectName, SourceWatermark, Statement,
27};
28
29mod agg;
30mod cast;
31mod expr;
32pub use expr::print_function_table;
33
34use crate::config::{Configuration, Feature, GenerateItem};
35
36mod dml;
37mod functions;
38mod query;
39mod relation;
40mod scalar;
41mod table_functions;
42mod time_window;
43mod types;
44mod utils;
45
/// A relation known to the query generator: a name plus its columns and
/// a few properties that generation logic can consult.
///
/// Built with [`Table::new`] (leaves `is_base_table` as `false`) or
/// [`Table::new_for_base_table`] (sets `is_base_table` to `true`).
#[derive(Clone, Debug)]
pub struct Table {
    /// Relation name; used as the qualifier prepended by
    /// [`Table::get_qualified_columns`].
    pub name: String,
    /// Columns of the relation, with names as stored (not qualified here).
    pub columns: Vec<Column>,
    /// Primary key positions.
    /// NOTE(review): presumably indices into `columns` — confirm against callers.
    pub pk_indices: Vec<usize>,
    /// `true` only when the value was built by [`Table::new_for_base_table`].
    pub is_base_table: bool,
    /// Whether the table is append-only.
    pub is_append_only: bool,
    /// Watermark definitions attached to the table; the `column` of each entry
    /// names a watermark column.
    pub source_watermarks: Vec<SourceWatermark>,
}
55
56impl Table {
57    pub fn new(name: String, columns: Vec<Column>) -> Self {
58        Self {
59            name,
60            columns,
61            pk_indices: vec![],
62            is_base_table: false,
63            is_append_only: false,
64            source_watermarks: vec![],
65        }
66    }
67
68    pub fn new_for_base_table(
69        name: String,
70        columns: Vec<Column>,
71        pk_indices: Vec<usize>,
72        is_append_only: bool,
73        source_watermarks: Vec<SourceWatermark>,
74    ) -> Self {
75        Self {
76            name,
77            columns,
78            pk_indices,
79            is_base_table: true,
80            is_append_only,
81            source_watermarks,
82        }
83    }
84
85    pub fn get_qualified_columns(&self) -> Vec<Column> {
86        self.columns
87            .iter()
88            .map(|c| {
89                let mut name = c.name.clone();
90                name.0.insert(0, Ident::new_unchecked(&self.name));
91                Column {
92                    name,
93                    data_type: c.data_type.clone(),
94                }
95            })
96            .collect()
97    }
98}
99
/// Sqlsmith Column definition
///
/// Pairs a (possibly qualified) column name with its bound data type.
#[derive(Clone, Debug)]
pub struct Column {
    /// Column name as a path of identifiers. A single identifier when built
    /// from a `ColumnDef`; `Table::get_qualified_columns` prepends the table
    /// name to produce a longer path.
    pub(crate) name: ObjectName,
    /// Data type of the column.
    pub(crate) data_type: DataType,
}
106
107impl From<ColumnDef> for Column {
108    fn from(c: ColumnDef) -> Self {
109        Self {
110            name: ObjectName(vec![c.name]),
111            data_type: bind_data_type(&c.data_type.expect("data type should not be none")).unwrap(),
112        }
113    }
114}
115
116impl Column {
117    pub fn name_expr(&self) -> Expr {
118        if self.name.0.len() == 1 {
119            Expr::Identifier(self.name.0[0].clone())
120        } else {
121            Expr::CompoundIdentifier(self.name.0.clone())
122        }
123    }
124
125    pub fn base_name(&self) -> Ident {
126        self.name.0.last().unwrap().clone()
127    }
128}
129
/// Flags describing whether aggregate expressions may be generated at the
/// current position.
#[derive(Copy, Clone)]
pub(crate) struct SqlGeneratorContext {
    /// This is used to disable agg expr totally.
    /// Used in top level, where we want to test queries without aggregates.
    can_agg: bool,
    /// Whether generation is currently inside an aggregate; while set,
    /// `can_gen_agg` reports `false`.
    inside_agg: bool,
}

impl SqlGeneratorContext {
    /// Creates a context from the two flags.
    pub fn new(can_agg: bool, inside_agg: bool) -> Self {
        Self {
            can_agg,
            inside_agg,
        }
    }

    /// Returns the `inside_agg` flag.
    pub fn is_inside_agg(self) -> bool {
        let Self { inside_agg, .. } = self;
        inside_agg
    }

    /// Returns `true` only when aggregates are enabled and generation is not
    /// already inside an aggregate.
    pub fn can_gen_agg(self) -> bool {
        matches!((self.can_agg, self.inside_agg), (true, false))
    }
}
154
/// State carried while generating a single SQL statement.
pub(crate) struct SqlGenerator<'a, R: Rng> {
    /// Tables supplied at construction time.
    tables: Vec<Table>,
    /// Borrowed random number generator used for the generator's random
    /// decisions (e.g. `flip_coin`, `can_recurse`, `should_generate`).
    rng: &'a mut R,

    /// Relation ID used to generate table names and aliases
    relation_id: u32,

    /// Relations bound in generated query.
    /// We might not read from all tables.
    bound_relations: Vec<Table>,

    /// Columns bound in generated query.
    /// May not contain all columns from `Self::bound_relations`.
    /// e.g. GROUP BY clause will constrain `bound_columns`.
    bound_columns: Vec<Column>,

    /// `SqlGenerator` can be used in two execution modes:
    /// 1. Generating Query Statements.
    /// 2. Generating queries for CREATE MATERIALIZED VIEW.
    ///    Under this mode certain restrictions and workarounds are applied
    ///    for unsupported stream executors.
    is_mview: bool,

    /// Probability that `can_recurse` allows another level of recursion.
    /// Starts at 0.3, is multiplied by 0.9 each time recursion is granted,
    /// and is set to 0.0 once it drops below 0.05.
    recursion_weight: f64,

    /// Configuration to control weight.
    config: Configuration,
    // /// Count number of subquery.
    // /// We don't want too many per query otherwise it is hard to debug.
    // with_statements: u64,
}
186
187/// Generators
188impl<'a, R: Rng> SqlGenerator<'a, R> {
189    pub(crate) fn new(rng: &'a mut R, tables: Vec<Table>, config: Configuration) -> Self {
190        SqlGenerator {
191            tables,
192            rng,
193            relation_id: 0,
194            bound_relations: vec![],
195            bound_columns: vec![],
196            is_mview: false,
197            recursion_weight: 0.3,
198            config,
199        }
200    }
201
202    pub(crate) fn new_for_mview(rng: &'a mut R, tables: Vec<Table>, config: Configuration) -> Self {
203        // distinct aggregate is not allowed for MV
204        SqlGenerator {
205            tables,
206            rng,
207            relation_id: 0,
208            bound_relations: vec![],
209            bound_columns: vec![],
210            is_mview: true,
211            recursion_weight: 0.3,
212            config,
213        }
214    }
215
216    pub(crate) fn gen_batch_query_stmt(&mut self) -> Statement {
217        let (query, _) = self.gen_query();
218        Statement::Query(Box::new(query))
219    }
220
221    pub(crate) fn gen_mview_stmt(&mut self, name: &str) -> (Statement, Table) {
222        let (query, schema) = self.gen_query();
223        let query = Box::new(query);
224        let table = Table::new(name.to_owned(), schema);
225        let name = ObjectName(vec![Ident::new_unchecked(name)]);
226
227        // Randomly choose emit mode if allowed
228        let emit_mode = if self.should_generate(Feature::Eowc) {
229            Some(EmitMode::OnWindowClose)
230        } else {
231            None
232        };
233
234        let mview = Statement::CreateView {
235            or_replace: false,
236            materialized: true,
237            if_not_exists: false,
238            name,
239            columns: vec![],
240            query,
241            with_options: vec![],
242            emit_mode,
243        };
244        (mview, table)
245    }
246
247    /// 50/50 chance to be true/false.
248    fn flip_coin(&mut self) -> bool {
249        self.rng.random_bool(0.5)
250    }
251
252    /// Provide recursion bounds.
253    pub(crate) fn can_recurse(&mut self) -> bool {
254        if self.recursion_weight <= 0.0 {
255            return false;
256        }
257        let can_recurse = self.rng.random_bool(self.recursion_weight);
258        if can_recurse {
259            self.recursion_weight *= 0.9;
260            if self.recursion_weight < 0.05 {
261                self.recursion_weight = 0.0;
262            }
263        }
264        can_recurse
265    }
266
267    pub(crate) fn get_columns_with_watermark(&mut self, columns: &[Column]) -> Vec<Column> {
268        let watermark_names: HashSet<_> = self
269            .get_append_only_tables()
270            .iter()
271            .flat_map(|t| t.source_watermarks.iter().map(|wm| wm.column.real_value()))
272            .collect();
273
274        columns
275            .iter()
276            .filter(|c| watermark_names.contains(&c.name.base_name()))
277            .cloned()
278            .collect()
279    }
280
281    pub(crate) fn get_append_only_tables(&mut self) -> Vec<Table> {
282        self.tables
283            .iter()
284            .filter(|t| t.is_append_only)
285            .cloned()
286            .collect()
287    }
288
289    /// Decide whether to generate on config.
290    pub(crate) fn should_generate<T: Into<GenerateItem>>(&mut self, item: T) -> bool {
291        self.config.should_generate(item, self.rng)
292    }
293}