1use std::collections::HashMap;
16use std::fs::File;
17use std::io::{BufRead, BufReader};
18use std::path::Path;
19use std::process::Stdio;
20use std::sync::Arc;
21
22use anyhow::{Context, anyhow, bail};
23use tokio::io::AsyncWriteExt;
24use tokio::process::Command;
25use tracing::{debug, error, info};
26
27use crate::schedule::TestResult::{Different, Same};
28use crate::{DatabaseMode, FileManager, Opts, Psql, init_env};
29
30#[derive(PartialEq)]
32enum TestResult {
33 Same,
35 Different,
37}
38
39struct TestCase {
40 test_name: String,
41 opts: Opts,
42 psql: Arc<Psql>,
43 file_manager: Arc<FileManager>,
44}
45
46pub(crate) struct Schedule {
47 opts: Opts,
48 file_manager: Arc<FileManager>,
49 psql: Arc<Psql>,
50 schedules: Vec<Vec<String>>,
54}
55
56const PREFIX_IGNORE: &str = "--@ ";
60
61impl Schedule {
62 pub(crate) fn new(opts: Opts) -> anyhow::Result<Self> {
63 Ok(Self {
64 opts: opts.clone(),
65 file_manager: Arc::new(FileManager::new(opts.clone())),
66 psql: Arc::new(Psql::new(opts.clone())),
67 schedules: Schedule::parse_from(opts.schedule_file_path())?,
68 })
69 }
70
71 async fn do_init(self) -> anyhow::Result<Self> {
72 init_env();
73
74 self.file_manager.init()?;
75 self.psql.init().await?;
76
77 Ok(self)
78 }
79
80 fn parse_from<P: AsRef<Path>>(path: P) -> anyhow::Result<Vec<Vec<String>>> {
81 let file = File::options()
82 .read(true)
83 .open(path.as_ref())
84 .with_context(|| format!("Failed to open schedule file: {:?}", path.as_ref()))?;
85
86 let reader = BufReader::new(file);
87 let mut schedules = Vec::new();
88
89 for line in reader.lines() {
90 let line = line?;
91 if line.starts_with("test: ") {
92 schedules.push(
93 line[5..]
94 .split_whitespace()
95 .map(ToString::to_string)
96 .collect(),
97 );
98 debug!("Add one parallel schedule: {:?}", schedules.last().unwrap());
99 }
100 }
101
102 Ok(schedules)
103 }
104
105 pub(crate) async fn run(self) -> anyhow::Result<()> {
112 let s = self.do_init().await?;
113 s.do_run().await
114 }
115
116 async fn do_run(self) -> anyhow::Result<()> {
117 let mut different_tests = Vec::new();
118 for parallel_schedule in &self.schedules {
119 info!("Running parallel schedule: {:?}", parallel_schedule);
120 let ret = self
121 .run_one_schedule(parallel_schedule.iter().map(String::as_str))
122 .await?;
123
124 let mut diff_test = ret
125 .iter()
126 .filter(|(_test_name, test_result)| **test_result == Different)
127 .map(|t| t.0.clone())
128 .collect::<Vec<String>>();
129
130 if !diff_test.is_empty() {
131 error!(
132 "Parallel schedule failed, these tests are different: {:?}",
133 diff_test
134 );
135 different_tests.append(&mut diff_test);
136 } else {
137 info!("Parallel schedule succeeded!");
138 }
139 }
140
141 if !different_tests.is_empty() {
142 info!(
143 "RisingWave regress tests failed, these tests are different from expected output: {:?}",
144 different_tests
145 );
146 bail!(
147 "RisingWave regress tests failed, these tests are different from expected output: {:?}",
148 different_tests
149 )
150 } else {
151 info!("RisingWave regress tests passed.");
152 Ok(())
153 }
154 }
155
156 async fn run_one_schedule(
157 &self,
158 tests: impl Iterator<Item = &str>,
159 ) -> anyhow::Result<HashMap<String, TestResult>> {
160 let mut join_handles = HashMap::new();
161
162 for test_name in tests {
163 let test_case = self.create_test_case(test_name);
164 let join_handle = tokio::spawn(async move { test_case.run().await });
165 join_handles.insert(test_name, join_handle);
166 }
167
168 let mut result = HashMap::new();
169
170 for (test_name, join_handle) in join_handles {
171 let ret = join_handle
172 .await
173 .with_context(|| format!("Running test case {} panicked!", test_name))??;
174
175 result.insert(test_name.to_owned(), ret);
176 }
177
178 Ok(result)
179 }
180
181 fn create_test_case(&self, test_name: &str) -> TestCase {
182 TestCase {
183 test_name: test_name.to_owned(),
184 opts: self.opts.clone(),
185 psql: self.psql.clone(),
186 file_manager: self.file_manager.clone(),
187 }
188 }
189}
190
191impl TestCase {
192 async fn run(self) -> anyhow::Result<TestResult> {
193 let host = &self.opts.host();
194 let port = &self.opts.port().to_string();
195 let database_name = self.opts.database_name();
196 let pg_user_name = self.opts.pg_user_name();
197 let args: Vec<&str> = vec![
198 "-X",
199 "-a",
200 "-q",
201 "-h",
202 host,
203 "-p",
204 port,
205 "-d",
206 database_name,
207 "-U",
208 pg_user_name,
209 "-v",
210 "HIDE_TABLEAM=on",
211 "-v",
212 "HIDE_TOAST_COMPRESSION=on",
213 ];
214 println!(
215 "Ready to run command:\npsql {}\n for test case:{}",
216 args.join(" "),
217 self.test_name
218 );
219
220 let extra_lines_added_to_input = match self.opts.database_mode() {
221 DatabaseMode::Risingwave => {
222 vec![
223 "SET RW_IMPLICIT_FLUSH TO true;\n",
224 "SET QUERY_MODE TO LOCAL;\n",
225 ]
226 }
227 DatabaseMode::Postgres => vec![],
228 };
229
230 let actual_output_path = self.file_manager.output_of(&self.test_name)?;
231 let actual_output_file = File::options()
232 .create_new(true)
233 .write(true)
234 .open(&actual_output_path)
235 .with_context(|| {
236 format!(
237 "Failed to create {:?} for writing output.",
238 actual_output_path
239 )
240 })?;
241
242 let mut command = Command::new("psql");
243 command.env(
244 "PGAPPNAME",
245 format!("risingwave_regress/{}", self.test_name),
246 );
247 command.args(args);
248 info!(
249 "Starting to execute test case: {}, command: {:?}",
250 self.test_name, command
251 );
252 let mut child = command
253 .stdin(Stdio::piped())
254 .stdout(actual_output_file.try_clone().with_context(|| {
255 format!("Failed to clone output file: {:?}", actual_output_path)
256 })?)
257 .stderr(actual_output_file)
258 .spawn()
259 .with_context(|| format!("Failed to spawn child for test case: {}", self.test_name))?;
260
261 let child_stdin = child
262 .stdin
263 .as_mut()
264 .ok_or_else(|| anyhow!("Cannot get the stdin handle of the child process."))?;
265 for extra_line in &extra_lines_added_to_input {
266 child_stdin.write_all(extra_line.as_bytes()).await?;
267 }
268
269 let read_all_lines_from = std::fs::read_to_string;
270
271 let input_path = self.file_manager.source_of(&self.test_name)?;
272 let input_file_content = read_all_lines_from(input_path)?;
273 info!("input_file_content:{}", input_file_content);
274 child_stdin.write_all(input_file_content.as_bytes()).await?;
275
276 let status = child.wait().await.with_context(|| {
277 format!("Failed to wait for finishing test case: {}", self.test_name)
278 })?;
279
280 if !status.success() {
281 let error_output = read_all_lines_from(actual_output_path)?;
282 let error_msg = format!(
283 "Execution of test case {} failed, reason:\n{}",
284 self.test_name, error_output
285 );
286 error!("{}", error_msg);
287 bail!(error_msg);
288 }
289
290 let expected_output_path = self.file_manager.expected_output_of(&self.test_name)?;
291
292 let input_lines = input_file_content
293 .lines()
294 .filter(|s| !s.is_empty() && *s != PREFIX_IGNORE);
295 let mut expected_lines = std::io::BufReader::new(File::open(expected_output_path)?)
296 .lines()
297 .map(|s| s.unwrap())
298 .filter(|s| !s.is_empty());
299 let mut actual_lines = std::io::BufReader::new(File::open(actual_output_path)?)
300 .lines()
301 .skip(extra_lines_added_to_input.len())
302 .map(|s| s.unwrap())
303 .filter(|s| !s.is_empty() && s != PREFIX_IGNORE);
304
305 let mut is_diff = false;
326 let mut pending_input = vec![];
327 for input_line in input_lines {
328 let original_input_line = input_line.strip_prefix(PREFIX_IGNORE).unwrap_or(input_line);
329
330 let mut expected_output = vec![];
332 while let Some(line) = expected_lines.next()
333 && line != original_input_line
334 {
335 expected_output.push(line);
336 }
337
338 let mut actual_output = vec![];
339 while let Some(line) = actual_lines.next()
340 && line != input_line
341 {
342 actual_output.push(line);
343 }
344
345 if expected_output.is_empty() && actual_output.is_empty() {
348 pending_input.push(input_line);
349 continue;
350 }
351
352 let query_input = std::mem::replace(&mut pending_input, vec![input_line]);
353
354 is_diff = !compare_output(&query_input, &expected_output, &actual_output) || is_diff;
355 }
356 let expected_output: Vec<_> = expected_lines.collect();
358 let actual_output: Vec<_> = actual_lines.collect();
359 is_diff = !compare_output(&pending_input, &expected_output, &actual_output) || is_diff;
360
361 Ok(if is_diff { Different } else { Same })
362 }
363}
364
365fn compare_output(query: &[&str], expected: &[String], actual: &[String]) -> bool {
366 let compare_lines = |expected: &[String], actual: &[String]| {
367 let eq = expected == actual;
368 if !eq {
369 error!("query input:\n{}", query.join("\n"));
370
371 let (expected_output, actual_output) = (expected.join("\n"), actual.join("\n"));
372 let diffs = format_diff(&expected_output, &actual_output);
373 error!("Diff:\n{}", diffs);
374 }
375 eq
376 };
377
378 if let Some(l) = query.last()
379 && l.starts_with(PREFIX_IGNORE)
380 {
381 return true;
382 }
383 if !expected.is_empty()
384 && !actual.is_empty()
385 && expected[0].starts_with("ERROR: ")
386 && actual[0].starts_with("ERROR: ")
387 {
388 return true;
389 }
390
391 let is_select = query
392 .iter()
393 .any(|line| line.to_lowercase().starts_with("select"));
394 if !is_select {
395 return compare_lines(expected, actual);
397 }
398
399 let is_order_by = query
403 .iter()
404 .any(|line| line.to_lowercase().contains("order by"));
405
406 if expected.len() < 3
419 || !expected[1].starts_with("---")
420 || !matches!(expected.last(), Some(l) if l.ends_with(" rows)") || l == "(1 row)")
421 || actual.len() < 3
422 || !actual[1].starts_with("---")
423 || !matches!(actual.last(), Some(l) if l.ends_with(" rows)") || l == "(1 row)")
424 {
425 return compare_lines(expected, actual);
427 }
428
429 if is_order_by {
430 compare_lines(expected, actual)
431 } else {
432 let mut expected_sorted = expected.to_vec();
433 expected_sorted[2..expected.len() - 1].sort();
434 let mut actual_sorted = actual.to_vec();
435 actual_sorted[2..actual.len() - 1].sort();
436
437 compare_lines(&expected_sorted, &actual_sorted)
438 }
439}
440
441fn format_diff(expected_output: &String, actual_output: &String) -> String {
442 use std::fmt::Write;
443
444 use similar::{ChangeTag, TextDiff};
445 let diff = TextDiff::from_lines(expected_output, actual_output);
446
447 let mut diff_str = "".to_owned();
448 for change in diff.iter_all_changes() {
449 let sign = match change.tag() {
450 ChangeTag::Delete => "-",
451 ChangeTag::Insert => "+",
452 ChangeTag::Equal => " ",
453 };
454 write!(diff_str, "{}{}", sign, change).unwrap();
455 }
456 diff_str
457}