// risingwave_sqlsmith/sqlreduce/checker.rs

use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{Duration, SystemTime};

use risingwave_sqlparser::ast::Statement;
use tokio_postgres::{Client, NoTls};
/// A panic found in a component's log file.
#[derive(Debug, Clone)]
struct PanicInfo {
    /// Which component panicked: `"compute"`, `"meta"`, or `"compactor"`.
    component: String,
    /// The raw log line that matched the panic pattern; kept for debugging
    /// even though nothing reads it yet.
    #[allow(dead_code)]
    message: String,
}
30
31pub struct Checker {
33 pub client: Client,
34 pub setup_stmts: Vec<Statement>,
35 restore_cmd: String,
36}
37
38impl Checker {
39 pub fn new(client: Client, setup_stmts: Vec<Statement>, restore_cmd: String) -> Self {
40 Self {
41 client,
42 setup_stmts,
43 restore_cmd,
44 }
45 }
46
47 pub async fn prepare_schema(&self) {
52 let _ = self
53 .client
54 .simple_query("CREATE SCHEMA IF NOT EXISTS sqlsmith_reducer;")
55 .await;
56 let _ = self
57 .client
58 .simple_query("SET search_path TO sqlsmith_reducer;")
59 .await;
60 }
61
62 pub async fn drop_schema(&self) {
66 let _ = self
67 .client
68 .simple_query("DROP SCHEMA IF EXISTS sqlsmith_reducer CASCADE;")
69 .await;
70 }
71
72 pub async fn is_failure_preserved(&mut self, old: &str, new: &str) -> bool {
76 self.reset_schema().await;
77 self.replay_setup().await;
78 let old_result = run_query(&mut self.client, old, &self.restore_cmd).await;
79
80 self.reset_schema().await;
81 self.replay_setup().await;
82 let new_result = run_query(&mut self.client, new, &self.restore_cmd).await;
83
84 tracing::debug!("old_result: {:?}", old_result);
85 tracing::debug!("new_result: {:?}", new_result);
86
87 old_result == new_result
88 }
89
90 async fn reset_schema(&self) {
92 let _ = self
93 .client
94 .simple_query("DROP SCHEMA IF EXISTS sqlsmith_reducer CASCADE;")
95 .await;
96 let _ = self
97 .client
98 .simple_query("CREATE SCHEMA sqlsmith_reducer;")
99 .await;
100 let _ = self
101 .client
102 .simple_query("SET search_path TO sqlsmith_reducer;")
103 .await;
104 }
105
106 async fn replay_setup(&self) {
108 for stmt in &self.setup_stmts {
109 let _ = self.client.simple_query(&stmt.to_string()).await;
110 }
111 }
112}
113
114pub async fn run_query(client: &mut Client, query: &str, restore_cmd: &str) -> (bool, String) {
116 let query_start_time = SystemTime::now();
117
118 match client.simple_query(query).await {
119 Ok(_) => (true, String::new()),
120 Err(e) => {
121 let error_msg = e.to_string();
122 tracing::debug!("Query failed with error: {}", error_msg);
123
124 if is_frontend_error(&error_msg) {
127 tracing::debug!("Detected frontend error, attempting auto-recovery");
128
129 if let Ok((new_client, connection)) = reconnect_frontend().await {
131 tokio::spawn(async move {
132 if let Err(e) = connection.await {
133 tracing::error!("connection error: {}", e);
134 }
135 });
136 *client = new_client;
137 tracing::info!("Reconnected to frontend after error");
138
139 match wait_for_auto_recovery(client, 30).await {
141 Ok(_) => {
142 tracing::info!("Frontend auto-recovery successful");
143 }
144 Err(_) => {
145 tracing::warn!(
147 "Frontend auto-recovery timeout, falling back to bootstrap"
148 );
149 if let Err(err) = bootstrap_recovery(restore_cmd) {
150 tracing::error!("Bootstrap recovery failed: {}", err);
151 }
152 }
153 }
154 } else {
155 tracing::error!("Failed to reconnect to frontend");
156 }
157
158 return (false, error_msg);
159 }
160
161 if client.is_closed() {
163 tracing::warn!("Connection lost, checking for component panics");
164
165 let time_window = query_start_time
168 .checked_sub(Duration::from_secs(10))
169 .unwrap_or(query_start_time);
170
171 if let Some(panic_info) = check_component_panic(time_window) {
172 tracing::error!(
174 "Panic detected in {} component, executing bootstrap recovery",
175 panic_info.component
176 );
177
178 if let Err(err) = bootstrap_recovery(restore_cmd) {
179 tracing::error!("Bootstrap recovery failed: {}", err);
180 }
181
182 if let Ok((new_client, connection)) = reconnect_frontend().await {
184 tokio::spawn(async move {
185 if let Err(e) = connection.await {
186 tracing::error!("connection error: {}", e);
187 }
188 });
189 *client = new_client;
190 tracing::info!("Reconnected after bootstrap recovery");
191
192 if let Err(err) = wait_for_recovery(client).await {
194 tracing::error!("Failed to complete recovery: {}", err);
195 }
196 }
197 } else {
198 tracing::warn!(
201 "No component panic detected, but connection lost - using bootstrap recovery"
202 );
203
204 if let Err(err) = bootstrap_recovery(restore_cmd) {
205 tracing::error!("Bootstrap recovery failed: {}", err);
206 }
207
208 if let Ok((new_client, connection)) = reconnect_frontend().await {
209 tokio::spawn(async move {
210 if let Err(e) = connection.await {
211 tracing::error!("connection error: {}", e);
212 }
213 });
214 *client = new_client;
215 tracing::info!("Reconnected after bootstrap recovery");
216
217 if let Err(err) = wait_for_recovery(client).await {
219 tracing::error!("Failed to complete recovery: {}", err);
220 }
221 }
222 }
223 } else {
224 tracing::debug!("Connection alive, treating as normal query error");
226 }
227
228 (false, error_msg)
229 }
230 }
231}
232
233async fn reconnect_frontend() -> Result<
235 (
236 Client,
237 tokio_postgres::Connection<tokio_postgres::Socket, tokio_postgres::tls::NoTlsStream>,
238 ),
239 tokio_postgres::Error,
240> {
241 tokio_postgres::Config::new()
242 .host("localhost")
243 .port(4566)
244 .dbname("dev")
245 .user("root")
246 .password("")
247 .connect_timeout(Duration::from_secs(60))
248 .connect(NoTls)
249 .await
250}
251
252pub async fn wait_for_recovery(client: &Client) -> anyhow::Result<()> {
254 let timeout = Duration::from_secs(300);
255 let mut interval = tokio::time::interval(Duration::from_millis(100));
256
257 let res: Result<(), anyhow::Error> = tokio::time::timeout(timeout, async {
258 loop {
259 let query_res = client.simple_query("select rw_recovery_status();").await;
260 if let Ok(messages) = query_res {
261 for msg in messages {
262 if let tokio_postgres::SimpleQueryMessage::Row(row) = msg
263 && let Some(status) = row.get(0)
264 && status == "RUNNING"
265 {
266 return Ok(());
267 }
268 }
269 }
270 interval.tick().await;
271 }
272 })
273 .await
274 .map_err(|_| anyhow::anyhow!("timed out waiting for recovery"))?;
275
276 res
277}
278
/// Returns `true` when the error text looks like a frontend-originated
/// failure (parse/bind/plan/catalog problems) rather than a crashed backend.
fn is_frontend_error(error_msg: &str) -> bool {
    const FRONTEND_ERROR_MARKERS: [&str; 9] = [
        "syntax error",
        "parse error",
        "Bind error",
        "Planner error",
        "Catalog error",
        "type mismatch",
        "not found",
        "already exists",
        "permission denied",
    ];
    FRONTEND_ERROR_MARKERS
        .iter()
        .any(|marker| error_msg.contains(marker))
}
292
/// Directory where component logs are written, relative to the reducer's
/// working directory.
fn get_log_dir() -> PathBuf {
    Path::new(".risingwave").join("log")
}
297
298fn check_component_panic(since: SystemTime) -> Option<PanicInfo> {
303 let log_dir = get_log_dir();
304 if !log_dir.exists() {
305 tracing::warn!("Log directory not found: {}", log_dir.display());
306 return None;
307 }
308
309 let entries = match fs::read_dir(&log_dir) {
311 Ok(entries) => entries,
312 Err(e) => {
313 tracing::warn!("Failed to read log directory: {}", e);
314 return None;
315 }
316 };
317
318 for entry in entries.flatten() {
319 let path = entry.path();
320 let filename = match path.file_name().and_then(|n| n.to_str()) {
321 Some(name) => name,
322 None => continue,
323 };
324
325 let component = if filename.starts_with("compute-node-") {
327 "compute"
328 } else if filename.starts_with("meta-node-") {
329 "meta"
330 } else if filename.starts_with("compactor-") {
331 "compactor"
332 } else {
333 continue; };
335
336 if let Ok(metadata) = fs::metadata(&path)
338 && let Ok(modified) = metadata.modified()
339 && modified < since
340 {
341 continue; }
343
344 if let Some(panic_msg) = search_panic_in_log(&path) {
346 tracing::error!("Detected panic in {} component: {}", component, panic_msg);
347 return Some(PanicInfo {
348 component: component.to_owned(),
349 message: panic_msg,
350 });
351 }
352 }
353
354 None
355}
356
/// Returns the first panic-looking line from the tail of `log_path`.
///
/// Only the last 500 lines are inspected (via the external `tail` command) to
/// bound work on large log files. A line matches when it contains
/// "thread" + "panicked", or "panic" together with "error"/"fatal"
/// (all case-insensitive).
fn search_panic_in_log(log_path: &Path) -> Option<String> {
    let tail_output = Command::new("tail")
        .args(["-n", "500"])
        .arg(log_path)
        .output()
        .ok()?;

    let tail_text = String::from_utf8_lossy(&tail_output.stdout);

    tail_text
        .lines()
        .find(|line| {
            let lower = line.to_lowercase();
            (lower.contains("thread") && lower.contains("panicked"))
                || (lower.contains("panic")
                    && (lower.contains("error") || lower.contains("fatal")))
        })
        .map(|line| line.to_owned())
}
389
390fn bootstrap_recovery(restore_cmd: &str) -> anyhow::Result<()> {
393 tracing::warn!("Executing bootstrap recovery due to component panic");
394
395 let status = Command::new("sh").arg("-c").arg(restore_cmd).status()?;
396
397 if status.success() {
398 tracing::info!("Bootstrap recovery completed successfully");
399 Ok(())
400 } else {
401 Err(anyhow::anyhow!(
402 "Bootstrap recovery failed with status: {}",
403 status
404 ))
405 }
406}
407
408async fn wait_for_auto_recovery(client: &Client, timeout_secs: u64) -> anyhow::Result<()> {
411 let timeout = Duration::from_secs(timeout_secs);
412 let mut interval = tokio::time::interval(Duration::from_millis(100));
413
414 tokio::time::timeout(timeout, async {
415 loop {
416 let query_res = client.simple_query("select rw_recovery_status();").await;
417 if let Ok(messages) = query_res {
418 for msg in messages {
419 if let tokio_postgres::SimpleQueryMessage::Row(row) = msg
420 && let Some(status) = row.get(0)
421 && status == "RUNNING"
422 {
423 return Ok(());
424 }
425 }
426 }
427 interval.tick().await;
428 }
429 })
430 .await
431 .map_err(|_| anyhow::anyhow!("Frontend auto-recovery timeout after {}s", timeout_secs))?
432}