risingwave_frontend/handler/
explain.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use petgraph::dot::Dot;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::types::Fields;
20use risingwave_sqlparser::ast::{
21    ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
22};
23use thiserror_ext::AsReport;
24
25use super::create_index::{gen_create_index_plan, resolve_index_schema};
26use super::create_mv::gen_create_mv_plan;
27use super::create_sink::gen_sink_plan;
28use super::query::gen_batch_plan_by_statement;
29use super::util::SourceSchemaCompatExt;
30use super::{RwPgResponse, RwPgResponseBuilderExt};
31use crate::OptimizerContextRef;
32use crate::error::{ErrorCode, Result};
33use crate::handler::HandlerArgs;
34use crate::handler::create_table::handle_create_table_plan;
35use crate::optimizer::OptimizerContext;
36use crate::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
37use crate::optimizer::plan_node::generic::GenericPlanRef;
38use crate::optimizer::plan_node::{BatchPlanRef, Explain, StreamPlanRef};
39use crate::scheduler::BatchPlanFragmenter;
40use crate::stream_fragmenter::build_graph;
41use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
42
43pub async fn do_handle_explain(
44    handler_args: HandlerArgs,
45    explain_options: ExplainOptions,
46    stmt: Statement,
47    blocks: &mut Vec<String>,
48) -> Result<()> {
49    // Workaround to avoid `Rc` across `await` point.
50    let mut batch_plan_fragmenter = None;
51    let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
52
53    let session = handler_args.session.clone();
54
55    enum PhysicalPlanRef {
56        Stream(StreamPlanRef),
57        Batch(BatchPlanRef),
58    }
59
60    {
61        let (plan, table, context) = match stmt {
62            // `CREATE TABLE` takes the ownership of the `OptimizerContext` to avoid `Rc` across
63            // `await` point. We can only take the reference back from the `PlanRef` if it's
64            // successfully planned.
65            Statement::CreateTable {
66                name,
67                columns,
68                constraints,
69                format_encode,
70                source_watermarks,
71                append_only,
72                on_conflict,
73                with_version_columns,
74                cdc_table_info,
75                include_column_options,
76                wildcard_idx,
77                webhook_info,
78                ..
79            } => {
80                let format_encode = format_encode.map(|s| s.into_v2_with_warning());
81
82                let (plan, _source, table, _job_type, _) = handle_create_table_plan(
83                    handler_args,
84                    explain_options,
85                    format_encode,
86                    cdc_table_info,
87                    &name,
88                    columns,
89                    wildcard_idx,
90                    constraints,
91                    source_watermarks,
92                    append_only,
93                    on_conflict,
94                    with_version_columns
95                        .iter()
96                        .map(|col| col.real_value())
97                        .collect(),
98                    include_column_options,
99                    webhook_info,
100                    risingwave_common::catalog::Engine::Hummock,
101                )
102                .await?;
103                let context = plan.ctx();
104                (Ok(PhysicalPlanRef::Stream(plan)), Some(table), context)
105            }
106            Statement::CreateSink { stmt } => {
107                let plan = gen_sink_plan(handler_args, stmt, Some(explain_options), false)
108                    .await
109                    .map(|plan| plan.sink_plan)?;
110                let context = plan.ctx();
111                (Ok(PhysicalPlanRef::Stream(plan)), None, context)
112            }
113
114            Statement::FetchCursor {
115                stmt: FetchCursorStatement { cursor_name, .. },
116            } => {
117                let cursor_manager = session.clone().get_cursor_manager();
118                let plan = cursor_manager
119                    .gen_batch_plan_with_subscription_cursor(
120                        &cursor_name.real_value(),
121                        handler_args,
122                    )
123                    .await
124                    .map(|x| x.plan)?;
125                let context = plan.ctx();
126                (Ok(PhysicalPlanRef::Batch(plan)), None, context)
127            }
128
129            // For other queries without `await` point, we can keep a copy of reference to the
130            // `OptimizerContext` even if the planning fails. This enables us to log the partial
131            // traces for better debugging experience.
132            _ => {
133                let context: OptimizerContextRef =
134                    OptimizerContext::new(handler_args, explain_options).into();
135                let context_clone = context.clone();
136
137                let res = match stmt {
138                    // -- Streaming DDLs --
139                    Statement::CreateView {
140                        or_replace: false,
141                        materialized: true,
142                        query,
143                        name,
144                        columns,
145                        emit_mode,
146                        ..
147                    } => gen_create_mv_plan(&session, context, *query, name, columns, emit_mode)
148                        .map(|(plan, table)| (PhysicalPlanRef::Stream(plan), Some(table))),
149                    Statement::CreateView {
150                        materialized: false,
151                        ..
152                    } => {
153                        return Err(ErrorCode::NotSupported(
154                            "EXPLAIN CREATE VIEW".into(),
155                            "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
156                        ).into());
157                    }
158
159                    Statement::CreateSubscription { .. } => {
160                        return Err(ErrorCode::NotSupported(
161                            "EXPLAIN CREATE SUBSCRIPTION".into(),
162                            "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
163                        ).into());
164                    }
165                    Statement::CreateIndex {
166                        name,
167                        table_name,
168                        method,
169                        columns,
170                        include,
171                        distributed_by,
172                        ..
173                    } => {
174                        let (schema_name, table, index_table_name) =
175                            resolve_index_schema(&session, name, table_name)?;
176                        gen_create_index_plan(
177                            &session,
178                            context,
179                            schema_name,
180                            table,
181                            index_table_name,
182                            method,
183                            columns,
184                            include,
185                            distributed_by,
186                        )
187                    }
188                    .map(|(plan, index_table, _index)| {
189                        (PhysicalPlanRef::Stream(plan), Some(index_table))
190                    }),
191
192                    // -- Batch Queries --
193                    Statement::Insert { .. }
194                    | Statement::Delete { .. }
195                    | Statement::Update { .. }
196                    | Statement::Query { .. } => {
197                        gen_batch_plan_by_statement(&session, context, stmt)
198                            .map(|x| (PhysicalPlanRef::Batch(x.plan), None))
199                    }
200
201                    _ => bail_not_implemented!("unsupported statement for EXPLAIN: {stmt}"),
202                };
203
204                match res {
205                    Ok((plan, table)) => (Ok(plan), table, context_clone),
206                    Err(e) => (Err(e), None, context_clone),
207                }
208            }
209        };
210
211        let explain_trace = explain_options.trace;
212        let explain_verbose = explain_options.verbose;
213        let explain_backfill = explain_options.backfill;
214        let explain_type = explain_options.explain_type;
215        let explain_format = explain_options.explain_format;
216
217        if explain_trace {
218            let trace = context.take_trace();
219            blocks.extend(trace);
220        }
221
222        match explain_type {
223            ExplainType::DistSql => {
224                if let Ok(plan) = &plan {
225                    match plan {
226                        PhysicalPlanRef::Batch(plan) => {
227                            let worker_node_manager_reader = WorkerNodeSelector::new(
228                                session.env().worker_node_manager_ref(),
229                                session.is_barrier_read(),
230                            );
231                            batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
232                                worker_node_manager_reader,
233                                session.env().catalog_reader().clone(),
234                                session.config().batch_parallelism().0,
235                                session.config().timezone(),
236                                plan.clone(),
237                            )?);
238                            batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
239                                ExplainFormat::Dot
240                            } else {
241                                ExplainFormat::Json
242                            }
243                        }
244                        PhysicalPlanRef::Stream(plan) => {
245                            let graph = build_graph(plan.clone(), None)?;
246                            let table = table.map(|x| x.to_prost());
247                            if explain_format == ExplainFormat::Dot {
248                                blocks.push(explain_stream_graph_as_dot(
249                                    &graph,
250                                    table,
251                                    explain_verbose,
252                                ))
253                            } else {
254                                blocks.push(explain_stream_graph(&graph, table, explain_verbose));
255                            }
256                        }
257                    }
258                }
259            }
260            ExplainType::Physical => {
261                // if explain trace is on, the plan has been in the rows
262                if !explain_trace && let Ok(physical_plan) = &plan {
263                    let plan = match &physical_plan {
264                        PhysicalPlanRef::Stream(plan) => plan as &dyn Explain,
265                        PhysicalPlanRef::Batch(plan) => plan as &dyn Explain,
266                    };
267                    match explain_format {
268                        ExplainFormat::Text => {
269                            blocks.push(plan.explain_to_string());
270                        }
271                        ExplainFormat::Json => blocks.push(plan.explain_to_json()),
272                        ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
273                        ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
274                        ExplainFormat::Dot => {
275                            if explain_backfill && let PhysicalPlanRef::Stream(plan) = physical_plan
276                            {
277                                let dot_formatted_backfill_order =
278                                    explain_backfill_order_in_dot_format(
279                                        &session,
280                                        context.with_options().backfill_order_strategy(),
281                                        plan.clone(),
282                                    )?;
283                                blocks.push(dot_formatted_backfill_order);
284                            } else {
285                                blocks.push(plan.explain_to_dot());
286                            }
287                        }
288                    }
289                }
290            }
291            ExplainType::Logical => {
292                // if explain trace is on, the plan has been in the rows
293                if !explain_trace {
294                    let output = context.take_logical().ok_or_else(|| {
295                        ErrorCode::InternalError("Logical plan not found for query".into())
296                    })?;
297                    blocks.push(output);
298                }
299            }
300        }
301
302        // Throw the error.
303        plan?;
304    }
305
306    if let Some(fragmenter) = batch_plan_fragmenter {
307        let query = fragmenter.generate_complete_query().await?;
308        let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
309            let graph = query.stage_graph.to_petgraph();
310            let dot = Dot::new(&graph);
311            dot.to_string()
312        } else {
313            serde_json::to_string_pretty(&query.stage_graph).unwrap()
314        };
315        blocks.push(stage_graph);
316    }
317
318    Ok(())
319}
320
321pub async fn handle_explain(
322    handler_args: HandlerArgs,
323    stmt: Statement,
324    options: ExplainOptions,
325    analyze: bool,
326) -> Result<RwPgResponse> {
327    if analyze {
328        // NOTE(kwannoel): This path is for explain analyze on stream and batch queries.
329        // For existing stream jobs, see the handler module `explain_analyze` instead.
330        bail_not_implemented!(issue = 4856, "explain analyze");
331    }
332    if options.trace && options.explain_format == ExplainFormat::Json {
333        return Err(ErrorCode::NotSupported(
334            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
335            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
336        )
337        .into());
338    }
339    if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
340    {
341        return Err(ErrorCode::NotSupported(
342            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
343            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
344        )
345        .into());
346    }
347
348    let mut blocks = Vec::new();
349    let result = do_handle_explain(handler_args, options, stmt, &mut blocks).await;
350
351    if let Err(e) = result {
352        if options.trace {
353            // If `trace` is on, we include the error in the output with partial traces.
354            blocks.push(if options.verbose {
355                format!("ERROR: {:?}", e.as_report())
356            } else {
357                format!("ERROR: {}", e.as_report())
358            });
359        } else {
360            // Else, directly return the error.
361            return Err(e);
362        }
363    }
364
365    let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
366        query_plan: l.into(),
367    });
368
369    Ok(PgResponse::builder(StatementType::EXPLAIN)
370        .rows(rows)
371        .into())
372}
373
/// A single row of `EXPLAIN` output; `handle_explain` emits one row per line of output.
// NOTE(review): the `TITLE CASE` style presumably controls how the derived column name is
// rendered (e.g. `Query Plan`) — confirm against the `Fields` derive.
#[derive(Fields)]
#[fields(style = "TITLE CASE")]
pub(crate) struct ExplainRow {
    /// One line of the rendered plan, trace, or error text.
    pub query_plan: String,
}