risingwave_sqlsmith/test_runners/diff.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Provides E2E Test runner functionality.
16
17use anyhow::bail;
18use itertools::Itertools;
19use rand::Rng;
20use similar::{ChangeTag, TextDiff};
21use tokio_postgres::{Client, SimpleQueryMessage};
22
23use crate::config::Configuration;
24use crate::test_runners::utils::{
25    Result, create_base_tables, create_mviews, drop_mview_table, drop_tables, format_drop_mview,
26    generate_rng, populate_tables, run_query, run_query_inner, set_variable, update_base_tables,
27};
28use crate::{Table, differential_sql_gen};
29
30/// Differential testing for batch and stream
31pub async fn run_differential_testing(
32    client: &Client,
33    testdata: &str,
34    count: usize,
35    config: &Configuration,
36    seed: Option<u64>,
37) -> Result<()> {
38    let mut rng = generate_rng(seed);
39
40    set_variable(client, "RW_IMPLICIT_FLUSH", "TRUE").await;
41    set_variable(client, "QUERY_MODE", "DISTRIBUTED").await;
42    tracing::info!("Set session variables");
43
44    let base_tables = create_base_tables(testdata, client).await.unwrap();
45
46    let rows_per_table = 50;
47    let inserts = populate_tables(
48        client,
49        &mut rng,
50        base_tables.clone(),
51        rows_per_table,
52        config,
53    )
54    .await;
55    tracing::info!("Populated base tables");
56
57    let (tables, mviews) = create_mviews(&mut rng, base_tables.clone(), client, config)
58        .await
59        .unwrap();
60    tracing::info!("Created tables");
61
62    // Generate an update for some inserts, on the corresponding table.
63    update_base_tables(client, &mut rng, &base_tables, &inserts, config).await;
64    tracing::info!("Ran updates");
65
66    for i in 0..count {
67        diff_stream_and_batch(&mut rng, tables.clone(), client, i, config).await?
68    }
69
70    drop_tables(&mviews, testdata, client).await;
71    tracing::info!("[EXECUTION SUCCESS]");
72    Ok(())
73}
74
75/// Create the tables defined in testdata, along with some mviews.
76/// Just test number of rows for now.
77/// TODO(kwannoel): Test row contents as well. That requires us to run a batch query
78/// with `select * ORDER BY <all columns>`.
79async fn diff_stream_and_batch(
80    rng: &mut impl Rng,
81    mvs_and_base_tables: Vec<Table>,
82    client: &Client,
83    i: usize,
84    config: &Configuration,
85) -> Result<()> {
86    // Generate some mviews
87    let mview_name = format!("stream_{}", i);
88    let (batch, stream, table) =
89        differential_sql_gen(rng, mvs_and_base_tables, &mview_name, config)?;
90    diff_stream_and_batch_with_sqls(client, i, &batch, &stream, &mview_name, &table).await
91}
92
93async fn diff_stream_and_batch_with_sqls(
94    client: &Client,
95    i: usize,
96    batch: &str,
97    stream: &str,
98    mview_name: &str,
99    table: &Table,
100) -> Result<()> {
101    tracing::info!("[RUN CREATE MVIEW id={}]: {}", i, stream);
102    let skip_count = run_query(12, client, stream).await?;
103    if skip_count > 0 {
104        tracing::info!("[RUN DROP MVIEW id={}]: {}", i, &format_drop_mview(table));
105        drop_mview_table(table, client).await;
106        return Ok(());
107    }
108
109    let select = format!("SELECT * FROM {}", &mview_name);
110    tracing::info!("[RUN SELECT * FROM MVIEW id={}]: {}", i, select);
111    let (skip_count, stream_result) = run_query_inner(12, client, &select, true).await?;
112    if skip_count > 0 {
113        bail!("SQL should not fail: {:?}", select)
114    }
115
116    tracing::info!("[RUN - BATCH QUERY id={}]: {}", i, &batch);
117    let (skip_count, batch_result) = run_query_inner(12, client, batch, true).await?;
118    if skip_count > 0 {
119        tracing::info!(
120            "[DIFF - DROP MVIEW id={}]: {}",
121            i,
122            &format_drop_mview(table)
123        );
124        drop_mview_table(table, client).await;
125        return Ok(());
126    }
127    let n_stream_rows = stream_result.len();
128    let n_batch_rows = batch_result.len();
129    let formatted_stream_rows = format_rows(&batch_result);
130    let formatted_batch_rows = format_rows(&stream_result);
131    tracing::debug!(
132        "[COMPARE - STREAM_FORMATTED_ROW id={}]: {formatted_stream_rows}",
133        i,
134    );
135    tracing::debug!(
136        "[COMPARE - BATCH_FORMATTED_ROW id={}]: {formatted_batch_rows}",
137        i,
138    );
139
140    let diff = TextDiff::from_lines(&formatted_batch_rows, &formatted_stream_rows);
141
142    let diff: String = diff
143        .iter_all_changes()
144        .filter_map(|change| match change.tag() {
145            ChangeTag::Delete => Some(format!("-{}", change)),
146            ChangeTag::Insert => Some(format!("+{}", change)),
147            ChangeTag::Equal => None,
148        })
149        .collect();
150
151    if diff.is_empty() {
152        tracing::info!("[RUN DROP MVIEW id={}]: {}", i, format_drop_mview(table));
153        tracing::info!("[PASSED DIFF id={}, rows_compared={n_stream_rows}]", i);
154
155        drop_mview_table(table, client).await;
156        Ok(())
157    } else {
158        bail!(
159            "
160Different results for batch and stream:
161
162BATCH SQL:
163{batch}
164
165STREAM SQL:
166{stream}
167
168SELECT FROM STREAM SQL:
169{select}
170
171BATCH_ROW_LEN:
172{n_batch_rows}
173
174STREAM_ROW_LEN:
175{n_stream_rows}
176
177BATCH_ROWS:
178{formatted_batch_rows}
179
180STREAM_ROWS:
181{formatted_stream_rows}
182
183ROW DIFF (+/-):
184{diff}
185",
186        )
187    }
188}
189
190/// Format + sort rows so they can be diffed.
191fn format_rows(rows: &[SimpleQueryMessage]) -> String {
192    rows.iter()
193        .filter_map(|r| match r {
194            SimpleQueryMessage::Row(row) => {
195                let n_cols = row.columns().len();
196                let formatted_row: String = (0..n_cols)
197                    .map(|i| {
198                        format!(
199                            "{:#?}",
200                            match row.get(i) {
201                                Some(s) => s,
202                                _ => "NULL",
203                            }
204                        )
205                    })
206                    .join(", ");
207                Some(formatted_row)
208            }
209            SimpleQueryMessage::CommandComplete(_n_rows) => None,
210            _ => unreachable!(),
211        })
212        .sorted()
213        .join("\n")
214}