risingwave_frontend/handler/
explain.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use petgraph::dot::Dot;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::types::Fields;
20use risingwave_sqlparser::ast::{
21    ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
22};
23use thiserror_ext::AsReport;
24
25use super::create_index::{gen_create_index_plan, resolve_index_schema};
26use super::create_mv::gen_create_mv_plan;
27use super::create_sink::gen_sink_plan;
28use super::query::gen_batch_plan_by_statement;
29use super::util::SourceSchemaCompatExt;
30use super::{RwPgResponse, RwPgResponseBuilderExt};
31use crate::OptimizerContextRef;
32use crate::error::{ErrorCode, Result};
33use crate::handler::HandlerArgs;
34use crate::handler::create_table::handle_create_table_plan;
35use crate::optimizer::OptimizerContext;
36use crate::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
37use crate::optimizer::plan_node::generic::GenericPlanRef;
38use crate::optimizer::plan_node::{BatchPlanRef, Explain, StreamPlanRef};
39use crate::scheduler::BatchPlanFragmenter;
40use crate::stream_fragmenter::build_graph;
41use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
42
43pub async fn do_handle_explain(
44    handler_args: HandlerArgs,
45    explain_options: ExplainOptions,
46    stmt: Statement,
47    blocks: &mut Vec<String>,
48) -> Result<()> {
49    // Workaround to avoid `Rc` across `await` point.
50    let mut batch_plan_fragmenter = None;
51    let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
52
53    let session = handler_args.session.clone();
54
55    enum PhysicalPlanRef {
56        Stream(StreamPlanRef),
57        Batch(BatchPlanRef),
58    }
59
60    {
61        let (plan, table, context) = match stmt {
62            // `CREATE TABLE` takes the ownership of the `OptimizerContext` to avoid `Rc` across
63            // `await` point. We can only take the reference back from the `PlanRef` if it's
64            // successfully planned.
65            Statement::CreateTable {
66                name,
67                columns,
68                constraints,
69                format_encode,
70                source_watermarks,
71                append_only,
72                on_conflict,
73                with_version_columns,
74                cdc_table_info,
75                include_column_options,
76                wildcard_idx,
77                webhook_info,
78                ..
79            } => {
80                let format_encode = format_encode.map(|s| s.into_v2_with_warning());
81
82                let (plan, _source, table, _job_type, _) = handle_create_table_plan(
83                    handler_args,
84                    explain_options,
85                    format_encode,
86                    cdc_table_info,
87                    &name,
88                    columns,
89                    wildcard_idx,
90                    constraints,
91                    source_watermarks,
92                    append_only,
93                    on_conflict,
94                    with_version_columns
95                        .iter()
96                        .map(|col| col.real_value())
97                        .collect(),
98                    include_column_options,
99                    webhook_info,
100                    risingwave_common::catalog::Engine::Hummock,
101                )
102                .await?;
103                let context = plan.ctx();
104                (Ok(PhysicalPlanRef::Stream(plan)), Some(table), context)
105            }
106            Statement::CreateSink { stmt } => {
107                let plan = gen_sink_plan(handler_args, stmt, Some(explain_options), false)
108                    .await
109                    .map(|plan| plan.sink_plan)?;
110                let context = plan.ctx();
111                (Ok(PhysicalPlanRef::Stream(plan)), None, context)
112            }
113
114            Statement::FetchCursor {
115                stmt: FetchCursorStatement { cursor_name, .. },
116            } => {
117                let cursor_manager = session.clone().get_cursor_manager();
118                let plan = cursor_manager
119                    .gen_batch_plan_with_subscription_cursor(
120                        &cursor_name.real_value(),
121                        handler_args,
122                    )
123                    .await
124                    .map(|x| x.plan)?;
125                let context = plan.ctx();
126                (Ok(PhysicalPlanRef::Batch(plan)), None, context)
127            }
128
129            // For other queries without `await` point, we can keep a copy of reference to the
130            // `OptimizerContext` even if the planning fails. This enables us to log the partial
131            // traces for better debugging experience.
132            _ => {
133                let context: OptimizerContextRef =
134                    OptimizerContext::new(handler_args, explain_options).into();
135                let (plan, table) = match stmt {
136                    // -- Streaming DDLs --
137                    Statement::CreateView {
138                        or_replace: false,
139                        materialized: true,
140                        query,
141                        name,
142                        columns,
143                        emit_mode,
144                        ..
145                    } => gen_create_mv_plan(
146                        &session,
147                        context.clone(),
148                        *query,
149                        name,
150                        columns,
151                        emit_mode,
152                    )
153                    .map(|(plan, table)| (PhysicalPlanRef::Stream(plan), Some(table))),
154                    Statement::CreateView {
155                        materialized: false,
156                        ..
157                    } => {
158                        return Err(ErrorCode::NotSupported(
159                            "EXPLAIN CREATE VIEW".into(),
160                            "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
161                        ).into());
162                    }
163
164                    Statement::CreateSubscription { .. } => {
165                        return Err(ErrorCode::NotSupported(
166                            "EXPLAIN CREATE SUBSCRIPTION".into(),
167                            "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
168                        ).into());
169                    }
170                    Statement::CreateIndex {
171                        name,
172                        table_name,
173                        method,
174                        columns,
175                        include,
176                        distributed_by,
177                        ..
178                    } => {
179                        let (schema_name, table, index_table_name) =
180                            resolve_index_schema(&session, name, table_name)?;
181                        gen_create_index_plan(
182                            &session,
183                            context.clone(),
184                            schema_name,
185                            table,
186                            index_table_name,
187                            method,
188                            columns,
189                            include,
190                            distributed_by,
191                        )
192                    }
193                    .map(|(plan, index_table, _index)| {
194                        (PhysicalPlanRef::Stream(plan), Some(index_table))
195                    }),
196
197                    // -- Batch Queries --
198                    Statement::Insert { .. }
199                    | Statement::Delete { .. }
200                    | Statement::Update { .. }
201                    | Statement::Query { .. } => {
202                        gen_batch_plan_by_statement(&session, context, stmt)
203                            .map(|x| (PhysicalPlanRef::Batch(x.plan), None))
204                    }
205
206                    _ => bail_not_implemented!("unsupported statement for EXPLAIN: {stmt}"),
207                }?;
208
209                let context = match &plan {
210                    PhysicalPlanRef::Stream(plan) => plan.ctx(),
211                    PhysicalPlanRef::Batch(plan) => plan.ctx(),
212                };
213
214                (Ok(plan) as Result<_>, table, context)
215            }
216        };
217
218        let explain_trace = context.is_explain_trace();
219        let explain_verbose = context.is_explain_verbose();
220        let explain_backfill = context.is_explain_backfill();
221        let explain_type = context.explain_type();
222        let explain_format = context.explain_format();
223
224        if explain_trace {
225            let trace = context.take_trace();
226            blocks.extend(trace);
227        }
228
229        match explain_type {
230            ExplainType::DistSql => {
231                if let Ok(plan) = &plan {
232                    match plan {
233                        PhysicalPlanRef::Batch(plan) => {
234                            let worker_node_manager_reader = WorkerNodeSelector::new(
235                                session.env().worker_node_manager_ref(),
236                                session.is_barrier_read(),
237                            );
238                            batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
239                                worker_node_manager_reader,
240                                session.env().catalog_reader().clone(),
241                                session.config().batch_parallelism().0,
242                                session.config().timezone().to_owned(),
243                                plan.clone(),
244                            )?);
245                            batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
246                                ExplainFormat::Dot
247                            } else {
248                                ExplainFormat::Json
249                            }
250                        }
251                        PhysicalPlanRef::Stream(plan) => {
252                            let graph = build_graph(plan.clone(), None)?;
253                            let table = table.map(|x| x.to_prost());
254                            if explain_format == ExplainFormat::Dot {
255                                blocks.push(explain_stream_graph_as_dot(
256                                    &graph,
257                                    table,
258                                    explain_verbose,
259                                ))
260                            } else {
261                                blocks.push(explain_stream_graph(&graph, table, explain_verbose));
262                            }
263                        }
264                    }
265                }
266            }
267            ExplainType::Physical => {
268                // if explain trace is on, the plan has been in the rows
269                if !explain_trace && let Ok(physical_plan) = &plan {
270                    let plan = match &physical_plan {
271                        PhysicalPlanRef::Stream(plan) => plan as &dyn Explain,
272                        PhysicalPlanRef::Batch(plan) => plan as &dyn Explain,
273                    };
274                    match explain_format {
275                        ExplainFormat::Text => {
276                            blocks.push(plan.explain_to_string());
277                        }
278                        ExplainFormat::Json => blocks.push(plan.explain_to_json()),
279                        ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
280                        ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
281                        ExplainFormat::Dot => {
282                            if explain_backfill && let PhysicalPlanRef::Stream(plan) = physical_plan
283                            {
284                                let dot_formatted_backfill_order =
285                                    explain_backfill_order_in_dot_format(
286                                        &session,
287                                        context.with_options().backfill_order_strategy(),
288                                        plan.clone(),
289                                    )?;
290                                blocks.push(dot_formatted_backfill_order);
291                            } else {
292                                blocks.push(plan.explain_to_dot());
293                            }
294                        }
295                    }
296                }
297            }
298            ExplainType::Logical => {
299                // if explain trace is on, the plan has been in the rows
300                if !explain_trace {
301                    let output = context.take_logical().ok_or_else(|| {
302                        ErrorCode::InternalError("Logical plan not found for query".into())
303                    })?;
304                    blocks.push(output);
305                }
306            }
307        }
308
309        // Throw the error.
310        plan?;
311    }
312
313    if let Some(fragmenter) = batch_plan_fragmenter {
314        let query = fragmenter.generate_complete_query().await?;
315        let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
316            let graph = query.stage_graph.to_petgraph();
317            let dot = Dot::new(&graph);
318            dot.to_string()
319        } else {
320            serde_json::to_string_pretty(&query.stage_graph).unwrap()
321        };
322        blocks.push(stage_graph);
323    }
324
325    Ok(())
326}
327
328pub async fn handle_explain(
329    handler_args: HandlerArgs,
330    stmt: Statement,
331    options: ExplainOptions,
332    analyze: bool,
333) -> Result<RwPgResponse> {
334    if analyze {
335        // NOTE(kwannoel): This path is for explain analyze on stream and batch queries.
336        // For existing stream jobs, see the handler module `explain_analyze` instead.
337        bail_not_implemented!(issue = 4856, "explain analyze");
338    }
339    if options.trace && options.explain_format == ExplainFormat::Json {
340        return Err(ErrorCode::NotSupported(
341            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
342            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
343        )
344        .into());
345    }
346    if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
347    {
348        return Err(ErrorCode::NotSupported(
349            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
350            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
351        )
352        .into());
353    }
354
355    let mut blocks = Vec::new();
356    let result = do_handle_explain(handler_args, options.clone(), stmt, &mut blocks).await;
357
358    if let Err(e) = result {
359        if options.trace {
360            // If `trace` is on, we include the error in the output with partial traces.
361            blocks.push(if options.verbose {
362                format!("ERROR: {:?}", e.as_report())
363            } else {
364                format!("ERROR: {}", e.as_report())
365            });
366        } else {
367            // Else, directly return the error.
368            return Err(e);
369        }
370    }
371
372    let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
373        query_plan: l.into(),
374    });
375
376    Ok(PgResponse::builder(StatementType::EXPLAIN)
377        .rows(rows)
378        .into())
379}
380
381#[derive(Fields)]
382#[fields(style = "TITLE CASE")]
383pub(crate) struct ExplainRow {
384    pub query_plan: String,
385}