risingwave_sqlsmith/sql_gen/
time_window.rs1use rand::Rng;
16use rand::prelude::IndexedRandom;
17use risingwave_common::types::DataType;
18use risingwave_sqlparser::ast::{
19    DataType as AstDataType, FunctionArg, ObjectName, TableAlias, TableFactor,
20};
21
22use crate::config::Feature;
23use crate::sql_gen::utils::{create_args, create_table_alias};
24use crate::sql_gen::{Column, Expr, SqlGenerator, Table};
25
26impl<R: Rng> SqlGenerator<'_, R> {
27    pub(crate) fn gen_time_window_func(&mut self) -> (TableFactor, Table) {
29        match self.flip_coin() {
30            true => self.gen_hop(),
31            false => self.gen_tumble(),
32        }
33    }
34
35    fn gen_tumble(&mut self) -> (TableFactor, Table) {
38        let source_tables = if self.should_generate(Feature::Eowc) {
39            self.get_append_only_tables()
40        } else {
41            self.tables.clone()
42        };
43        let tables: Vec<_> = find_tables_with_timestamp_cols(source_tables);
44        let (source_table_name, time_cols, schema) = tables
45            .choose(&mut self.rng)
46            .expect("seeded tables all do not have timestamp");
47        let table_name = self.gen_table_name_with_prefix("tumble");
48        let alias = create_table_alias(&table_name);
49
50        let name = Expr::Identifier(source_table_name.as_str().into());
51        let size = self.gen_size(1);
52        let time_col = time_cols.choose(&mut self.rng).unwrap();
53        let time_col = time_col.name_expr();
54        let args = create_args(vec![name, time_col, size]);
55        let relation = create_tvf("tumble", alias, args, false);
56
57        let table = Table::new(table_name, schema.clone());
58
59        (relation, table)
60    }
61
62    fn gen_hop(&mut self) -> (TableFactor, Table) {
65        let source_tables = if self.should_generate(Feature::Eowc) {
66            self.get_append_only_tables()
67        } else {
68            self.tables.clone()
69        };
70        let tables: Vec<_> = find_tables_with_timestamp_cols(source_tables);
71        let (source_table_name, time_cols, schema) = tables
72            .choose(&mut self.rng)
73            .expect("seeded tables all do not have timestamp");
74        let table_name = self.gen_table_name_with_prefix("hop");
75        let alias = create_table_alias(&table_name);
76
77        let time_col = time_cols.choose(&mut self.rng).unwrap();
78
79        let name = Expr::Identifier(source_table_name.as_str().into());
80        let (slide_secs, slide) = self.gen_slide();
82        let size = self.gen_size(slide_secs);
83        let time_col = time_col.name_expr();
84        let args = create_args(vec![name, time_col, slide, size]);
85
86        let relation = create_tvf("hop", alias, args, false);
87
88        let table = Table::new(table_name, schema.clone());
89
90        (relation, table)
91    }
92
93    fn gen_secs(&mut self) -> u64 {
94        self.rng.random_range(1..100)
95    }
96
97    fn secs_to_interval_expr(i: u64) -> Expr {
110        Expr::TypedString {
111            data_type: AstDataType::Interval,
112            value: i.to_string(),
113        }
114    }
115
116    fn gen_slide(&mut self) -> (u64, Expr) {
117        let slide_secs = self.gen_secs();
118        let expr = Self::secs_to_interval_expr(slide_secs);
119        (slide_secs, expr)
120    }
121
122    fn gen_size(&mut self, slide_secs: u64) -> Expr {
127        let k = self.rng.random_range(1..20);
128        let size_secs = k * slide_secs;
129        Self::secs_to_interval_expr(size_secs)
130    }
131}
132
133fn create_tvf(
135    name: &str,
136    alias: TableAlias,
137    args: Vec<FunctionArg>,
138    with_ordinality: bool,
139) -> TableFactor {
140    TableFactor::TableFunction {
141        name: ObjectName(vec![name.into()]),
142        alias: Some(alias),
143        args,
144        with_ordinality,
145    }
146}
147
148fn is_timestamp_col(c: &Column) -> bool {
149    c.data_type == DataType::Timestamp || c.data_type == DataType::Timestamptz
150}
151
152fn find_tables_with_timestamp_cols(tables: Vec<Table>) -> Vec<(String, Vec<Column>, Vec<Column>)> {
153    tables
154        .into_iter()
155        .filter_map(|table| {
156            if !table.is_base_table {
157                return None;
158            }
159            let name = table.name.clone();
160            let columns = table.get_qualified_columns();
161            let mut timestamp_cols = vec![];
162            for col in columns {
163                let col_name = col.name.base_name();
164                if col_name.contains("window_start") || col_name.contains("window_end") {
165                    return None;
166                }
167                if is_timestamp_col(&col) {
168                    timestamp_cols.push(col);
169                }
170            }
171            if timestamp_cols.is_empty() {
172                None
173            } else {
174                Some((name, timestamp_cols, table.columns))
175            }
176        })
177        .collect()
178}