risingwave_sqlsmith/sql_gen/
time_window.rs1use rand::Rng;
16use rand::prelude::IndexedRandom;
17use risingwave_common::types::DataType;
18use risingwave_sqlparser::ast::{
19 DataType as AstDataType, FunctionArg, ObjectName, TableAlias, TableFactor,
20};
21
22use crate::sql_gen::utils::{create_args, create_table_alias};
23use crate::sql_gen::{Column, Expr, SqlGenerator, Table};
24
25impl<R: Rng> SqlGenerator<'_, R> {
26 pub(crate) fn gen_time_window_func(&mut self) -> (TableFactor, Table) {
28 match self.flip_coin() {
29 true => self.gen_hop(),
30 false => self.gen_tumble(),
31 }
32 }
33
34 fn gen_tumble(&mut self) -> (TableFactor, Table) {
37 let tables: Vec<_> = find_tables_with_timestamp_cols(self.tables.clone());
38 let (source_table_name, time_cols, schema) = tables
39 .choose(&mut self.rng)
40 .expect("seeded tables all do not have timestamp");
41 let table_name = self.gen_table_name_with_prefix("tumble");
42 let alias = create_table_alias(&table_name);
43
44 let name = Expr::Identifier(source_table_name.as_str().into());
45 let size = self.gen_size(1);
46 let time_col = time_cols.choose(&mut self.rng).unwrap();
47 let time_col = Expr::Identifier(time_col.name.as_str().into());
48 let args = create_args(vec![name, time_col, size]);
49 let relation = create_tvf("tumble", alias, args, false);
50
51 let table = Table::new(table_name, schema.clone());
52
53 (relation, table)
54 }
55
56 fn gen_hop(&mut self) -> (TableFactor, Table) {
59 let tables = find_tables_with_timestamp_cols(self.tables.clone());
60 let (source_table_name, time_cols, schema) = tables
61 .choose(&mut self.rng)
62 .expect("seeded tables all do not have timestamp");
63 let table_name = self.gen_table_name_with_prefix("hop");
64 let alias = create_table_alias(&table_name);
65
66 let time_col = time_cols.choose(&mut self.rng).unwrap();
67
68 let name = Expr::Identifier(source_table_name.as_str().into());
69 let (slide_secs, slide) = self.gen_slide();
71 let size = self.gen_size(slide_secs);
72 let time_col = Expr::Identifier(time_col.name.as_str().into());
73 let args = create_args(vec![name, time_col, slide, size]);
74
75 let relation = create_tvf("hop", alias, args, false);
76
77 let table = Table::new(table_name, schema.clone());
78
79 (relation, table)
80 }
81
82 fn gen_secs(&mut self) -> u64 {
83 self.rng.random_range(1..100)
84 }
85
86 fn secs_to_interval_expr(i: u64) -> Expr {
99 Expr::TypedString {
100 data_type: AstDataType::Interval,
101 value: i.to_string(),
102 }
103 }
104
105 fn gen_slide(&mut self) -> (u64, Expr) {
106 let slide_secs = self.gen_secs();
107 let expr = Self::secs_to_interval_expr(slide_secs);
108 (slide_secs, expr)
109 }
110
111 fn gen_size(&mut self, slide_secs: u64) -> Expr {
116 let k = self.rng.random_range(1..20);
117 let size_secs = k * slide_secs;
118 Self::secs_to_interval_expr(size_secs)
119 }
120}
121
122fn create_tvf(
124 name: &str,
125 alias: TableAlias,
126 args: Vec<FunctionArg>,
127 with_ordinality: bool,
128) -> TableFactor {
129 TableFactor::TableFunction {
130 name: ObjectName(vec![name.into()]),
131 alias: Some(alias),
132 args,
133 with_ordinality,
134 }
135}
136
137fn is_timestamp_col(c: &Column) -> bool {
138 c.data_type == DataType::Timestamp || c.data_type == DataType::Timestamptz
139}
140
141fn find_tables_with_timestamp_cols(tables: Vec<Table>) -> Vec<(String, Vec<Column>, Vec<Column>)> {
142 tables
143 .into_iter()
144 .filter_map(|table| {
145 if !table.is_base_table {
146 return None;
147 }
148 let name = table.name.clone();
149 let columns = table.get_qualified_columns();
150 let mut timestamp_cols = vec![];
151 for col in columns {
152 let col_name = col.name.clone();
153 if col_name.contains("window_start") || col_name.contains("window_end") {
154 return None;
155 }
156 if is_timestamp_col(&col) {
157 timestamp_cols.push(col);
158 }
159 }
160 if timestamp_cols.is_empty() {
161 None
162 } else {
163 Some((name, timestamp_cols, table.columns))
164 }
165 })
166 .collect()
167}