risingwave_regress_test/
schedule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fs::File;
17use std::io::{BufRead, BufReader};
18use std::path::Path;
19use std::process::Stdio;
20use std::sync::Arc;
21
22use anyhow::{Context, anyhow, bail};
23use tokio::io::AsyncWriteExt;
24use tokio::process::Command;
25use tracing::{debug, error, info};
26
27use crate::schedule::TestResult::{Different, Same};
28use crate::{DatabaseMode, FileManager, Opts, Psql, init_env};
29
/// Result of each test case.
///
/// Only describes test cases whose execution finished successfully; execution failures are
/// reported through `anyhow::Result` errors rather than through a variant of this enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TestResult {
    /// Execution of the test case succeeded, and results are same.
    Same,
    /// Execution of the test case succeeded, but outputs are different from expected result.
    Different,
}
38
/// A single regress test case, bundled with everything needed to execute it.
struct TestCase {
    /// Name of the test; used to look up its input, expected-output and actual-output files.
    test_name: String,
    /// Options supplying the connection parameters (host, port, database, user) and the
    /// database mode.
    opts: Opts,
    /// Shared `Psql` handle, cloned from the owning `Schedule`.
    psql: Arc<Psql>,
    /// Shared `FileManager` used to resolve the file paths of this test.
    file_manager: Arc<FileManager>,
}
45
/// A parsed regress test schedule together with the shared handles used to run it.
pub(crate) struct Schedule {
    /// Options the schedule was created from; a clone is handed to every test case.
    opts: Opts,
    /// Shared `FileManager`; initialized once before the tests run and cloned into every test
    /// case.
    file_manager: Arc<FileManager>,
    /// Shared `Psql` handle; initialized once before the tests run and cloned into every test
    /// case.
    psql: Arc<Psql>,
    /// Schedules of test names.
    ///
    /// Each item is called a parallel schedule: its tests are spawned as concurrent tasks,
    /// while the items themselves are processed one after another.
    schedules: Vec<Vec<String>>,
}
55
/// Test queries commented out with `--@ ` are ignored for comparison.
///
/// Unlike a normal comment (`--`), this does not require any modification to the expected
/// output file. We can simply toggle a case on/off by just updating the input sql file.
const PREFIX_IGNORE: &str = "--@ ";
60
61impl Schedule {
62    pub(crate) fn new(opts: Opts) -> anyhow::Result<Self> {
63        Ok(Self {
64            opts: opts.clone(),
65            file_manager: Arc::new(FileManager::new(opts.clone())),
66            psql: Arc::new(Psql::new(opts.clone())),
67            schedules: Schedule::parse_from(opts.schedule_file_path())?,
68        })
69    }
70
71    async fn do_init(self) -> anyhow::Result<Self> {
72        init_env();
73
74        self.file_manager.init()?;
75        self.psql.init().await?;
76
77        Ok(self)
78    }
79
80    fn parse_from<P: AsRef<Path>>(path: P) -> anyhow::Result<Vec<Vec<String>>> {
81        let file = File::options()
82            .read(true)
83            .open(path.as_ref())
84            .with_context(|| format!("Failed to open schedule file: {:?}", path.as_ref()))?;
85
86        let reader = BufReader::new(file);
87        let mut schedules = Vec::new();
88
89        for line in reader.lines() {
90            let line = line?;
91            if line.starts_with("test: ") {
92                schedules.push(
93                    line[5..]
94                        .split_whitespace()
95                        .map(ToString::to_string)
96                        .collect(),
97                );
98                debug!("Add one parallel schedule: {:?}", schedules.last().unwrap());
99            }
100        }
101
102        Ok(schedules)
103    }
104
105    /// Run all test schedules.
106    ///
107    /// # Returns
108    ///
109    /// `Ok` If no error happens and all outputs are expected,
110    /// `Err` If any error happens, or some outputs are unexpected. Details are logged in log file.
111    pub(crate) async fn run(self) -> anyhow::Result<()> {
112        let s = self.do_init().await?;
113        s.do_run().await
114    }
115
116    async fn do_run(self) -> anyhow::Result<()> {
117        let mut different_tests = Vec::new();
118        for parallel_schedule in &self.schedules {
119            info!("Running parallel schedule: {:?}", parallel_schedule);
120            let ret = self
121                .run_one_schedule(parallel_schedule.iter().map(String::as_str))
122                .await?;
123
124            let mut diff_test = ret
125                .iter()
126                .filter(|(_test_name, test_result)| **test_result == Different)
127                .map(|t| t.0.clone())
128                .collect::<Vec<String>>();
129
130            if !diff_test.is_empty() {
131                error!(
132                    "Parallel schedule failed, these tests are different: {:?}",
133                    diff_test
134                );
135                different_tests.append(&mut diff_test);
136            } else {
137                info!("Parallel schedule succeeded!");
138            }
139        }
140
141        if !different_tests.is_empty() {
142            info!(
143                "RisingWave regress tests failed, these tests are different from expected output: {:?}",
144                different_tests
145            );
146            bail!(
147                "RisingWave regress tests failed, these tests are different from expected output: {:?}",
148                different_tests
149            )
150        } else {
151            info!("RisingWave regress tests passed.");
152            Ok(())
153        }
154    }
155
156    async fn run_one_schedule(
157        &self,
158        tests: impl Iterator<Item = &str>,
159    ) -> anyhow::Result<HashMap<String, TestResult>> {
160        let mut join_handles = HashMap::new();
161
162        for test_name in tests {
163            let test_case = self.create_test_case(test_name);
164            let join_handle = tokio::spawn(async move { test_case.run().await });
165            join_handles.insert(test_name, join_handle);
166        }
167
168        let mut result = HashMap::new();
169
170        for (test_name, join_handle) in join_handles {
171            let ret = join_handle
172                .await
173                .with_context(|| format!("Running test case {} panicked!", test_name))??;
174
175            result.insert(test_name.to_owned(), ret);
176        }
177
178        Ok(result)
179    }
180
181    fn create_test_case(&self, test_name: &str) -> TestCase {
182        TestCase {
183            test_name: test_name.to_owned(),
184            opts: self.opts.clone(),
185            psql: self.psql.clone(),
186            file_manager: self.file_manager.clone(),
187        }
188    }
189}
190
191impl TestCase {
192    async fn run(self) -> anyhow::Result<TestResult> {
193        let host = &self.opts.host();
194        let port = &self.opts.port().to_string();
195        let database_name = self.opts.database_name();
196        let pg_user_name = self.opts.pg_user_name();
197        let args: Vec<&str> = vec![
198            "-X",
199            "-a",
200            "-q",
201            "-h",
202            host,
203            "-p",
204            port,
205            "-d",
206            database_name,
207            "-U",
208            pg_user_name,
209            "-v",
210            "HIDE_TABLEAM=on",
211            "-v",
212            "HIDE_TOAST_COMPRESSION=on",
213        ];
214        println!(
215            "Ready to run command:\npsql {}\n for test case:{}",
216            args.join(" "),
217            self.test_name
218        );
219
220        let extra_lines_added_to_input = match self.opts.database_mode() {
221            DatabaseMode::Risingwave => {
222                vec![
223                    "SET RW_IMPLICIT_FLUSH TO true;\n",
224                    "SET QUERY_MODE TO LOCAL;\n",
225                ]
226            }
227            DatabaseMode::Postgres => vec![],
228        };
229
230        let actual_output_path = self.file_manager.output_of(&self.test_name)?;
231        let actual_output_file = File::options()
232            .create_new(true)
233            .write(true)
234            .open(&actual_output_path)
235            .with_context(|| {
236                format!(
237                    "Failed to create {:?} for writing output.",
238                    actual_output_path
239                )
240            })?;
241
242        let mut command = Command::new("psql");
243        command.env(
244            "PGAPPNAME",
245            format!("risingwave_regress/{}", self.test_name),
246        );
247        command.args(args);
248        info!(
249            "Starting to execute test case: {}, command: {:?}",
250            self.test_name, command
251        );
252        let mut child = command
253            .stdin(Stdio::piped())
254            .stdout(actual_output_file.try_clone().with_context(|| {
255                format!("Failed to clone output file: {:?}", actual_output_path)
256            })?)
257            .stderr(actual_output_file)
258            .spawn()
259            .with_context(|| format!("Failed to spawn child for test case: {}", self.test_name))?;
260
261        let child_stdin = child
262            .stdin
263            .as_mut()
264            .ok_or_else(|| anyhow!("Cannot get the stdin handle of the child process."))?;
265        for extra_line in &extra_lines_added_to_input {
266            child_stdin.write_all(extra_line.as_bytes()).await?;
267        }
268
269        let read_all_lines_from = std::fs::read_to_string;
270
271        let input_path = self.file_manager.source_of(&self.test_name)?;
272        let input_file_content = read_all_lines_from(input_path)?;
273        info!("input_file_content:{}", input_file_content);
274        child_stdin.write_all(input_file_content.as_bytes()).await?;
275
276        let status = child.wait().await.with_context(|| {
277            format!("Failed to wait for finishing test case: {}", self.test_name)
278        })?;
279
280        if !status.success() {
281            let error_output = read_all_lines_from(actual_output_path)?;
282            let error_msg = format!(
283                "Execution of test case {} failed, reason:\n{}",
284                self.test_name, error_output
285            );
286            error!("{}", error_msg);
287            bail!(error_msg);
288        }
289
290        let expected_output_path = self.file_manager.expected_output_of(&self.test_name)?;
291
292        let input_lines = input_file_content
293            .lines()
294            .filter(|s| !s.is_empty() && *s != PREFIX_IGNORE);
295        let mut expected_lines = std::io::BufReader::new(File::open(expected_output_path)?)
296            .lines()
297            .map(|s| s.unwrap())
298            .filter(|s| !s.is_empty());
299        let mut actual_lines = std::io::BufReader::new(File::open(actual_output_path)?)
300            .lines()
301            .skip(extra_lines_added_to_input.len())
302            .map(|s| s.unwrap())
303            .filter(|s| !s.is_empty() && s != PREFIX_IGNORE);
304
305        // We split the output lines (either expected or actual) based on matching lines from input.
306        // For example:
307        //     input      |    output
308        // --+------------+--+-----------------------------------
309        //  1| select v1  | 1| select v1
310        //  2| from t;    | 2| from t;
311        //   |            | 3|  v1
312        //   |            | 4| ----
313        //   |            | 5|   1
314        //   |            | 6|   2
315        //   |            | 7| (2 rows)
316        //   |            | 8|
317        //  3| select v1; | 9| select v1;
318        //   |            |10| ERROR:  column "v1" does not exist
319        //   |            |11| LINE 1: select v1;
320        //   |            |12|                ^
321        //
322        // We would split the output lines into 2 chunks:
323        // * query 1..=2 and output  3..= 7
324        // * query 9..=9 and output 10..=12
325        let mut is_diff = false;
326        let mut pending_input = vec![];
327        for input_line in input_lines {
328            let original_input_line = input_line.strip_prefix(PREFIX_IGNORE).unwrap_or(input_line);
329
330            // Find the matching output line, and collect lines before the next matching line.
331            let mut expected_output = vec![];
332            while let Some(line) = expected_lines.next()
333                && line != original_input_line
334            {
335                expected_output.push(line);
336            }
337
338            let mut actual_output = vec![];
339            while let Some(line) = actual_lines.next()
340                && line != input_line
341            {
342                actual_output.push(line);
343            }
344
345            // If no unmatched lines skipped, this input line belongs to the same query as previous
346            // lines.
347            if expected_output.is_empty() && actual_output.is_empty() {
348                pending_input.push(input_line);
349                continue;
350            }
351
352            let query_input = std::mem::replace(&mut pending_input, vec![input_line]);
353
354            is_diff = !compare_output(&query_input, &expected_output, &actual_output) || is_diff;
355        }
356        // There may be more lines after the final matching lines.
357        let expected_output: Vec<_> = expected_lines.collect();
358        let actual_output: Vec<_> = actual_lines.collect();
359        is_diff = !compare_output(&pending_input, &expected_output, &actual_output) || is_diff;
360
361        Ok(if is_diff { Different } else { Same })
362    }
363}
364
365fn compare_output(query: &[&str], expected: &[String], actual: &[String]) -> bool {
366    let compare_lines = |expected: &[String], actual: &[String]| {
367        let eq = expected == actual;
368        if !eq {
369            error!("query input:\n{}", query.join("\n"));
370
371            let (expected_output, actual_output) = (expected.join("\n"), actual.join("\n"));
372            let diffs = format_diff(&expected_output, &actual_output);
373            error!("Diff:\n{}", diffs);
374        }
375        eq
376    };
377
378    if let Some(l) = query.last()
379        && l.starts_with(PREFIX_IGNORE)
380    {
381        return true;
382    }
383    if !expected.is_empty()
384        && !actual.is_empty()
385        && expected[0].starts_with("ERROR:  ")
386        && actual[0].starts_with("ERROR:  ")
387    {
388        return true;
389    }
390
391    let is_select = query
392        .iter()
393        .any(|line| line.to_lowercase().starts_with("select"));
394    if !is_select {
395        // no special handling when we do not recognize it
396        return compare_lines(expected, actual);
397    }
398
399    // This could fail in the corner case.
400    // For example, a subquery has a `order by` clause, which should not be allowed and
401    // is meaningless as only the outermost `order by` clause is effective.
402    let is_order_by = query
403        .iter()
404        .any(|line| line.to_lowercase().contains("order by"));
405
406    // The output of `select` is assumed to have the following format:
407    //
408    // line of output column names
409    // line of separator starting with "---" (comment assumed to start with only two "--".)
410    // x lines of output rows
411    // line of summary "(x rows)"
412    //
413    // Note that zero column output (`select;`) has no title line and only 2 hyphens. This would not
414    // be recognized and thus compared as-is without special treatment, which is expected.
415    // --
416    // (x rows)
417
418    if expected.len() < 3
419        || !expected[1].starts_with("---")
420        || !matches!(expected.last(), Some(l) if l.ends_with(" rows)") || l == "(1 row)")
421        || actual.len() < 3
422        || !actual[1].starts_with("---")
423        || !matches!(actual.last(), Some(l) if l.ends_with(" rows)") || l == "(1 row)")
424    {
425        // no special handling when we do not recognize it
426        return compare_lines(expected, actual);
427    }
428
429    if is_order_by {
430        compare_lines(expected, actual)
431    } else {
432        let mut expected_sorted = expected.to_vec();
433        expected_sorted[2..expected.len() - 1].sort();
434        let mut actual_sorted = actual.to_vec();
435        actual_sorted[2..actual.len() - 1].sort();
436
437        compare_lines(&expected_sorted, &actual_sorted)
438    }
439}
440
441fn format_diff(expected_output: &String, actual_output: &String) -> String {
442    use std::fmt::Write;
443
444    use similar::{ChangeTag, TextDiff};
445    let diff = TextDiff::from_lines(expected_output, actual_output);
446
447    let mut diff_str = "".to_owned();
448    for change in diff.iter_all_changes() {
449        let sign = match change.tag() {
450            ChangeTag::Delete => "-",
451            ChangeTag::Insert => "+",
452            ChangeTag::Equal => " ",
453        };
454        write!(diff_str, "{}{}", sign, change).unwrap();
455    }
456    diff_str
457}