risingwave_frontend/handler/
explain.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use petgraph::dot::Dot;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::types::Fields;
20use risingwave_sqlparser::ast::{
21    ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
22};
23use thiserror_ext::AsReport;
24
25use super::create_index::{gen_create_index_plan, resolve_index_schema};
26use super::create_mv::gen_create_mv_plan;
27use super::create_sink::gen_sink_plan;
28use super::query::{BatchPlanChoice, gen_batch_plan_by_statement};
29use super::util::SourceSchemaCompatExt;
30use super::{RwPgResponse, RwPgResponseBuilderExt};
31use crate::OptimizerContextRef;
32#[cfg(feature = "datafusion")]
33use crate::datafusion::DfBatchQueryPlanResult;
34use crate::error::{ErrorCode, Result};
35use crate::handler::HandlerArgs;
36use crate::handler::create_table::handle_create_table_plan;
37use crate::optimizer::OptimizerContext;
38use crate::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
39use crate::optimizer::plan_node::generic::GenericPlanRef;
40use crate::optimizer::plan_node::{BatchPlanRef, Explain, StreamPlanRef};
41use crate::scheduler::BatchPlanFragmenter;
42use crate::stream_fragmenter::build_graph;
43use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
44
45pub async fn do_handle_explain(
46    handler_args: HandlerArgs,
47    explain_options: ExplainOptions,
48    stmt: Statement,
49    blocks: &mut Vec<String>,
50) -> Result<()> {
51    // Workaround to avoid `Rc` across `await` point.
52    let mut batch_plan_fragmenter = None;
53    let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
54
55    let session = handler_args.session.clone();
56
57    enum PhysicalPlanRef {
58        Stream(StreamPlanRef),
59        Batch(BatchPlanRef),
60    }
61    enum PlanToExplain {
62        Rw(PhysicalPlanRef),
63        #[cfg(feature = "datafusion")]
64        Df(DfBatchQueryPlanResult),
65    }
66
67    {
68        let (plan, table, context) = match stmt {
69            // `CREATE TABLE` takes the ownership of the `OptimizerContext` to avoid `Rc` across
70            // `await` point. We can only take the reference back from the `PlanRef` if it's
71            // successfully planned.
72            Statement::CreateTable {
73                name,
74                columns,
75                constraints,
76                format_encode,
77                source_watermarks,
78                append_only,
79                on_conflict,
80                with_version_columns,
81                cdc_table_info,
82                include_column_options,
83                wildcard_idx,
84                webhook_info,
85                ..
86            } => {
87                let format_encode = format_encode.map(|s| s.into_v2_with_warning());
88
89                let (plan, _source, table, _job_type, _) = handle_create_table_plan(
90                    handler_args,
91                    explain_options,
92                    format_encode,
93                    cdc_table_info,
94                    &name,
95                    columns,
96                    wildcard_idx,
97                    constraints,
98                    source_watermarks,
99                    append_only,
100                    on_conflict,
101                    with_version_columns
102                        .iter()
103                        .map(|col| col.real_value())
104                        .collect(),
105                    include_column_options,
106                    webhook_info,
107                    risingwave_common::catalog::Engine::Hummock,
108                )
109                .await?;
110                let context = plan.ctx();
111                (
112                    Ok(PlanToExplain::Rw(PhysicalPlanRef::Stream(plan))),
113                    Some(table),
114                    context,
115                )
116            }
117            Statement::CreateSink { stmt } => {
118                let plan = gen_sink_plan(handler_args, stmt, Some(explain_options), false)
119                    .await
120                    .map(|plan| plan.sink_plan)?;
121                let context = plan.ctx();
122                (
123                    Ok(PlanToExplain::Rw(PhysicalPlanRef::Stream(plan))),
124                    None,
125                    context,
126                )
127            }
128
129            Statement::FetchCursor {
130                stmt: FetchCursorStatement { cursor_name, .. },
131            } => {
132                let cursor_manager = session.clone().get_cursor_manager();
133                let plan = cursor_manager
134                    .gen_batch_plan_with_subscription_cursor(
135                        &cursor_name.real_value(),
136                        handler_args,
137                    )
138                    .await
139                    .map(|x| x.plan)?;
140                let context = plan.ctx();
141                (
142                    Ok(PlanToExplain::Rw(PhysicalPlanRef::Batch(plan))),
143                    None,
144                    context,
145                )
146            }
147
148            // For other queries without `await` point, we can keep a copy of reference to the
149            // `OptimizerContext` even if the planning fails. This enables us to log the partial
150            // traces for better debugging experience.
151            _ => {
152                let context: OptimizerContextRef =
153                    OptimizerContext::new(handler_args, explain_options).into();
154                let context_clone = context.clone();
155
156                let res = match stmt {
157                    // -- Streaming DDLs --
158                    Statement::CreateView {
159                        or_replace: false,
160                        materialized: true,
161                        query,
162                        name,
163                        columns,
164                        emit_mode,
165                        ..
166                    } => gen_create_mv_plan(&session, context, *query, name, columns, emit_mode)
167                        .map(|(plan, table)| {
168                            (
169                                PlanToExplain::Rw(PhysicalPlanRef::Stream(plan)),
170                                Some(table),
171                            )
172                        }),
173                    Statement::CreateView {
174                        materialized: false,
175                        ..
176                    } => {
177                        return Err(ErrorCode::NotSupported(
178                            "EXPLAIN CREATE VIEW".into(),
179                            "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
180                        ).into());
181                    }
182
183                    Statement::CreateSubscription { .. } => {
184                        return Err(ErrorCode::NotSupported(
185                            "EXPLAIN CREATE SUBSCRIPTION".into(),
186                            "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
187                        ).into());
188                    }
189                    Statement::CreateIndex {
190                        name,
191                        table_name,
192                        method,
193                        columns,
194                        include,
195                        distributed_by,
196                        ..
197                    } => {
198                        let (schema_name, table, index_table_name) =
199                            resolve_index_schema(&session, name, table_name)?;
200                        gen_create_index_plan(
201                            &session,
202                            context,
203                            schema_name,
204                            table,
205                            index_table_name,
206                            method,
207                            columns,
208                            include,
209                            distributed_by,
210                        )
211                    }
212                    .map(|(plan, index_table, _index)| {
213                        (
214                            PlanToExplain::Rw(PhysicalPlanRef::Stream(plan)),
215                            Some(index_table),
216                        )
217                    }),
218
219                    // -- Batch Queries --
220                    Statement::Insert { .. }
221                    | Statement::Delete { .. }
222                    | Statement::Update { .. }
223                    | Statement::Query { .. } => {
224                        match gen_batch_plan_by_statement(&session, context, stmt)? {
225                            BatchPlanChoice::Rw(plan_result) => Ok((
226                                PlanToExplain::Rw(PhysicalPlanRef::Batch(plan_result.plan)),
227                                None,
228                            )),
229                            #[cfg(feature = "datafusion")]
230                            BatchPlanChoice::Df(plan_result) => {
231                                Ok((PlanToExplain::Df(plan_result), None))
232                            }
233                        }
234                    }
235
236                    _ => bail_not_implemented!("unsupported statement for EXPLAIN: {stmt}"),
237                };
238
239                match res {
240                    Ok((plan, table)) => (Ok(plan), table, context_clone),
241                    Err(e) => (Err(e), None, context_clone),
242                }
243            }
244        };
245
246        let explain_trace = explain_options.trace;
247        let explain_verbose = explain_options.verbose;
248        let explain_backfill = explain_options.backfill;
249        let explain_type = explain_options.explain_type;
250        let explain_format = explain_options.explain_format;
251
252        if explain_trace {
253            let trace = context.take_trace();
254            blocks.extend(trace);
255        }
256
257        match explain_type {
258            ExplainType::DistSql => {
259                if let Ok(plan) = &plan {
260                    match plan {
261                        PlanToExplain::Rw(PhysicalPlanRef::Batch(plan)) => {
262                            let worker_node_manager_reader = WorkerNodeSelector::new(
263                                session.env().worker_node_manager_ref(),
264                                session.is_barrier_read(),
265                            );
266                            batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
267                                worker_node_manager_reader,
268                                session.env().catalog_reader().clone(),
269                                session.config().batch_parallelism().0,
270                                plan.clone(),
271                            )?);
272                            batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
273                                ExplainFormat::Dot
274                            } else {
275                                ExplainFormat::Json
276                            }
277                        }
278                        PlanToExplain::Rw(PhysicalPlanRef::Stream(plan)) => {
279                            let graph = build_graph(plan.clone(), None)?;
280                            let table = table.map(|x| x.to_prost());
281                            if explain_format == ExplainFormat::Dot {
282                                blocks.push(explain_stream_graph_as_dot(
283                                    &graph,
284                                    table,
285                                    explain_verbose,
286                                ))
287                            } else {
288                                blocks.push(explain_stream_graph(&graph, table, explain_verbose));
289                            }
290                        }
291                        #[cfg(feature = "datafusion")]
292                        PlanToExplain::Df(_) => {
293                            return Err(ErrorCode::NotSupported(
294                                "EXPLAIN DISTRIBUTED for DataFusion plan".into(),
295                                "Distributed explain is only available for RisingWave batch plans."
296                                    .into(),
297                            )
298                            .into());
299                        }
300                    }
301                }
302            }
303            ExplainType::Physical => {
304                // if explain trace is on, the plan has been in the rows
305                if !explain_trace && let Ok(physical_plan) = &plan {
306                    match physical_plan {
307                        PlanToExplain::Rw(physical_plan) => {
308                            let plan = match &physical_plan {
309                                PhysicalPlanRef::Stream(plan) => plan as &dyn Explain,
310                                PhysicalPlanRef::Batch(plan) => plan as &dyn Explain,
311                            };
312                            match explain_format {
313                                ExplainFormat::Text => {
314                                    blocks.push(plan.explain_to_string());
315                                }
316                                ExplainFormat::Json => blocks.push(plan.explain_to_json()),
317                                ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
318                                ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
319                                ExplainFormat::Dot => {
320                                    if explain_backfill
321                                        && let PhysicalPlanRef::Stream(plan) = physical_plan
322                                    {
323                                        let dot_formatted_backfill_order =
324                                            explain_backfill_order_in_dot_format(
325                                                &session,
326                                                context.with_options().backfill_order_strategy(),
327                                                plan.clone(),
328                                            )?;
329                                        blocks.push(dot_formatted_backfill_order);
330                                    } else {
331                                        blocks.push(plan.explain_to_dot());
332                                    }
333                                }
334                            }
335                        }
336                        #[cfg(feature = "datafusion")]
337                        PlanToExplain::Df(plan) => {
338                            explain_datafusion_plan(plan, explain_format, explain_verbose, blocks)?
339                        }
340                    }
341                }
342            }
343            ExplainType::Logical => {
344                // if explain trace is on, the plan has been in the rows
345                if !explain_trace {
346                    let output = context.take_logical().ok_or_else(|| {
347                        ErrorCode::InternalError("Logical plan not found for query".into())
348                    })?;
349                    blocks.push(output);
350                }
351            }
352        }
353
354        // Throw the error.
355        plan?;
356    }
357
358    if let Some(fragmenter) = batch_plan_fragmenter {
359        let query = fragmenter.generate_complete_query().await?;
360        let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
361            let graph = query.stage_graph.to_petgraph();
362            let dot = Dot::new(&graph);
363            dot.to_string()
364        } else {
365            serde_json::to_string_pretty(&query.stage_graph).unwrap()
366        };
367        blocks.push(stage_graph);
368    }
369
370    Ok(())
371}
372
/// Renders a DataFusion batch plan and appends it to `blocks` as a single block.
///
/// Only `ExplainFormat::Text` is accepted. With `verbose` the indented plan is rendered
/// together with its schema; otherwise the plain indented plan is rendered.
///
/// # Errors
///
/// Returns `ErrorCode::NotSupported` for every format other than text.
#[cfg(feature = "datafusion")]
fn explain_datafusion_plan(
    plan: &DfBatchQueryPlanResult,
    explain_format: ExplainFormat,
    verbose: bool,
    blocks: &mut Vec<String>,
) -> Result<()> {
    // Guard clause: reject everything but the text format up front.
    if explain_format != ExplainFormat::Text {
        let feature = format!("EXPLAIN ... {:?} for DataFusion plan", explain_format);
        let hint = "Only TEXT format is supported for DataFusion plans.".to_owned();
        return Err(ErrorCode::NotSupported(feature, hint).into());
    }

    let logical_plan = &plan.plan;
    let rendered = if verbose {
        logical_plan.display_indent_schema().to_string()
    } else {
        logical_plan.display_indent().to_string()
    };
    blocks.push(rendered);
    Ok(())
}
399
400pub async fn handle_explain(
401    handler_args: HandlerArgs,
402    stmt: Statement,
403    options: ExplainOptions,
404    analyze: bool,
405) -> Result<RwPgResponse> {
406    if analyze {
407        // NOTE(kwannoel): This path is for explain analyze on stream and batch queries.
408        // For existing stream jobs, see the handler module `explain_analyze` instead.
409        bail_not_implemented!(issue = 4856, "explain analyze");
410    }
411    if options.trace && options.explain_format == ExplainFormat::Json {
412        return Err(ErrorCode::NotSupported(
413            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
414            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
415        )
416        .into());
417    }
418    if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
419    {
420        return Err(ErrorCode::NotSupported(
421            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
422            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
423        )
424        .into());
425    }
426
427    let mut blocks = Vec::new();
428    let result = do_handle_explain(handler_args, options, stmt, &mut blocks).await;
429
430    if let Err(e) = result {
431        if options.trace {
432            // If `trace` is on, we include the error in the output with partial traces.
433            blocks.push(if options.verbose {
434                format!("ERROR: {:?}", e.as_report())
435            } else {
436                format!("ERROR: {}", e.as_report())
437            });
438        } else {
439            // Else, directly return the error.
440            return Err(e);
441        }
442    }
443
444    let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
445        query_plan: l.into(),
446    });
447
448    Ok(PgResponse::builder(StatementType::EXPLAIN)
449        .rows(rows)
450        .into())
451}
452
// One row of `EXPLAIN` output; `handle_explain` builds one per line of rendered text.
// NOTE(review): `style = "TITLE CASE"` presumably controls how the column name is derived from
// the field name — confirm against the `Fields` derive macro.
#[derive(Fields)]
#[fields(style = "TITLE CASE")]
pub(crate) struct ExplainRow {
    // A single line of the explain output.
    pub query_plan: String,
}