risingwave_sqlsmith/sql_gen/
time_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use rand::Rng;
16use rand::prelude::IndexedRandom;
17use risingwave_common::types::DataType;
18use risingwave_sqlparser::ast::{
19    DataType as AstDataType, FunctionArg, ObjectName, TableAlias, TableFactor,
20};
21
22use crate::sql_gen::utils::{create_args, create_table_alias};
23use crate::sql_gen::{Column, Expr, SqlGenerator, Table};
24
25impl<R: Rng> SqlGenerator<'_, R> {
26    /// Generates time window functions.
27    pub(crate) fn gen_time_window_func(&mut self) -> (TableFactor, Table) {
28        match self.flip_coin() {
29            true => self.gen_hop(),
30            false => self.gen_tumble(),
31        }
32    }
33
34    /// Generates `TUMBLE`.
35    /// TUMBLE(data: TABLE, timecol: COLUMN, size: INTERVAL, offset?: INTERVAL)
36    fn gen_tumble(&mut self) -> (TableFactor, Table) {
37        let tables: Vec<_> = find_tables_with_timestamp_cols(self.tables.clone());
38        let (source_table_name, time_cols, schema) = tables
39            .choose(&mut self.rng)
40            .expect("seeded tables all do not have timestamp");
41        let table_name = self.gen_table_name_with_prefix("tumble");
42        let alias = create_table_alias(&table_name);
43
44        let name = Expr::Identifier(source_table_name.as_str().into());
45        let size = self.gen_size(1);
46        let time_col = time_cols.choose(&mut self.rng).unwrap();
47        let time_col = Expr::Identifier(time_col.name.as_str().into());
48        let args = create_args(vec![name, time_col, size]);
49        let relation = create_tvf("tumble", alias, args, false);
50
51        let table = Table::new(table_name, schema.clone());
52
53        (relation, table)
54    }
55
56    /// Generates `HOP`.
57    /// HOP(data: TABLE, timecol: COLUMN, slide: INTERVAL, size: INTERVAL, offset?: INTERVAL)
58    fn gen_hop(&mut self) -> (TableFactor, Table) {
59        let tables = find_tables_with_timestamp_cols(self.tables.clone());
60        let (source_table_name, time_cols, schema) = tables
61            .choose(&mut self.rng)
62            .expect("seeded tables all do not have timestamp");
63        let table_name = self.gen_table_name_with_prefix("hop");
64        let alias = create_table_alias(&table_name);
65
66        let time_col = time_cols.choose(&mut self.rng).unwrap();
67
68        let name = Expr::Identifier(source_table_name.as_str().into());
69        // We fix slide to "1" here, as slide needs to be divisible by size.
70        let (slide_secs, slide) = self.gen_slide();
71        let size = self.gen_size(slide_secs);
72        let time_col = Expr::Identifier(time_col.name.as_str().into());
73        let args = create_args(vec![name, time_col, slide, size]);
74
75        let relation = create_tvf("hop", alias, args, false);
76
77        let table = Table::new(table_name, schema.clone());
78
79        (relation, table)
80    }
81
82    fn gen_secs(&mut self) -> u64 {
83        self.rng.random_range(1..100)
84    }
85
86    // TODO(kwannoel): Disable for now, otherwise time window may take forever
87    // fn gen_secs(&mut self) -> u64 {
88    //     let minute = 60;
89    //     let hour = 60 * minute;
90    //     let day = 24 * hour;
91    //     let week = 7 * day;
92    //     let rand_secs = self.rng.random_range(1..week);
93    //     let choices = [1, minute, hour, day, week, rand_secs];
94    //     let secs = choices.choose(&mut self.rng).unwrap();
95    //     *secs
96    // }
97
98    fn secs_to_interval_expr(i: u64) -> Expr {
99        Expr::TypedString {
100            data_type: AstDataType::Interval,
101            value: i.to_string(),
102        }
103    }
104
105    fn gen_slide(&mut self) -> (u64, Expr) {
106        let slide_secs = self.gen_secs();
107        let expr = Self::secs_to_interval_expr(slide_secs);
108        (slide_secs, expr)
109    }
110
111    /// Size must be divisible by slide.
112    /// i.e.
113    /// `size_secs` = k * `slide_secs`.
114    /// k cannot be too large, to avoid overflow.
115    fn gen_size(&mut self, slide_secs: u64) -> Expr {
116        let k = self.rng.random_range(1..20);
117        let size_secs = k * slide_secs;
118        Self::secs_to_interval_expr(size_secs)
119    }
120}
121
122/// Create a table view function.
123fn create_tvf(
124    name: &str,
125    alias: TableAlias,
126    args: Vec<FunctionArg>,
127    with_ordinality: bool,
128) -> TableFactor {
129    TableFactor::TableFunction {
130        name: ObjectName(vec![name.into()]),
131        alias: Some(alias),
132        args,
133        with_ordinality,
134    }
135}
136
137fn is_timestamp_col(c: &Column) -> bool {
138    c.data_type == DataType::Timestamp || c.data_type == DataType::Timestamptz
139}
140
141fn find_tables_with_timestamp_cols(tables: Vec<Table>) -> Vec<(String, Vec<Column>, Vec<Column>)> {
142    tables
143        .into_iter()
144        .filter_map(|table| {
145            if !table.is_base_table {
146                return None;
147            }
148            let name = table.name.clone();
149            let columns = table.get_qualified_columns();
150            let mut timestamp_cols = vec![];
151            for col in columns {
152                let col_name = col.name.clone();
153                if col_name.contains("window_start") || col_name.contains("window_end") {
154                    return None;
155                }
156                if is_timestamp_col(&col) {
157                    timestamp_cols.push(col);
158                }
159            }
160            if timestamp_cols.is_empty() {
161                None
162            } else {
163                Some((name, timestamp_cols, table.columns))
164            }
165        })
166        .collect()
167}