risingwave_sqlsmith/sql_gen/
time_window.rs1use rand::Rng;
16use rand::prelude::IndexedRandom;
17use risingwave_common::types::DataType;
18use risingwave_sqlparser::ast::{
19 DataType as AstDataType, FunctionArg, ObjectName, TableAlias, TableFactor,
20};
21
22use crate::config::Feature;
23use crate::sql_gen::utils::{create_args, create_table_alias};
24use crate::sql_gen::{Column, Expr, SqlGenerator, Table};
25
26impl<R: Rng> SqlGenerator<'_, R> {
27 pub(crate) fn gen_time_window_func(&mut self) -> (TableFactor, Table) {
29 match self.flip_coin() {
30 true => self.gen_hop(),
31 false => self.gen_tumble(),
32 }
33 }
34
35 fn gen_tumble(&mut self) -> (TableFactor, Table) {
38 let source_tables = if self.should_generate(Feature::Eowc) {
39 self.get_append_only_tables()
40 } else {
41 self.tables.clone()
42 };
43 let tables: Vec<_> = find_tables_with_timestamp_cols(source_tables);
44 let (source_table_name, time_cols, schema) = tables
45 .choose(&mut self.rng)
46 .expect("seeded tables all do not have timestamp");
47 let table_name = self.gen_table_name_with_prefix("tumble");
48 let alias = create_table_alias(&table_name);
49
50 let name = Expr::Identifier(source_table_name.as_str().into());
51 let size = self.gen_size(1);
52 let time_col = time_cols.choose(&mut self.rng).unwrap();
53 let time_col = time_col.name_expr();
54 let args = create_args(vec![name, time_col, size]);
55 let relation = create_tvf("tumble", alias, args, false);
56
57 let table = Table::new(table_name, schema.clone());
58
59 (relation, table)
60 }
61
62 fn gen_hop(&mut self) -> (TableFactor, Table) {
65 let source_tables = if self.should_generate(Feature::Eowc) {
66 self.get_append_only_tables()
67 } else {
68 self.tables.clone()
69 };
70 let tables: Vec<_> = find_tables_with_timestamp_cols(source_tables);
71 let (source_table_name, time_cols, schema) = tables
72 .choose(&mut self.rng)
73 .expect("seeded tables all do not have timestamp");
74 let table_name = self.gen_table_name_with_prefix("hop");
75 let alias = create_table_alias(&table_name);
76
77 let time_col = time_cols.choose(&mut self.rng).unwrap();
78
79 let name = Expr::Identifier(source_table_name.as_str().into());
80 let (slide_secs, slide) = self.gen_slide();
82 let size = self.gen_size(slide_secs);
83 let time_col = time_col.name_expr();
84 let args = create_args(vec![name, time_col, slide, size]);
85
86 let relation = create_tvf("hop", alias, args, false);
87
88 let table = Table::new(table_name, schema.clone());
89
90 (relation, table)
91 }
92
93 fn gen_secs(&mut self) -> u64 {
94 self.rng.random_range(1..100)
95 }
96
97 fn secs_to_interval_expr(i: u64) -> Expr {
110 Expr::TypedString {
111 data_type: AstDataType::Interval,
112 value: i.to_string(),
113 }
114 }
115
116 fn gen_slide(&mut self) -> (u64, Expr) {
117 let slide_secs = self.gen_secs();
118 let expr = Self::secs_to_interval_expr(slide_secs);
119 (slide_secs, expr)
120 }
121
122 fn gen_size(&mut self, slide_secs: u64) -> Expr {
127 let k = self.rng.random_range(1..20);
128 let size_secs = k * slide_secs;
129 Self::secs_to_interval_expr(size_secs)
130 }
131}
132
133fn create_tvf(
135 name: &str,
136 alias: TableAlias,
137 args: Vec<FunctionArg>,
138 with_ordinality: bool,
139) -> TableFactor {
140 TableFactor::TableFunction {
141 name: ObjectName(vec![name.into()]),
142 alias: Some(alias),
143 args,
144 with_ordinality,
145 }
146}
147
148fn is_timestamp_col(c: &Column) -> bool {
149 c.data_type == DataType::Timestamp || c.data_type == DataType::Timestamptz
150}
151
152fn find_tables_with_timestamp_cols(tables: Vec<Table>) -> Vec<(String, Vec<Column>, Vec<Column>)> {
153 tables
154 .into_iter()
155 .filter_map(|table| {
156 if !table.is_base_table {
157 return None;
158 }
159 let name = table.name.clone();
160 let columns = table.get_qualified_columns();
161 let mut timestamp_cols = vec![];
162 for col in columns {
163 let col_name = col.name.base_name();
164 if col_name.contains("window_start") || col_name.contains("window_end") {
165 return None;
166 }
167 if is_timestamp_col(&col) {
168 timestamp_cols.push(col);
169 }
170 }
171 if timestamp_cols.is_empty() {
172 None
173 } else {
174 Some((name, timestamp_cols, table.columns))
175 }
176 })
177 .collect()
178}