risingwave_sqlsmith/test_runners/
diff.rs1use anyhow::bail;
18use itertools::Itertools;
19use rand::Rng;
20use similar::{ChangeTag, TextDiff};
21use tokio_postgres::{Client, SimpleQueryMessage};
22
23use crate::config::Configuration;
24use crate::test_runners::utils::{
25 Result, create_base_tables, create_mviews, drop_mview_table, drop_tables, format_drop_mview,
26 generate_rng, populate_tables, run_query, run_query_inner, set_variable, update_base_tables,
27};
28use crate::{Table, differential_sql_gen};
29
30pub async fn run_differential_testing(
32 client: &Client,
33 testdata: &str,
34 count: usize,
35 config: &Configuration,
36 seed: Option<u64>,
37) -> Result<()> {
38 let mut rng = generate_rng(seed);
39
40 set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
41 set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
42 tracing::info!("Set session variables");
43
44 let base_tables = create_base_tables(testdata, client).await.unwrap();
45
46 let rows_per_table = 50;
47 let inserts = populate_tables(
48 client,
49 &mut rng,
50 base_tables.clone(),
51 rows_per_table,
52 config,
53 )
54 .await;
55 tracing::info!("Populated base tables");
56
57 let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client, config)
58 .await
59 .unwrap();
60 tracing::info!("Created tables");
61
62 update_base_tables(client, &mut rng, &base_tables, &inserts, config).await;
64 tracing::info!("Ran updates");
65
66 for i in 0..count {
67 diff_stream_and_batch(&mut rng, tables.clone(), client, i, config).await?
68 }
69
70 drop_tables(&mviews, testdata, client).await;
71 tracing::info!("[EXECUTION SUCCESS]");
72 Ok(())
73}
74
75async fn diff_stream_and_batch(
80 rng: &mut impl Rng,
81 mvs_and_base_tables: Vec<Table>,
82 client: &Client,
83 i: usize,
84 config: &Configuration,
85) -> Result<()> {
86 let mview_name = format!("stream_{}", i);
88 let (batch, stream, table) =
89 differential_sql_gen(rng, mvs_and_base_tables, &mview_name, config)?;
90 diff_stream_and_batch_with_sqls(client, i, &batch, &stream, &mview_name, &table).await
91}
92
93async fn diff_stream_and_batch_with_sqls(
94 client: &Client,
95 i: usize,
96 batch: &str,
97 stream: &str,
98 mview_name: &str,
99 table: &Table,
100) -> Result<()> {
101 tracing::info!("[RUN CREATE MVIEW id={}]: {}", i, stream);
102 let skip_count = run_query(12, client, stream).await?;
103 if skip_count > 0 {
104 tracing::info!("[RUN DROP MVIEW id={}]: {}", i, &format_drop_mview(table));
105 drop_mview_table(table, client).await;
106 return Ok(());
107 }
108
109 let select = format!("SELECT * FROM {}", &mview_name);
110 tracing::info!("[RUN SELECT * FROM MVIEW id={}]: {}", i, select);
111 let (skip_count, stream_result) = run_query_inner(12, client, &select, true).await?;
112 if skip_count > 0 {
113 bail!("SQL should not fail: {:?}", select)
114 }
115
116 tracing::info!("[RUN - BATCH QUERY id={}]: {}", i, &batch);
117 let (skip_count, batch_result) = run_query_inner(12, client, batch, true).await?;
118 if skip_count > 0 {
119 tracing::info!(
120 "[DIFF - DROP MVIEW id={}]: {}",
121 i,
122 &format_drop_mview(table)
123 );
124 drop_mview_table(table, client).await;
125 return Ok(());
126 }
127 let n_stream_rows = stream_result.len();
128 let n_batch_rows = batch_result.len();
129 let formatted_stream_rows = format_rows(&batch_result);
130 let formatted_batch_rows = format_rows(&stream_result);
131 tracing::debug!(
132 "[COMPARE - STREAM_FORMATTED_ROW id={}]: {formatted_stream_rows}",
133 i,
134 );
135 tracing::debug!(
136 "[COMPARE - BATCH_FORMATTED_ROW id={}]: {formatted_batch_rows}",
137 i,
138 );
139
140 let diff = TextDiff::from_lines(&formatted_batch_rows, &formatted_stream_rows);
141
142 let diff: String = diff
143 .iter_all_changes()
144 .filter_map(|change| match change.tag() {
145 ChangeTag::Delete => Some(format!("-{}", change)),
146 ChangeTag::Insert => Some(format!("+{}", change)),
147 ChangeTag::Equal => None,
148 })
149 .collect();
150
151 if diff.is_empty() {
152 tracing::info!("[RUN DROP MVIEW id={}]: {}", i, format_drop_mview(table));
153 tracing::info!("[PASSED DIFF id={}, rows_compared={n_stream_rows}]", i);
154
155 drop_mview_table(table, client).await;
156 Ok(())
157 } else {
158 bail!(
159 "
160Different results for batch and stream:
161
162BATCH SQL:
163{batch}
164
165STREAM SQL:
166{stream}
167
168SELECT FROM STREAM SQL:
169{select}
170
171BATCH_ROW_LEN:
172{n_batch_rows}
173
174STREAM_ROW_LEN:
175{n_stream_rows}
176
177BATCH_ROWS:
178{formatted_batch_rows}
179
180STREAM_ROWS:
181{formatted_stream_rows}
182
183ROW DIFF (+/-):
184{diff}
185",
186 )
187 }
188}
189
190fn format_rows(rows: &[SimpleQueryMessage]) -> String {
192 rows.iter()
193 .filter_map(|r| match r {
194 SimpleQueryMessage::Row(row) => {
195 let n_cols = row.columns().len();
196 let formatted_row: String = (0..n_cols)
197 .map(|i| {
198 format!(
199 "{:#?}",
200 match row.get(i) {
201 Some(s) => s,
202 _ => "NULL",
203 }
204 )
205 })
206 .join(", ");
207 Some(formatted_row)
208 }
209 SimpleQueryMessage::CommandComplete(_n_rows) => None,
210 _ => unreachable!(),
211 })
212 .sorted()
213 .join("\n")
214}