risingwave_frontend/handler/
explain.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use petgraph::dot::Dot;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::types::Fields;
20use risingwave_sqlparser::ast::{
21    ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
22};
23use thiserror_ext::AsReport;
24
25use super::create_index::{gen_create_index_plan, resolve_index_schema};
26use super::create_mv::gen_create_mv_plan;
27use super::create_sink::gen_sink_plan;
28use super::query::gen_batch_plan_by_statement;
29use super::util::SourceSchemaCompatExt;
30use super::{RwPgResponse, RwPgResponseBuilderExt};
31use crate::OptimizerContextRef;
32use crate::error::{ErrorCode, Result};
33use crate::handler::HandlerArgs;
34use crate::handler::create_table::handle_create_table_plan;
35use crate::optimizer::OptimizerContext;
36use crate::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
37use crate::optimizer::plan_node::generic::GenericPlanRef;
38use crate::optimizer::plan_node::{Convention, Explain};
39use crate::scheduler::BatchPlanFragmenter;
40use crate::stream_fragmenter::build_graph;
41use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
42
43pub async fn do_handle_explain(
44    handler_args: HandlerArgs,
45    explain_options: ExplainOptions,
46    stmt: Statement,
47    blocks: &mut Vec<String>,
48) -> Result<()> {
49    // Workaround to avoid `Rc` across `await` point.
50    let mut batch_plan_fragmenter = None;
51    let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
52
53    let session = handler_args.session.clone();
54
55    {
56        let (plan, table, context) = match stmt {
57            // `CREATE TABLE` takes the ownership of the `OptimizerContext` to avoid `Rc` across
58            // `await` point. We can only take the reference back from the `PlanRef` if it's
59            // successfully planned.
60            Statement::CreateTable {
61                name,
62                columns,
63                constraints,
64                format_encode,
65                source_watermarks,
66                append_only,
67                on_conflict,
68                with_version_column,
69                cdc_table_info,
70                include_column_options,
71                wildcard_idx,
72                webhook_info,
73                ..
74            } => {
75                let format_encode = format_encode.map(|s| s.into_v2_with_warning());
76
77                let (plan, _source, table, _job_type, _) = handle_create_table_plan(
78                    handler_args,
79                    explain_options,
80                    format_encode,
81                    cdc_table_info,
82                    name.clone(),
83                    columns,
84                    wildcard_idx,
85                    constraints,
86                    source_watermarks,
87                    append_only,
88                    on_conflict,
89                    with_version_column.map(|x| x.real_value()),
90                    include_column_options,
91                    webhook_info,
92                    risingwave_common::catalog::Engine::Hummock,
93                )
94                .await?;
95                let context = plan.ctx();
96                (Ok(plan), Some(table), context)
97            }
98            Statement::CreateSink { stmt } => {
99                let plan = gen_sink_plan(handler_args, stmt, Some(explain_options), false)
100                    .await
101                    .map(|plan| plan.sink_plan)?;
102                let context = plan.ctx();
103                (Ok(plan), None, context)
104            }
105
106            Statement::FetchCursor {
107                stmt: FetchCursorStatement { cursor_name, .. },
108            } => {
109                let cursor_manager = session.clone().get_cursor_manager();
110                let plan = cursor_manager
111                    .gen_batch_plan_with_subscription_cursor(
112                        &cursor_name.real_value(),
113                        handler_args,
114                    )
115                    .await
116                    .map(|x| x.plan)?;
117                let context = plan.ctx();
118                (Ok(plan), None, context)
119            }
120
121            // For other queries without `await` point, we can keep a copy of reference to the
122            // `OptimizerContext` even if the planning fails. This enables us to log the partial
123            // traces for better debugging experience.
124            _ => {
125                let context: OptimizerContextRef =
126                    OptimizerContext::new(handler_args, explain_options).into();
127                let (plan, table) = match stmt {
128                    // -- Streaming DDLs --
129                    Statement::CreateView {
130                        or_replace: false,
131                        materialized: true,
132                        query,
133                        name,
134                        columns,
135                        emit_mode,
136                        ..
137                    } => gen_create_mv_plan(
138                        &session,
139                        context.clone(),
140                        *query,
141                        name,
142                        columns,
143                        emit_mode,
144                    )
145                    .map(|(plan, table)| (plan, Some(table))),
146                    Statement::CreateView {
147                        materialized: false,
148                        ..
149                    } => {
150                        return Err(ErrorCode::NotSupported(
151                            "EXPLAIN CREATE VIEW".into(),
152                            "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
153                        ).into());
154                    }
155
156                    Statement::CreateSubscription { .. } => {
157                        return Err(ErrorCode::NotSupported(
158                            "EXPLAIN CREATE SUBSCRIPTION".into(),
159                            "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
160                        ).into());
161                    }
162                    Statement::CreateIndex {
163                        name,
164                        table_name,
165                        columns,
166                        include,
167                        distributed_by,
168                        ..
169                    } => {
170                        let (schema_name, table, index_table_name) =
171                            resolve_index_schema(&session, name, table_name)?;
172                        gen_create_index_plan(
173                            &session,
174                            context.clone(),
175                            schema_name,
176                            table,
177                            index_table_name,
178                            columns,
179                            include,
180                            distributed_by,
181                        )
182                    }
183                    .map(|(plan, index_table, _index)| (plan, Some(index_table))),
184
185                    // -- Batch Queries --
186                    Statement::Insert { .. }
187                    | Statement::Delete { .. }
188                    | Statement::Update { .. }
189                    | Statement::Query { .. } => {
190                        gen_batch_plan_by_statement(&session, context, stmt).map(|x| (x.plan, None))
191                    }
192
193                    _ => bail_not_implemented!("unsupported statement for EXPLAIN: {stmt}"),
194                }?;
195
196                let context = plan.ctx().clone();
197
198                (Ok(plan) as Result<_>, table, context)
199            }
200        };
201
202        let explain_trace = context.is_explain_trace();
203        let explain_verbose = context.is_explain_verbose();
204        let explain_backfill = context.is_explain_backfill();
205        let explain_type = context.explain_type();
206        let explain_format = context.explain_format();
207
208        if explain_trace {
209            let trace = context.take_trace();
210            blocks.extend(trace);
211        }
212
213        match explain_type {
214            ExplainType::DistSql => {
215                if let Ok(plan) = &plan {
216                    match plan.convention() {
217                        Convention::Logical => unreachable!(),
218                        Convention::Batch => {
219                            let worker_node_manager_reader = WorkerNodeSelector::new(
220                                session.env().worker_node_manager_ref(),
221                                session.is_barrier_read(),
222                            );
223                            batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
224                                worker_node_manager_reader,
225                                session.env().catalog_reader().clone(),
226                                session.config().batch_parallelism().0,
227                                session.config().timezone().to_owned(),
228                                plan.clone(),
229                            )?);
230                            batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
231                                ExplainFormat::Dot
232                            } else {
233                                ExplainFormat::Json
234                            }
235                        }
236                        Convention::Stream => {
237                            let graph = build_graph(plan.clone(), None)?;
238                            let table = table.map(|x| x.to_prost());
239                            if explain_format == ExplainFormat::Dot {
240                                blocks.push(explain_stream_graph_as_dot(
241                                    &graph,
242                                    table,
243                                    explain_verbose,
244                                ))
245                            } else {
246                                blocks.push(explain_stream_graph(&graph, table, explain_verbose));
247                            }
248                        }
249                    }
250                }
251            }
252            ExplainType::Physical => {
253                // if explain trace is on, the plan has been in the rows
254                if !explain_trace && let Ok(plan) = &plan {
255                    match explain_format {
256                        ExplainFormat::Text => {
257                            blocks.push(plan.explain_to_string());
258                        }
259                        ExplainFormat::Json => blocks.push(plan.explain_to_json()),
260                        ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
261                        ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
262                        ExplainFormat::Dot => {
263                            if explain_backfill {
264                                let dot_formatted_backfill_order =
265                                    explain_backfill_order_in_dot_format(
266                                        &session,
267                                        context.with_options().backfill_order_strategy(),
268                                        plan.clone(),
269                                    )?;
270                                blocks.push(dot_formatted_backfill_order);
271                            } else {
272                                blocks.push(plan.explain_to_dot());
273                            }
274                        }
275                    }
276                }
277            }
278            ExplainType::Logical => {
279                // if explain trace is on, the plan has been in the rows
280                if !explain_trace {
281                    let output = context.take_logical().ok_or_else(|| {
282                        ErrorCode::InternalError("Logical plan not found for query".into())
283                    })?;
284                    blocks.push(output);
285                }
286            }
287        }
288
289        // Throw the error.
290        plan?;
291    }
292
293    if let Some(fragmenter) = batch_plan_fragmenter {
294        let query = fragmenter.generate_complete_query().await?;
295        let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
296            let graph = query.stage_graph.to_petgraph();
297            let dot = Dot::new(&graph);
298            dot.to_string()
299        } else {
300            serde_json::to_string_pretty(&query.stage_graph).unwrap()
301        };
302        blocks.push(stage_graph);
303    }
304
305    Ok(())
306}
307
308pub async fn handle_explain(
309    handler_args: HandlerArgs,
310    stmt: Statement,
311    options: ExplainOptions,
312    analyze: bool,
313) -> Result<RwPgResponse> {
314    if analyze {
315        // NOTE(kwannoel): This path is for explain analyze on stream and batch queries.
316        // For existing stream jobs, see the handler module `explain_analyze` instead.
317        bail_not_implemented!(issue = 4856, "explain analyze");
318    }
319    if options.trace && options.explain_format == ExplainFormat::Json {
320        return Err(ErrorCode::NotSupported(
321            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
322            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
323        )
324        .into());
325    }
326    if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
327    {
328        return Err(ErrorCode::NotSupported(
329            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
330            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
331        )
332        .into());
333    }
334
335    let mut blocks = Vec::new();
336    let result = do_handle_explain(handler_args, options.clone(), stmt, &mut blocks).await;
337
338    if let Err(e) = result {
339        if options.trace {
340            // If `trace` is on, we include the error in the output with partial traces.
341            blocks.push(if options.verbose {
342                format!("ERROR: {:?}", e.as_report())
343            } else {
344                format!("ERROR: {}", e.as_report())
345            });
346        } else {
347            // Else, directly return the error.
348            return Err(e);
349        }
350    }
351
352    let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
353        query_plan: l.into(),
354    });
355
356    Ok(PgResponse::builder(StatementType::EXPLAIN)
357        .rows(rows)
358        .into())
359}
360
361#[derive(Fields)]
362#[fields(style = "TITLE CASE")]
363pub(crate) struct ExplainRow {
364    pub query_plan: String,
365}