risingwave_frontend/handler/
explain.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use petgraph::dot::Dot;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::types::Fields;
20use risingwave_sqlparser::ast::{
21    ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
22};
23use thiserror_ext::AsReport;
24
25use super::create_index::{gen_create_index_plan, resolve_index_schema};
26use super::create_mv::gen_create_mv_plan;
27use super::create_sink::gen_sink_plan;
28use super::query::gen_batch_plan_by_statement;
29use super::util::SourceSchemaCompatExt;
30use super::{RwPgResponse, RwPgResponseBuilderExt};
31use crate::OptimizerContextRef;
32use crate::error::{ErrorCode, Result};
33use crate::handler::HandlerArgs;
34use crate::handler::create_table::handle_create_table_plan;
35use crate::optimizer::OptimizerContext;
36use crate::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
37use crate::optimizer::plan_node::generic::GenericPlanRef;
38use crate::optimizer::plan_node::{Convention, Explain};
39use crate::scheduler::BatchPlanFragmenter;
40use crate::stream_fragmenter::build_graph;
41use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
42
43pub async fn do_handle_explain(
44    handler_args: HandlerArgs,
45    explain_options: ExplainOptions,
46    stmt: Statement,
47    blocks: &mut Vec<String>,
48) -> Result<()> {
49    // Workaround to avoid `Rc` across `await` point.
50    let mut batch_plan_fragmenter = None;
51    let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
52
53    let session = handler_args.session.clone();
54
55    {
56        let (plan, context) = match stmt {
57            // `CREATE TABLE` takes the ownership of the `OptimizerContext` to avoid `Rc` across
58            // `await` point. We can only take the reference back from the `PlanRef` if it's
59            // successfully planned.
60            Statement::CreateTable {
61                name,
62                columns,
63                constraints,
64                format_encode,
65                source_watermarks,
66                append_only,
67                on_conflict,
68                with_version_column,
69                cdc_table_info,
70                include_column_options,
71                wildcard_idx,
72                webhook_info,
73                ..
74            } => {
75                let format_encode = format_encode.map(|s| s.into_v2_with_warning());
76
77                let (plan, _source, _table, _job_type) = handle_create_table_plan(
78                    handler_args,
79                    explain_options,
80                    format_encode,
81                    cdc_table_info,
82                    name.clone(),
83                    columns,
84                    wildcard_idx,
85                    constraints,
86                    source_watermarks,
87                    append_only,
88                    on_conflict,
89                    with_version_column.map(|x| x.real_value()),
90                    include_column_options,
91                    webhook_info,
92                    risingwave_common::catalog::Engine::Hummock,
93                )
94                .await?;
95                let context = plan.ctx();
96                (Ok(plan), context)
97            }
98            Statement::CreateSink { stmt } => {
99                let plan = gen_sink_plan(handler_args, stmt, Some(explain_options), false)
100                    .await
101                    .map(|plan| plan.sink_plan)?;
102                let context = plan.ctx();
103                (Ok(plan), context)
104            }
105
106            Statement::FetchCursor {
107                stmt: FetchCursorStatement { cursor_name, .. },
108            } => {
109                let cursor_manager = session.clone().get_cursor_manager();
110                let plan = cursor_manager
111                    .gen_batch_plan_with_subscription_cursor(
112                        &cursor_name.real_value(),
113                        handler_args,
114                    )
115                    .await
116                    .map(|x| x.plan)?;
117                let context = plan.ctx();
118                (Ok(plan), context)
119            }
120
121            // For other queries without `await` point, we can keep a copy of reference to the
122            // `OptimizerContext` even if the planning fails. This enables us to log the partial
123            // traces for better debugging experience.
124            _ => {
125                let context: OptimizerContextRef =
126                    OptimizerContext::new(handler_args, explain_options).into();
127                let plan = match stmt {
128                    // -- Streaming DDLs --
129                    Statement::CreateView {
130                        or_replace: false,
131                        materialized: true,
132                        query,
133                        name,
134                        columns,
135                        emit_mode,
136                        ..
137                    } => gen_create_mv_plan(
138                        &session,
139                        context.clone(),
140                        *query,
141                        name,
142                        columns,
143                        emit_mode,
144                    )
145                    .map(|x| x.0),
146                    Statement::CreateView {
147                        materialized: false,
148                        ..
149                    } => {
150                        return Err(ErrorCode::NotSupported(
151                            "EXPLAIN CREATE VIEW".into(),
152                            "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
153                        ).into());
154                    }
155
156                    Statement::CreateSubscription { .. } => {
157                        return Err(ErrorCode::NotSupported(
158                            "EXPLAIN CREATE SUBSCRIPTION".into(),
159                            "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
160                        ).into());
161                    }
162                    Statement::CreateIndex {
163                        name,
164                        table_name,
165                        columns,
166                        include,
167                        distributed_by,
168                        ..
169                    } => {
170                        let (schema_name, table, index_table_name) =
171                            resolve_index_schema(&session, name, table_name)?;
172                        gen_create_index_plan(
173                            &session,
174                            context.clone(),
175                            schema_name,
176                            table,
177                            index_table_name,
178                            columns,
179                            include,
180                            distributed_by,
181                        )
182                    }
183                    .map(|x| x.0),
184
185                    // -- Batch Queries --
186                    Statement::Insert { .. }
187                    | Statement::Delete { .. }
188                    | Statement::Update { .. }
189                    | Statement::Query { .. } => {
190                        gen_batch_plan_by_statement(&session, context, stmt).map(|x| x.plan)
191                    }
192
193                    _ => bail_not_implemented!("unsupported statement for EXPLAIN: {stmt}"),
194                };
195
196                let plan = plan?;
197                let context = plan.ctx().clone();
198
199                (Ok(plan) as Result<_>, context)
200            }
201        };
202
203        let explain_trace = context.is_explain_trace();
204        let explain_verbose = context.is_explain_verbose();
205        let explain_backfill = context.is_explain_backfill();
206        let explain_type = context.explain_type();
207        let explain_format = context.explain_format();
208
209        if explain_trace {
210            let trace = context.take_trace();
211            blocks.extend(trace);
212        }
213
214        match explain_type {
215            ExplainType::DistSql => {
216                if let Ok(plan) = &plan {
217                    match plan.convention() {
218                        Convention::Logical => unreachable!(),
219                        Convention::Batch => {
220                            let worker_node_manager_reader = WorkerNodeSelector::new(
221                                session.env().worker_node_manager_ref(),
222                                session.is_barrier_read(),
223                            );
224                            batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
225                                worker_node_manager_reader,
226                                session.env().catalog_reader().clone(),
227                                session.config().batch_parallelism().0,
228                                session.config().timezone().to_owned(),
229                                plan.clone(),
230                            )?);
231                            batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
232                                ExplainFormat::Dot
233                            } else {
234                                ExplainFormat::Json
235                            }
236                        }
237                        Convention::Stream => {
238                            let graph = build_graph(plan.clone(), None)?;
239                            if explain_format == ExplainFormat::Dot {
240                                blocks.push(explain_stream_graph_as_dot(&graph, explain_verbose))
241                            } else {
242                                blocks.push(explain_stream_graph(&graph, explain_verbose));
243                            }
244                        }
245                    }
246                }
247            }
248            ExplainType::Physical => {
249                // if explain trace is on, the plan has been in the rows
250                if !explain_trace && let Ok(plan) = &plan {
251                    match explain_format {
252                        ExplainFormat::Text => {
253                            blocks.push(plan.explain_to_string());
254                        }
255                        ExplainFormat::Json => blocks.push(plan.explain_to_json()),
256                        ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
257                        ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
258                        ExplainFormat::Dot => {
259                            if explain_backfill {
260                                let dot_formatted_backfill_order =
261                                    explain_backfill_order_in_dot_format(
262                                        &session,
263                                        context.with_options().backfill_order_strategy(),
264                                        plan.clone(),
265                                    )?;
266                                blocks.push(dot_formatted_backfill_order);
267                            } else {
268                                blocks.push(plan.explain_to_dot());
269                            }
270                        }
271                    }
272                }
273            }
274            ExplainType::Logical => {
275                // if explain trace is on, the plan has been in the rows
276                if !explain_trace {
277                    let output = context.take_logical().ok_or_else(|| {
278                        ErrorCode::InternalError("Logical plan not found for query".into())
279                    })?;
280                    blocks.push(output);
281                }
282            }
283        }
284
285        // Throw the error.
286        plan?;
287    }
288
289    if let Some(fragmenter) = batch_plan_fragmenter {
290        let query = fragmenter.generate_complete_query().await?;
291        let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
292            let graph = query.stage_graph.to_petgraph();
293            let dot = Dot::new(&graph);
294            dot.to_string()
295        } else {
296            serde_json::to_string_pretty(&query.stage_graph).unwrap()
297        };
298        blocks.push(stage_graph);
299    }
300
301    Ok(())
302}
303
304pub async fn handle_explain(
305    handler_args: HandlerArgs,
306    stmt: Statement,
307    options: ExplainOptions,
308    analyze: bool,
309) -> Result<RwPgResponse> {
310    if analyze {
311        // NOTE(kwannoel): This path is for explain analyze on stream and batch queries.
312        // For existing stream jobs, see the handler module `explain_analyze` instead.
313        bail_not_implemented!(issue = 4856, "explain analyze");
314    }
315    if options.trace && options.explain_format == ExplainFormat::Json {
316        return Err(ErrorCode::NotSupported(
317            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
318            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
319        )
320        .into());
321    }
322    if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
323    {
324        return Err(ErrorCode::NotSupported(
325            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
326            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
327        )
328        .into());
329    }
330
331    let mut blocks = Vec::new();
332    let result = do_handle_explain(handler_args, options.clone(), stmt, &mut blocks).await;
333
334    if let Err(e) = result {
335        if options.trace {
336            // If `trace` is on, we include the error in the output with partial traces.
337            blocks.push(if options.verbose {
338                format!("ERROR: {:?}", e.as_report())
339            } else {
340                format!("ERROR: {}", e.as_report())
341            });
342        } else {
343            // Else, directly return the error.
344            return Err(e);
345        }
346    }
347
348    let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
349        query_plan: l.into(),
350    });
351
352    Ok(PgResponse::builder(StatementType::EXPLAIN)
353        .rows(rows)
354        .into())
355}
356
357#[derive(Fields)]
358#[fields(style = "TITLE CASE")]
359pub(crate) struct ExplainRow {
360    pub query_plan: String,
361}