risingwave_sqlsmith/test_runners/
diff.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Provides E2E Test runner functionality.
16
17use anyhow::bail;
18use itertools::Itertools;
19use rand::Rng;
20use similar::{ChangeTag, TextDiff};
21use tokio_postgres::{Client, SimpleQueryMessage};
22
23use crate::test_runners::utils::{
24    Result, create_base_tables, create_mviews, drop_mview_table, drop_tables, format_drop_mview,
25    generate_rng, populate_tables, run_query, run_query_inner, set_variable, update_base_tables,
26};
27use crate::{Table, differential_sql_gen};
28
29/// Differential testing for batch and stream
30pub async fn run_differential_testing(
31    client: &Client,
32    testdata: &str,
33    count: usize,
34    seed: Option<u64>,
35) -> Result<()> {
36    let mut rng = generate_rng(seed);
37
38    set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
39    set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
40    tracing::info!("Set session variables");
41
42    let base_tables = create_base_tables(testdata, client).await.unwrap();
43
44    let rows_per_table = 50;
45    let inserts = populate_tables(client, &mut rng, base_tables.clone(), rows_per_table).await;
46    tracing::info!("Populated base tables");
47
48    let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client)
49        .await
50        .unwrap();
51    tracing::info!("Created tables");
52
53    // Generate an update for some inserts, on the corresponding table.
54    update_base_tables(client, &mut rng, &base_tables, &inserts).await;
55    tracing::info!("Ran updates");
56
57    for i in 0..count {
58        diff_stream_and_batch(&mut rng, tables.clone(), client, i).await?
59    }
60
61    drop_tables(&mviews, testdata, client).await;
62    tracing::info!("[EXECUTION SUCCESS]");
63    Ok(())
64}
65
66/// Create the tables defined in testdata, along with some mviews.
67/// Just test number of rows for now.
68/// TODO(kwannoel): Test row contents as well. That requires us to run a batch query
69/// with `select * ORDER BY <all columns>`.
70async fn diff_stream_and_batch(
71    rng: &mut impl Rng,
72    mvs_and_base_tables: Vec<Table>,
73    client: &Client,
74    i: usize,
75) -> Result<()> {
76    // Generate some mviews
77    let mview_name = format!("stream_{}", i);
78    let (batch, stream, table) = differential_sql_gen(rng, mvs_and_base_tables, &mview_name)?;
79    diff_stream_and_batch_with_sqls(client, i, &batch, &stream, &mview_name, &table).await
80}
81
82async fn diff_stream_and_batch_with_sqls(
83    client: &Client,
84    i: usize,
85    batch: &str,
86    stream: &str,
87    mview_name: &str,
88    table: &Table,
89) -> Result<()> {
90    tracing::info!("[RUN CREATE MVIEW id={}]: {}", i, stream);
91    let skip_count = run_query(12, client, stream).await?;
92    if skip_count > 0 {
93        tracing::info!("[RUN DROP MVIEW id={}]: {}", i, &format_drop_mview(table));
94        drop_mview_table(table, client).await;
95        return Ok(());
96    }
97
98    let select = format!("SELECT * FROM {}", &mview_name);
99    tracing::info!("[RUN SELECT * FROM MVIEW id={}]: {}", i, select);
100    let (skip_count, stream_result) = run_query_inner(12, client, &select, true).await?;
101    if skip_count > 0 {
102        bail!("SQL should not fail: {:?}", select)
103    }
104
105    tracing::info!("[RUN - BATCH QUERY id={}]: {}", i, &batch);
106    let (skip_count, batch_result) = run_query_inner(12, client, batch, true).await?;
107    if skip_count > 0 {
108        tracing::info!(
109            "[DIFF - DROP MVIEW id={}]: {}",
110            i,
111            &format_drop_mview(table)
112        );
113        drop_mview_table(table, client).await;
114        return Ok(());
115    }
116    let n_stream_rows = stream_result.len();
117    let n_batch_rows = batch_result.len();
118    let formatted_stream_rows = format_rows(&batch_result);
119    let formatted_batch_rows = format_rows(&stream_result);
120    tracing::debug!(
121        "[COMPARE - STREAM_FORMATTED_ROW id={}]: {formatted_stream_rows}",
122        i,
123    );
124    tracing::debug!(
125        "[COMPARE - BATCH_FORMATTED_ROW id={}]: {formatted_batch_rows}",
126        i,
127    );
128
129    let diff = TextDiff::from_lines(&formatted_batch_rows, &formatted_stream_rows);
130
131    let diff: String = diff
132        .iter_all_changes()
133        .filter_map(|change| match change.tag() {
134            ChangeTag::Delete => Some(format!("-{}", change)),
135            ChangeTag::Insert => Some(format!("+{}", change)),
136            ChangeTag::Equal => None,
137        })
138        .collect();
139
140    if diff.is_empty() {
141        tracing::info!("[RUN DROP MVIEW id={}]: {}", i, format_drop_mview(table));
142        tracing::info!("[PASSED DIFF id={}, rows_compared={n_stream_rows}]", i);
143
144        drop_mview_table(table, client).await;
145        Ok(())
146    } else {
147        bail!(
148            "
149Different results for batch and stream:
150
151BATCH SQL:
152{batch}
153
154STREAM SQL:
155{stream}
156
157SELECT FROM STREAM SQL:
158{select}
159
160BATCH_ROW_LEN:
161{n_batch_rows}
162
163STREAM_ROW_LEN:
164{n_stream_rows}
165
166BATCH_ROWS:
167{formatted_batch_rows}
168
169STREAM_ROWS:
170{formatted_stream_rows}
171
172ROW DIFF (+/-):
173{diff}
174",
175        )
176    }
177}
178
179/// Format + sort rows so they can be diffed.
180fn format_rows(rows: &[SimpleQueryMessage]) -> String {
181    rows.iter()
182        .filter_map(|r| match r {
183            SimpleQueryMessage::Row(row) => {
184                let n_cols = row.columns().len();
185                let formatted_row: String = (0..n_cols)
186                    .map(|i| {
187                        format!(
188                            "{:#?}",
189                            match row.get(i) {
190                                Some(s) => s,
191                                _ => "NULL",
192                            }
193                        )
194                    })
195                    .join(", ");
196                Some(formatted_row)
197            }
198            SimpleQueryMessage::CommandComplete(_n_rows) => None,
199            _ => unreachable!(),
200        })
201        .sorted()
202        .join("\n")
203}