risingwave_sqlsmith/test_runners/
diff.rs1use anyhow::bail;
18use itertools::Itertools;
19use rand::Rng;
20use similar::{ChangeTag, TextDiff};
21use tokio_postgres::{Client, SimpleQueryMessage};
22
23use crate::test_runners::utils::{
24 Result, create_base_tables, create_mviews, drop_mview_table, drop_tables, format_drop_mview,
25 generate_rng, populate_tables, run_query, run_query_inner, set_variable, update_base_tables,
26};
27use crate::{Table, differential_sql_gen};
28
29pub async fn run_differential_testing(
31 client: &Client,
32 testdata: &str,
33 count: usize,
34 seed: Option<u64>,
35) -> Result<()> {
36 let mut rng = generate_rng(seed);
37
38 set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
39 set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
40 tracing::info!("Set session variables");
41
42 let base_tables = create_base_tables(testdata, client).await.unwrap();
43
44 let rows_per_table = 50;
45 let inserts = populate_tables(client, &mut rng, base_tables.clone(), rows_per_table).await;
46 tracing::info!("Populated base tables");
47
48 let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client)
49 .await
50 .unwrap();
51 tracing::info!("Created tables");
52
53 update_base_tables(client, &mut rng, &base_tables, &inserts).await;
55 tracing::info!("Ran updates");
56
57 for i in 0..count {
58 diff_stream_and_batch(&mut rng, tables.clone(), client, i).await?
59 }
60
61 drop_tables(&mviews, testdata, client).await;
62 tracing::info!("[EXECUTION SUCCESS]");
63 Ok(())
64}
65
66async fn diff_stream_and_batch(
71 rng: &mut impl Rng,
72 mvs_and_base_tables: Vec<Table>,
73 client: &Client,
74 i: usize,
75) -> Result<()> {
76 let mview_name = format!("stream_{}", i);
78 let (batch, stream, table) = differential_sql_gen(rng, mvs_and_base_tables, &mview_name)?;
79 diff_stream_and_batch_with_sqls(client, i, &batch, &stream, &mview_name, &table).await
80}
81
82async fn diff_stream_and_batch_with_sqls(
83 client: &Client,
84 i: usize,
85 batch: &str,
86 stream: &str,
87 mview_name: &str,
88 table: &Table,
89) -> Result<()> {
90 tracing::info!("[RUN CREATE MVIEW id={}]: {}", i, stream);
91 let skip_count = run_query(12, client, stream).await?;
92 if skip_count > 0 {
93 tracing::info!("[RUN DROP MVIEW id={}]: {}", i, &format_drop_mview(table));
94 drop_mview_table(table, client).await;
95 return Ok(());
96 }
97
98 let select = format!("SELECT * FROM {}", &mview_name);
99 tracing::info!("[RUN SELECT * FROM MVIEW id={}]: {}", i, select);
100 let (skip_count, stream_result) = run_query_inner(12, client, &select, true).await?;
101 if skip_count > 0 {
102 bail!("SQL should not fail: {:?}", select)
103 }
104
105 tracing::info!("[RUN - BATCH QUERY id={}]: {}", i, &batch);
106 let (skip_count, batch_result) = run_query_inner(12, client, batch, true).await?;
107 if skip_count > 0 {
108 tracing::info!(
109 "[DIFF - DROP MVIEW id={}]: {}",
110 i,
111 &format_drop_mview(table)
112 );
113 drop_mview_table(table, client).await;
114 return Ok(());
115 }
116 let n_stream_rows = stream_result.len();
117 let n_batch_rows = batch_result.len();
118 let formatted_stream_rows = format_rows(&batch_result);
119 let formatted_batch_rows = format_rows(&stream_result);
120 tracing::debug!(
121 "[COMPARE - STREAM_FORMATTED_ROW id={}]: {formatted_stream_rows}",
122 i,
123 );
124 tracing::debug!(
125 "[COMPARE - BATCH_FORMATTED_ROW id={}]: {formatted_batch_rows}",
126 i,
127 );
128
129 let diff = TextDiff::from_lines(&formatted_batch_rows, &formatted_stream_rows);
130
131 let diff: String = diff
132 .iter_all_changes()
133 .filter_map(|change| match change.tag() {
134 ChangeTag::Delete => Some(format!("-{}", change)),
135 ChangeTag::Insert => Some(format!("+{}", change)),
136 ChangeTag::Equal => None,
137 })
138 .collect();
139
140 if diff.is_empty() {
141 tracing::info!("[RUN DROP MVIEW id={}]: {}", i, format_drop_mview(table));
142 tracing::info!("[PASSED DIFF id={}, rows_compared={n_stream_rows}]", i);
143
144 drop_mview_table(table, client).await;
145 Ok(())
146 } else {
147 bail!(
148 "
149Different results for batch and stream:
150
151BATCH SQL:
152{batch}
153
154STREAM SQL:
155{stream}
156
157SELECT FROM STREAM SQL:
158{select}
159
160BATCH_ROW_LEN:
161{n_batch_rows}
162
163STREAM_ROW_LEN:
164{n_stream_rows}
165
166BATCH_ROWS:
167{formatted_batch_rows}
168
169STREAM_ROWS:
170{formatted_stream_rows}
171
172ROW DIFF (+/-):
173{diff}
174",
175 )
176 }
177}
178
179fn format_rows(rows: &[SimpleQueryMessage]) -> String {
181 rows.iter()
182 .filter_map(|r| match r {
183 SimpleQueryMessage::Row(row) => {
184 let n_cols = row.columns().len();
185 let formatted_row: String = (0..n_cols)
186 .map(|i| {
187 format!(
188 "{:#?}",
189 match row.get(i) {
190 Some(s) => s,
191 _ => "NULL",
192 }
193 )
194 })
195 .join(", ");
196 Some(formatted_row)
197 }
198 SimpleQueryMessage::CommandComplete(_n_rows) => None,
199 _ => unreachable!(),
200 })
201 .sorted()
202 .join("\n")
203}