risingwave_frontend/handler/
explain.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use petgraph::dot::Dot;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::types::Fields;
20use risingwave_sqlparser::ast::{
21    ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
22};
23use thiserror_ext::AsReport;
24
25use super::create_index::{gen_create_index_plan, resolve_index_schema};
26use super::create_mv::gen_create_mv_plan;
27use super::create_sink::gen_sink_plan;
28use super::query::gen_batch_plan_by_statement;
29use super::util::SourceSchemaCompatExt;
30use super::{RwPgResponse, RwPgResponseBuilderExt};
31use crate::OptimizerContextRef;
32use crate::error::{ErrorCode, Result};
33use crate::handler::HandlerArgs;
34use crate::handler::create_table::handle_create_table_plan;
35use crate::optimizer::OptimizerContext;
36use crate::optimizer::plan_node::generic::GenericPlanRef;
37use crate::optimizer::plan_node::{Convention, Explain};
38use crate::scheduler::BatchPlanFragmenter;
39use crate::stream_fragmenter::build_graph;
40use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
41
42async fn do_handle_explain(
43    handler_args: HandlerArgs,
44    explain_options: ExplainOptions,
45    stmt: Statement,
46    blocks: &mut Vec<String>,
47) -> Result<()> {
48    // Workaround to avoid `Rc` across `await` point.
49    let mut batch_plan_fragmenter = None;
50    let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
51
52    let session = handler_args.session.clone();
53
54    {
55        let (plan, context) = match stmt {
56            // `CREATE TABLE` takes the ownership of the `OptimizerContext` to avoid `Rc` across
57            // `await` point. We can only take the reference back from the `PlanRef` if it's
58            // successfully planned.
59            Statement::CreateTable {
60                name,
61                columns,
62                constraints,
63                format_encode,
64                source_watermarks,
65                append_only,
66                on_conflict,
67                with_version_column,
68                cdc_table_info,
69                include_column_options,
70                wildcard_idx,
71                webhook_info,
72                ..
73            } => {
74                let format_encode = format_encode.map(|s| s.into_v2_with_warning());
75
76                let (plan, _source, _table, _job_type) = handle_create_table_plan(
77                    handler_args,
78                    explain_options,
79                    format_encode,
80                    cdc_table_info,
81                    name.clone(),
82                    columns,
83                    wildcard_idx,
84                    constraints,
85                    source_watermarks,
86                    append_only,
87                    on_conflict,
88                    with_version_column.map(|x| x.real_value()),
89                    include_column_options,
90                    webhook_info,
91                    risingwave_common::catalog::Engine::Hummock,
92                )
93                .await?;
94                let context = plan.ctx();
95                (Ok(plan), context)
96            }
97            Statement::CreateSink { stmt } => {
98                let plan = gen_sink_plan(handler_args, stmt, Some(explain_options))
99                    .await
100                    .map(|plan| plan.sink_plan)?;
101                let context = plan.ctx();
102                (Ok(plan), context)
103            }
104
105            Statement::FetchCursor {
106                stmt: FetchCursorStatement { cursor_name, .. },
107            } => {
108                let cursor_manager = session.clone().get_cursor_manager();
109                let plan = cursor_manager
110                    .gen_batch_plan_with_subscription_cursor(
111                        &cursor_name.real_value(),
112                        handler_args,
113                    )
114                    .await
115                    .map(|x| x.plan)?;
116                let context = plan.ctx();
117                (Ok(plan), context)
118            }
119
120            // For other queries without `await` point, we can keep a copy of reference to the
121            // `OptimizerContext` even if the planning fails. This enables us to log the partial
122            // traces for better debugging experience.
123            _ => {
124                let context: OptimizerContextRef =
125                    OptimizerContext::new(handler_args, explain_options).into();
126                let plan = match stmt {
127                    // -- Streaming DDLs --
128                    Statement::CreateView {
129                        or_replace: false,
130                        materialized: true,
131                        query,
132                        name,
133                        columns,
134                        emit_mode,
135                        ..
136                    } => gen_create_mv_plan(
137                        &session,
138                        context.clone(),
139                        *query,
140                        name,
141                        columns,
142                        emit_mode,
143                    )
144                    .map(|x| x.0),
145                    Statement::CreateView {
146                        materialized: false,
147                        ..
148                    } => {
149                        return Err(ErrorCode::NotSupported(
150                            "EXPLAIN CREATE VIEW".into(),
151                            "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
152                        ).into());
153                    }
154
155                    Statement::CreateSubscription { .. } => {
156                        return Err(ErrorCode::NotSupported(
157                            "EXPLAIN CREATE SUBSCRIPTION".into(),
158                            "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
159                        ).into());
160                    }
161                    Statement::CreateIndex {
162                        name,
163                        table_name,
164                        columns,
165                        include,
166                        distributed_by,
167                        ..
168                    } => {
169                        let (schema_name, table, index_table_name) =
170                            resolve_index_schema(&session, name, table_name)?;
171                        gen_create_index_plan(
172                            &session,
173                            context.clone(),
174                            schema_name,
175                            table,
176                            index_table_name,
177                            columns,
178                            include,
179                            distributed_by,
180                        )
181                    }
182                    .map(|x| x.0),
183
184                    // -- Batch Queries --
185                    Statement::Insert { .. }
186                    | Statement::Delete { .. }
187                    | Statement::Update { .. }
188                    | Statement::Query { .. } => {
189                        gen_batch_plan_by_statement(&session, context, stmt).map(|x| x.plan)
190                    }
191
192                    _ => bail_not_implemented!("unsupported statement {:?}", stmt),
193                };
194
195                let plan = plan?;
196                let context = plan.ctx().clone();
197
198                (Ok(plan) as Result<_>, context)
199            }
200        };
201
202        let explain_trace = context.is_explain_trace();
203        let explain_verbose = context.is_explain_verbose();
204        let explain_type = context.explain_type();
205        let explain_format = context.explain_format();
206
207        if explain_trace {
208            let trace = context.take_trace();
209            blocks.extend(trace);
210        }
211
212        match explain_type {
213            ExplainType::DistSql => {
214                if let Ok(plan) = &plan {
215                    match plan.convention() {
216                        Convention::Logical => unreachable!(),
217                        Convention::Batch => {
218                            let worker_node_manager_reader = WorkerNodeSelector::new(
219                                session.env().worker_node_manager_ref(),
220                                session.is_barrier_read(),
221                            );
222                            batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
223                                worker_node_manager_reader,
224                                session.env().catalog_reader().clone(),
225                                session.config().batch_parallelism().0,
226                                session.config().timezone().to_owned(),
227                                plan.clone(),
228                            )?);
229                            batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
230                                ExplainFormat::Dot
231                            } else {
232                                ExplainFormat::Json
233                            }
234                        }
235                        Convention::Stream => {
236                            let graph = build_graph(plan.clone())?;
237                            if explain_format == ExplainFormat::Dot {
238                                blocks.push(explain_stream_graph_as_dot(&graph, explain_verbose))
239                            } else {
240                                blocks.push(explain_stream_graph(&graph, explain_verbose));
241                            }
242                        }
243                    }
244                }
245            }
246            ExplainType::Physical => {
247                // if explain trace is on, the plan has been in the rows
248                if !explain_trace && let Ok(plan) = &plan {
249                    match explain_format {
250                        ExplainFormat::Text => blocks.push(plan.explain_to_string()),
251                        ExplainFormat::Json => blocks.push(plan.explain_to_json()),
252                        ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
253                        ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
254                        ExplainFormat::Dot => blocks.push(plan.explain_to_dot()),
255                    }
256                }
257            }
258            ExplainType::Logical => {
259                // if explain trace is on, the plan has been in the rows
260                if !explain_trace {
261                    let output = context.take_logical().ok_or_else(|| {
262                        ErrorCode::InternalError("Logical plan not found for query".into())
263                    })?;
264                    blocks.push(output);
265                }
266            }
267        }
268
269        // Throw the error.
270        plan?;
271    }
272
273    if let Some(fragmenter) = batch_plan_fragmenter {
274        let query = fragmenter.generate_complete_query().await?;
275        let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
276            let graph = query.stage_graph.to_petgraph();
277            let dot = Dot::new(&graph);
278            dot.to_string()
279        } else {
280            serde_json::to_string_pretty(&query.stage_graph).unwrap()
281        };
282        blocks.push(stage_graph);
283    }
284
285    Ok(())
286}
287
288pub async fn handle_explain(
289    handler_args: HandlerArgs,
290    stmt: Statement,
291    options: ExplainOptions,
292    analyze: bool,
293) -> Result<RwPgResponse> {
294    if analyze {
295        // NOTE(kwannoel): This path is for explain analyze on stream and batch queries.
296        // For existing stream jobs, see the handler module `explain_analyze` instead.
297        bail_not_implemented!(issue = 4856, "explain analyze");
298    }
299    if options.trace && options.explain_format == ExplainFormat::Json {
300        return Err(ErrorCode::NotSupported(
301            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
302            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
303        )
304        .into());
305    }
306    if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
307    {
308        return Err(ErrorCode::NotSupported(
309            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
310            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
311        )
312        .into());
313    }
314
315    let mut blocks = Vec::new();
316    let result = do_handle_explain(handler_args, options.clone(), stmt, &mut blocks).await;
317
318    if let Err(e) = result {
319        if options.trace {
320            // If `trace` is on, we include the error in the output with partial traces.
321            blocks.push(if options.verbose {
322                format!("ERROR: {:?}", e.as_report())
323            } else {
324                format!("ERROR: {}", e.as_report())
325            });
326        } else {
327            // Else, directly return the error.
328            return Err(e);
329        }
330    }
331
332    let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
333        query_plan: l.into(),
334    });
335
336    Ok(PgResponse::builder(StatementType::EXPLAIN)
337        .rows(rows)
338        .into())
339}
340
341#[derive(Fields)]
342#[fields(style = "TITLE CASE")]
343struct ExplainRow {
344    query_plan: String,
345}