risingwave_frontend/handler/
explain.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use petgraph::dot::Dot;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::types::Fields;
20use risingwave_sqlparser::ast::{
21    ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
22};
23use thiserror_ext::AsReport;
24
25use super::create_index::{gen_create_index_plan, resolve_index_schema};
26use super::create_mv::gen_create_mv_plan;
27use super::create_sink::gen_sink_plan;
28use super::query::gen_batch_plan_by_statement;
29use super::util::SourceSchemaCompatExt;
30use super::{RwPgResponse, RwPgResponseBuilderExt};
31use crate::OptimizerContextRef;
32use crate::error::{ErrorCode, Result};
33use crate::handler::HandlerArgs;
34use crate::handler::create_table::handle_create_table_plan;
35use crate::optimizer::OptimizerContext;
36use crate::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
37use crate::optimizer::plan_node::generic::GenericPlanRef;
38use crate::optimizer::plan_node::{BatchPlanRef, Explain, StreamPlanRef};
39use crate::scheduler::BatchPlanFragmenter;
40use crate::stream_fragmenter::build_graph;
41use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
42
43pub async fn do_handle_explain(
44    handler_args: HandlerArgs,
45    explain_options: ExplainOptions,
46    stmt: Statement,
47    blocks: &mut Vec<String>,
48) -> Result<()> {
49    // Workaround to avoid `Rc` across `await` point.
50    let mut batch_plan_fragmenter = None;
51    let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
52
53    let session = handler_args.session.clone();
54
55    enum PhysicalPlanRef {
56        Stream(StreamPlanRef),
57        Batch(BatchPlanRef),
58    }
59
60    {
61        let (plan, table, context) = match stmt {
62            // `CREATE TABLE` takes the ownership of the `OptimizerContext` to avoid `Rc` across
63            // `await` point. We can only take the reference back from the `PlanRef` if it's
64            // successfully planned.
65            Statement::CreateTable {
66                name,
67                columns,
68                constraints,
69                format_encode,
70                source_watermarks,
71                append_only,
72                on_conflict,
73                with_version_columns,
74                cdc_table_info,
75                include_column_options,
76                wildcard_idx,
77                webhook_info,
78                ..
79            } => {
80                let format_encode = format_encode.map(|s| s.into_v2_with_warning());
81
82                let (plan, _source, table, _job_type, _) = handle_create_table_plan(
83                    handler_args,
84                    explain_options,
85                    format_encode,
86                    cdc_table_info,
87                    &name,
88                    columns,
89                    wildcard_idx,
90                    constraints,
91                    source_watermarks,
92                    append_only,
93                    on_conflict,
94                    with_version_columns
95                        .iter()
96                        .map(|col| col.real_value())
97                        .collect(),
98                    include_column_options,
99                    webhook_info,
100                    risingwave_common::catalog::Engine::Hummock,
101                )
102                .await?;
103                let context = plan.ctx();
104                (Ok(PhysicalPlanRef::Stream(plan)), Some(table), context)
105            }
106            Statement::CreateSink { stmt } => {
107                let plan = gen_sink_plan(handler_args, stmt, Some(explain_options), false)
108                    .await
109                    .map(|plan| plan.sink_plan)?;
110                let context = plan.ctx();
111                (Ok(PhysicalPlanRef::Stream(plan)), None, context)
112            }
113
114            Statement::FetchCursor {
115                stmt: FetchCursorStatement { cursor_name, .. },
116            } => {
117                let cursor_manager = session.clone().get_cursor_manager();
118                let plan = cursor_manager
119                    .gen_batch_plan_with_subscription_cursor(
120                        &cursor_name.real_value(),
121                        handler_args,
122                    )
123                    .await
124                    .map(|x| x.plan)?;
125                let context = plan.ctx();
126                (Ok(PhysicalPlanRef::Batch(plan)), None, context)
127            }
128
129            // For other queries without `await` point, we can keep a copy of reference to the
130            // `OptimizerContext` even if the planning fails. This enables us to log the partial
131            // traces for better debugging experience.
132            _ => {
133                let context: OptimizerContextRef =
134                    OptimizerContext::new(handler_args, explain_options).into();
135                let context_clone = context.clone();
136
137                let res = match stmt {
138                    // -- Streaming DDLs --
139                    Statement::CreateView {
140                        or_replace: false,
141                        materialized: true,
142                        query,
143                        name,
144                        columns,
145                        emit_mode,
146                        ..
147                    } => gen_create_mv_plan(&session, context, *query, name, columns, emit_mode)
148                        .map(|(plan, table)| (PhysicalPlanRef::Stream(plan), Some(table))),
149                    Statement::CreateView {
150                        materialized: false,
151                        ..
152                    } => {
153                        return Err(ErrorCode::NotSupported(
154                            "EXPLAIN CREATE VIEW".into(),
155                            "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
156                        ).into());
157                    }
158
159                    Statement::CreateSubscription { .. } => {
160                        return Err(ErrorCode::NotSupported(
161                            "EXPLAIN CREATE SUBSCRIPTION".into(),
162                            "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
163                        ).into());
164                    }
165                    Statement::CreateIndex {
166                        name,
167                        table_name,
168                        method,
169                        columns,
170                        include,
171                        distributed_by,
172                        ..
173                    } => {
174                        let (schema_name, table, index_table_name) =
175                            resolve_index_schema(&session, name, table_name)?;
176                        gen_create_index_plan(
177                            &session,
178                            context,
179                            schema_name,
180                            table,
181                            index_table_name,
182                            method,
183                            columns,
184                            include,
185                            distributed_by,
186                        )
187                    }
188                    .map(|(plan, index_table, _index)| {
189                        (PhysicalPlanRef::Stream(plan), Some(index_table))
190                    }),
191
192                    // -- Batch Queries --
193                    Statement::Insert { .. }
194                    | Statement::Delete { .. }
195                    | Statement::Update { .. }
196                    | Statement::Query { .. } => {
197                        gen_batch_plan_by_statement(&session, context, stmt)
198                            .map(|x| (PhysicalPlanRef::Batch(x.plan), None))
199                    }
200
201                    _ => bail_not_implemented!("unsupported statement for EXPLAIN: {stmt}"),
202                };
203
204                match res {
205                    Ok((plan, table)) => (Ok(plan), table, context_clone),
206                    Err(e) => (Err(e), None, context_clone),
207                }
208            }
209        };
210
211        let explain_trace = explain_options.trace;
212        let explain_verbose = explain_options.verbose;
213        let explain_backfill = explain_options.backfill;
214        let explain_type = explain_options.explain_type;
215        let explain_format = explain_options.explain_format;
216
217        if explain_trace {
218            let trace = context.take_trace();
219            blocks.extend(trace);
220        }
221
222        match explain_type {
223            ExplainType::DistSql => {
224                if let Ok(plan) = &plan {
225                    match plan {
226                        PhysicalPlanRef::Batch(plan) => {
227                            let worker_node_manager_reader = WorkerNodeSelector::new(
228                                session.env().worker_node_manager_ref(),
229                                session.is_barrier_read(),
230                            );
231                            batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
232                                worker_node_manager_reader,
233                                session.env().catalog_reader().clone(),
234                                session.config().batch_parallelism().0,
235                                plan.clone(),
236                            )?);
237                            batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
238                                ExplainFormat::Dot
239                            } else {
240                                ExplainFormat::Json
241                            }
242                        }
243                        PhysicalPlanRef::Stream(plan) => {
244                            let graph = build_graph(plan.clone(), None)?;
245                            let table = table.map(|x| x.to_prost());
246                            if explain_format == ExplainFormat::Dot {
247                                blocks.push(explain_stream_graph_as_dot(
248                                    &graph,
249                                    table,
250                                    explain_verbose,
251                                ))
252                            } else {
253                                blocks.push(explain_stream_graph(&graph, table, explain_verbose));
254                            }
255                        }
256                    }
257                }
258            }
259            ExplainType::Physical => {
260                // if explain trace is on, the plan has been in the rows
261                if !explain_trace && let Ok(physical_plan) = &plan {
262                    let plan = match &physical_plan {
263                        PhysicalPlanRef::Stream(plan) => plan as &dyn Explain,
264                        PhysicalPlanRef::Batch(plan) => plan as &dyn Explain,
265                    };
266                    match explain_format {
267                        ExplainFormat::Text => {
268                            blocks.push(plan.explain_to_string());
269                        }
270                        ExplainFormat::Json => blocks.push(plan.explain_to_json()),
271                        ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
272                        ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
273                        ExplainFormat::Dot => {
274                            if explain_backfill && let PhysicalPlanRef::Stream(plan) = physical_plan
275                            {
276                                let dot_formatted_backfill_order =
277                                    explain_backfill_order_in_dot_format(
278                                        &session,
279                                        context.with_options().backfill_order_strategy(),
280                                        plan.clone(),
281                                    )?;
282                                blocks.push(dot_formatted_backfill_order);
283                            } else {
284                                blocks.push(plan.explain_to_dot());
285                            }
286                        }
287                    }
288                }
289            }
290            ExplainType::Logical => {
291                // if explain trace is on, the plan has been in the rows
292                if !explain_trace {
293                    let output = context.take_logical().ok_or_else(|| {
294                        ErrorCode::InternalError("Logical plan not found for query".into())
295                    })?;
296                    blocks.push(output);
297                }
298            }
299        }
300
301        // Throw the error.
302        plan?;
303    }
304
305    if let Some(fragmenter) = batch_plan_fragmenter {
306        let query = fragmenter.generate_complete_query().await?;
307        let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
308            let graph = query.stage_graph.to_petgraph();
309            let dot = Dot::new(&graph);
310            dot.to_string()
311        } else {
312            serde_json::to_string_pretty(&query.stage_graph).unwrap()
313        };
314        blocks.push(stage_graph);
315    }
316
317    Ok(())
318}
319
320pub async fn handle_explain(
321    handler_args: HandlerArgs,
322    stmt: Statement,
323    options: ExplainOptions,
324    analyze: bool,
325) -> Result<RwPgResponse> {
326    if analyze {
327        // NOTE(kwannoel): This path is for explain analyze on stream and batch queries.
328        // For existing stream jobs, see the handler module `explain_analyze` instead.
329        bail_not_implemented!(issue = 4856, "explain analyze");
330    }
331    if options.trace && options.explain_format == ExplainFormat::Json {
332        return Err(ErrorCode::NotSupported(
333            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
334            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
335        )
336        .into());
337    }
338    if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
339    {
340        return Err(ErrorCode::NotSupported(
341            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
342            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
343        )
344        .into());
345    }
346
347    let mut blocks = Vec::new();
348    let result = do_handle_explain(handler_args, options, stmt, &mut blocks).await;
349
350    if let Err(e) = result {
351        if options.trace {
352            // If `trace` is on, we include the error in the output with partial traces.
353            blocks.push(if options.verbose {
354                format!("ERROR: {:?}", e.as_report())
355            } else {
356                format!("ERROR: {}", e.as_report())
357            });
358        } else {
359            // Else, directly return the error.
360            return Err(e);
361        }
362    }
363
364    let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
365        query_plan: l.into(),
366    });
367
368    Ok(PgResponse::builder(StatementType::EXPLAIN)
369        .rows(rows)
370        .into())
371}
372
/// One row of `EXPLAIN` output, carrying a single line of the rendered plan text.
///
/// NOTE(review): the `style = "TITLE CASE"` attribute presumably controls how the column
/// name is derived from the field name -- confirm against the `Fields` derive macro.
#[derive(Fields)]
#[fields(style = "TITLE CASE")]
pub(crate) struct ExplainRow {
    // A single line of explain output (trace, plan, or error text).
    pub query_plan: String,
}