risingwave_sqlsmith/sqlreduce/
checker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fs;
16use std::path::{Path, PathBuf};
17use std::process::Command;
18use std::time::{Duration, SystemTime};
19
20use risingwave_sqlparser::ast::Statement;
21use tokio_postgres::{Client, NoTls};
22
/// Information about a detected panic in component logs.
#[derive(Debug, Clone)]
struct PanicInfo {
    // Name of the panicking component: "compute", "meta", or "compactor"
    // (set by `check_component_panic` from the log file name prefix).
    component: String,
    // The raw log line that matched a panic pattern. Kept for debugging /
    // future use; nothing reads it yet, hence the `dead_code` allow.
    #[allow(dead_code)]
    message: String,
}
30
/// Checker evaluates whether a transformed SQL still preserves the original failure.
pub struct Checker {
    // Frontend connection; `run_query` may replace it in place after a reconnect.
    pub client: Client,
    // Setup statements (DDL, inserts, ...) replayed before each candidate query.
    pub setup_stmts: Vec<Statement>,
    // Shell command executed for bootstrap recovery when the cluster must be restarted.
    restore_cmd: String,
}
37
38impl Checker {
39    pub fn new(client: Client, setup_stmts: Vec<Statement>, restore_cmd: String) -> Self {
40        Self {
41            client,
42            setup_stmts,
43            restore_cmd,
44        }
45    }
46
47    /// Prepares the schema namespace for testing.
48    ///
49    /// This creates the `sqlsmith_reducer` schema and sets the search path.
50    /// Should be called once before any reduction begins.
51    pub async fn prepare_schema(&self) {
52        let _ = self
53            .client
54            .simple_query("CREATE SCHEMA IF NOT EXISTS sqlsmith_reducer;")
55            .await;
56        let _ = self
57            .client
58            .simple_query("SET search_path TO sqlsmith_reducer;")
59            .await;
60    }
61
62    /// Drop the schema.
63    ///
64    /// Should be called after the reduction is complete.
65    pub async fn drop_schema(&self) {
66        let _ = self
67            .client
68            .simple_query("DROP SCHEMA IF EXISTS sqlsmith_reducer CASCADE;")
69            .await;
70    }
71
72    /// Determines if the transformation preserved the original failure behavior.
73    ///
74    /// Each test run resets the schema, replays setup, and runs the query.
75    pub async fn is_failure_preserved(&mut self, old: &str, new: &str) -> bool {
76        self.reset_schema().await;
77        self.replay_setup().await;
78        let old_result = run_query(&mut self.client, old, &self.restore_cmd).await;
79
80        self.reset_schema().await;
81        self.replay_setup().await;
82        let new_result = run_query(&mut self.client, new, &self.restore_cmd).await;
83
84        tracing::debug!("old_result: {:?}", old_result);
85        tracing::debug!("new_result: {:?}", new_result);
86
87        old_result == new_result
88    }
89
90    /// Drops the entire schema to reset database state.
91    async fn reset_schema(&self) {
92        let _ = self
93            .client
94            .simple_query("DROP SCHEMA IF EXISTS sqlsmith_reducer CASCADE;")
95            .await;
96        let _ = self
97            .client
98            .simple_query("CREATE SCHEMA sqlsmith_reducer;")
99            .await;
100        let _ = self
101            .client
102            .simple_query("SET search_path TO sqlsmith_reducer;")
103            .await;
104    }
105
106    /// Replays the setup statements (DDL, inserts, etc.)
107    async fn replay_setup(&self) {
108        for stmt in &self.setup_stmts {
109            let _ = self.client.simple_query(&stmt.to_string()).await;
110        }
111    }
112}
113
114/// Executes a single SQL query string and returns (`is_ok`, `error_message_if_any`)
115pub async fn run_query(client: &mut Client, query: &str, restore_cmd: &str) -> (bool, String) {
116    let query_start_time = SystemTime::now();
117
118    match client.simple_query(query).await {
119        Ok(_) => (true, String::new()),
120        Err(e) => {
121            let error_msg = e.to_string();
122            tracing::debug!("Query failed with error: {}", error_msg);
123
124            // Step 1: Check if this is a frontend-only error (SQL syntax, type errors, etc.)
125            // These are safe and can be handled by auto-recovery
126            if is_frontend_error(&error_msg) {
127                tracing::debug!("Detected frontend error, attempting auto-recovery");
128
129                // Try reconnecting to frontend
130                if let Ok((new_client, connection)) = reconnect_frontend().await {
131                    tokio::spawn(async move {
132                        if let Err(e) = connection.await {
133                            tracing::error!("connection error: {}", e);
134                        }
135                    });
136                    *client = new_client;
137                    tracing::info!("Reconnected to frontend after error");
138
139                    // Wait for auto-recovery with 30s timeout
140                    match wait_for_auto_recovery(client, 30).await {
141                        Ok(_) => {
142                            tracing::info!("Frontend auto-recovery successful");
143                        }
144                        Err(_) => {
145                            // Auto-recovery stuck, fallback to bootstrap
146                            tracing::warn!(
147                                "Frontend auto-recovery timeout, falling back to bootstrap"
148                            );
149                            if let Err(err) = bootstrap_recovery(restore_cmd) {
150                                tracing::error!("Bootstrap recovery failed: {}", err);
151                            }
152                        }
153                    }
154                } else {
155                    tracing::error!("Failed to reconnect to frontend");
156                }
157
158                return (false, error_msg);
159            }
160
161            // Step 2: Check if connection is lost (possible panic)
162            if client.is_closed() {
163                tracing::warn!("Connection lost, checking for component panics");
164
165                // Step 3: Check logs for panic in compute/meta/compactor
166                // Look for panics that occurred around the query time (within 10s window)
167                let time_window = query_start_time
168                    .checked_sub(Duration::from_secs(10))
169                    .unwrap_or(query_start_time);
170
171                if let Some(panic_info) = check_component_panic(time_window) {
172                    // Critical component panic detected - need bootstrap recovery
173                    tracing::error!(
174                        "Panic detected in {} component, executing bootstrap recovery",
175                        panic_info.component
176                    );
177
178                    if let Err(err) = bootstrap_recovery(restore_cmd) {
179                        tracing::error!("Bootstrap recovery failed: {}", err);
180                    }
181
182                    // Reconnect after bootstrap recovery
183                    if let Ok((new_client, connection)) = reconnect_frontend().await {
184                        tokio::spawn(async move {
185                            if let Err(e) = connection.await {
186                                tracing::error!("connection error: {}", e);
187                            }
188                        });
189                        *client = new_client;
190                        tracing::info!("Reconnected after bootstrap recovery");
191
192                        // Wait for recovery to complete
193                        if let Err(err) = wait_for_recovery(client).await {
194                            tracing::error!("Failed to complete recovery: {}", err);
195                        }
196                    }
197                } else {
198                    // No panic found in logs, but connection is lost - this is uncertain
199                    // Use bootstrap recovery to be safe (avoid getting stuck in auto-recovery)
200                    tracing::warn!(
201                        "No component panic detected, but connection lost - using bootstrap recovery"
202                    );
203
204                    if let Err(err) = bootstrap_recovery(restore_cmd) {
205                        tracing::error!("Bootstrap recovery failed: {}", err);
206                    }
207
208                    if let Ok((new_client, connection)) = reconnect_frontend().await {
209                        tokio::spawn(async move {
210                            if let Err(e) = connection.await {
211                                tracing::error!("connection error: {}", e);
212                            }
213                        });
214                        *client = new_client;
215                        tracing::info!("Reconnected after bootstrap recovery");
216
217                        // Wait for recovery to complete
218                        if let Err(err) = wait_for_recovery(client).await {
219                            tracing::error!("Failed to complete recovery: {}", err);
220                        }
221                    }
222                }
223            } else {
224                // Connection still alive but query failed - likely a normal query error
225                tracing::debug!("Connection alive, treating as normal query error");
226            }
227
228            (false, error_msg)
229        }
230    }
231}
232
233/// Reconnect to the frontend `PostgreSQL` interface.
234async fn reconnect_frontend() -> Result<
235    (
236        Client,
237        tokio_postgres::Connection<tokio_postgres::Socket, tokio_postgres::tls::NoTlsStream>,
238    ),
239    tokio_postgres::Error,
240> {
241    tokio_postgres::Config::new()
242        .host("localhost")
243        .port(4566)
244        .dbname("dev")
245        .user("root")
246        .password("")
247        .connect_timeout(Duration::from_secs(60))
248        .connect(NoTls)
249        .await
250}
251
252/// Wait until RW recovery finishes (`rw_recovery_status() = 'RUNNING'`)
253pub async fn wait_for_recovery(client: &Client) -> anyhow::Result<()> {
254    let timeout = Duration::from_secs(300);
255    let mut interval = tokio::time::interval(Duration::from_millis(100));
256
257    let res: Result<(), anyhow::Error> = tokio::time::timeout(timeout, async {
258        loop {
259            let query_res = client.simple_query("select rw_recovery_status();").await;
260            if let Ok(messages) = query_res {
261                for msg in messages {
262                    if let tokio_postgres::SimpleQueryMessage::Row(row) = msg
263                        && let Some(status) = row.get(0)
264                        && status == "RUNNING"
265                    {
266                        return Ok(());
267                    }
268                }
269            }
270            interval.tick().await;
271        }
272    })
273    .await
274    .map_err(|_| anyhow::anyhow!("timed out waiting for recovery"))?;
275
276    res
277}
278
/// Check if the error message indicates a frontend-only error (SQL syntax, type mismatch, etc.)
/// These errors are typically safe and can be handled by auto-recovery.
fn is_frontend_error(error_msg: &str) -> bool {
    // Substrings that identify errors raised by the frontend itself.
    const FRONTEND_ERROR_MARKERS: [&str; 9] = [
        "syntax error",
        "parse error",
        "Bind error",
        "Planner error",
        "Catalog error",
        "type mismatch",
        "not found",
        "already exists",
        "permission denied",
    ];
    FRONTEND_ERROR_MARKERS
        .iter()
        .any(|marker| error_msg.contains(marker))
}
292
/// Get the default `RisingWave` log directory path.
///
/// The path is relative to the working directory (`.risingwave/log`).
fn get_log_dir() -> PathBuf {
    Path::new(".risingwave/log").to_path_buf()
}
297
298/// Check if any critical component (compute/meta/compactor) has panicked recently.
299/// Only checks logs modified within the given time window to avoid false positives.
300///
301/// Returns Some(PanicInfo) if a panic is detected, None otherwise.
302fn check_component_panic(since: SystemTime) -> Option<PanicInfo> {
303    let log_dir = get_log_dir();
304    if !log_dir.exists() {
305        tracing::warn!("Log directory not found: {}", log_dir.display());
306        return None;
307    }
308
309    // Read all log files in the directory
310    let entries = match fs::read_dir(&log_dir) {
311        Ok(entries) => entries,
312        Err(e) => {
313            tracing::warn!("Failed to read log directory: {}", e);
314            return None;
315        }
316    };
317
318    for entry in entries.flatten() {
319        let path = entry.path();
320        let filename = match path.file_name().and_then(|n| n.to_str()) {
321            Some(name) => name,
322            None => continue,
323        };
324
325        // Identify critical components (skip frontend logs)
326        let component = if filename.starts_with("compute-node-") {
327            "compute"
328        } else if filename.starts_with("meta-node-") {
329            "meta"
330        } else if filename.starts_with("compactor-") {
331            "compactor"
332        } else {
333            continue; // Skip frontend and other logs
334        };
335
336        // Check if the log file was modified recently (within time window)
337        if let Ok(metadata) = fs::metadata(&path)
338            && let Ok(modified) = metadata.modified()
339            && modified < since
340        {
341            continue; // Skip old log files
342        }
343
344        // Search for panic patterns in the log file
345        if let Some(panic_msg) = search_panic_in_log(&path) {
346            tracing::error!("Detected panic in {} component: {}", component, panic_msg);
347            return Some(PanicInfo {
348                component: component.to_owned(),
349                message: panic_msg,
350            });
351        }
352    }
353
354    None
355}
356
/// Search for panic patterns in a log file.
/// Uses tail to read only the last N lines for efficiency.
///
/// Panic patterns include:
/// - "thread .* panicked"
/// - Lines containing "PANIC" at ERROR/FATAL level
fn search_panic_in_log(log_path: &Path) -> Option<String> {
    // Use tail to read last 500 lines for efficiency
    let output = Command::new("tail")
        .arg("-n")
        .arg("500")
        .arg(log_path)
        .output()
        .ok()?;

    let content = String::from_utf8_lossy(&output.stdout);

    // Return the first line that matches either panic pattern.
    content
        .lines()
        .find(|line| {
            let lower = line.to_lowercase();
            // Pattern 1: Rust runtime panic message ("thread ... panicked").
            let thread_panic = lower.contains("thread") && lower.contains("panicked");
            // Pattern 2: a "panic" mention on an error/fatal-level log line.
            let leveled_panic =
                lower.contains("panic") && (lower.contains("error") || lower.contains("fatal"));
            thread_panic || leveled_panic
        })
        .map(|line| line.to_owned())
}
389
390/// Execute bootstrap recovery command to fully restart `RisingWave`.
391/// This is needed when critical components (compute/meta/compactor) panic.
392fn bootstrap_recovery(restore_cmd: &str) -> anyhow::Result<()> {
393    tracing::warn!("Executing bootstrap recovery due to component panic");
394
395    let status = Command::new("sh").arg("-c").arg(restore_cmd).status()?;
396
397    if status.success() {
398        tracing::info!("Bootstrap recovery completed successfully");
399        Ok(())
400    } else {
401        Err(anyhow::anyhow!(
402            "Bootstrap recovery failed with status: {}",
403            status
404        ))
405    }
406}
407
408/// Wait for frontend to auto-recover with a timeout.
409/// Returns Ok if recovery succeeds, Err if timeout expires.
410async fn wait_for_auto_recovery(client: &Client, timeout_secs: u64) -> anyhow::Result<()> {
411    let timeout = Duration::from_secs(timeout_secs);
412    let mut interval = tokio::time::interval(Duration::from_millis(100));
413
414    tokio::time::timeout(timeout, async {
415        loop {
416            let query_res = client.simple_query("select rw_recovery_status();").await;
417            if let Ok(messages) = query_res {
418                for msg in messages {
419                    if let tokio_postgres::SimpleQueryMessage::Row(row) = msg
420                        && let Some(status) = row.get(0)
421                        && status == "RUNNING"
422                    {
423                        return Ok(());
424                    }
425                }
426            }
427            interval.tick().await;
428        }
429    })
430    .await
431    .map_err(|_| anyhow::anyhow!("Frontend auto-recovery timeout after {}s", timeout_secs))?
432}