risingwave_frontend/handler/
explain.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use petgraph::dot::Dot;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::types::Fields;
20use risingwave_sqlparser::ast::{
21    ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
22};
23use thiserror_ext::AsReport;
24
25use super::create_index::{gen_create_index_plan, resolve_index_schema};
26use super::create_mv::gen_create_mv_plan;
27use super::create_sink::gen_sink_plan;
28use super::query::gen_batch_plan_by_statement;
29use super::util::SourceSchemaCompatExt;
30use super::{RwPgResponse, RwPgResponseBuilderExt};
31use crate::OptimizerContextRef;
32use crate::error::{ErrorCode, Result};
33use crate::handler::HandlerArgs;
34use crate::handler::create_table::handle_create_table_plan;
35use crate::optimizer::OptimizerContext;
36use crate::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
37use crate::optimizer::plan_node::generic::GenericPlanRef;
38use crate::optimizer::plan_node::{BatchPlanRef, Explain, StreamPlanRef};
39use crate::scheduler::BatchPlanFragmenter;
40use crate::stream_fragmenter::build_graph;
41use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
42
/// Plans `stmt` and appends the requested `EXPLAIN` output to `blocks`.
///
/// The statement is planned first (as a stream plan for `CREATE TABLE`, `CREATE SINK`,
/// `CREATE MATERIALIZED VIEW` and `CREATE INDEX`; as a batch plan for `FETCH` cursor, `INSERT`,
/// `DELETE`, `UPDATE` and queries). The explain settings are then read back from the plan's
/// optimizer context and decide what is pushed to `blocks`:
///
/// - the optimizer trace, when trace is enabled;
/// - for `ExplainType::DistSql`, the stream fragment graph or the batch stage graph;
/// - for `ExplainType::Physical`, the physical plan in the requested format;
/// - for `ExplainType::Logical`, the logical plan recorded in the context.
///
/// Each pushed `String` is one (possibly multi-line) block of output.
///
/// # Errors
///
/// Returns an error if the statement is not supported by `EXPLAIN`, if planning fails, or if
/// rendering the requested output fails. Anything already pushed to `blocks` before the failure
/// stays there.
pub async fn do_handle_explain(
    handler_args: HandlerArgs,
    explain_options: ExplainOptions,
    stmt: Statement,
    blocks: &mut Vec<String>,
) -> Result<()> {
    // Workaround to avoid `Rc` across `await` point.
    // The fragmenter is only prepared inside the scoped block below; it is awaited after that
    // block has ended.
    let mut batch_plan_fragmenter = None;
    let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;

    let session = handler_args.session.clone();

    // Local wrapper so that stream and batch plans can flow through the same code path below.
    enum PhysicalPlanRef {
        Stream(StreamPlanRef),
        Batch(BatchPlanRef),
    }

    {
        // `plan` is kept as a `Result` so that output can be emitted before the error is thrown
        // at the end of this block; `table` is only set for plans that create a table catalog.
        let (plan, table, context) = match stmt {
            // `CREATE TABLE` takes the ownership of the `OptimizerContext` to avoid `Rc` across
            // `await` point. We can only take the reference back from the `PlanRef` if it's
            // successfully planned.
            Statement::CreateTable {
                name,
                columns,
                constraints,
                format_encode,
                source_watermarks,
                append_only,
                on_conflict,
                with_version_column,
                cdc_table_info,
                include_column_options,
                wildcard_idx,
                webhook_info,
                ..
            } => {
                let format_encode = format_encode.map(|s| s.into_v2_with_warning());

                let (plan, _source, table, _job_type, _) = handle_create_table_plan(
                    handler_args,
                    explain_options,
                    format_encode,
                    cdc_table_info,
                    &name,
                    columns,
                    wildcard_idx,
                    constraints,
                    source_watermarks,
                    append_only,
                    on_conflict,
                    with_version_column.map(|x| x.real_value()),
                    include_column_options,
                    webhook_info,
                    risingwave_common::catalog::Engine::Hummock,
                )
                .await?;
                let context = plan.ctx();
                (Ok(PhysicalPlanRef::Stream(plan)), Some(table), context)
            }
            Statement::CreateSink { stmt } => {
                // Only the sink plan itself is explained; the rest of the result is dropped.
                let plan = gen_sink_plan(handler_args, stmt, Some(explain_options), false)
                    .await
                    .map(|plan| plan.sink_plan)?;
                let context = plan.ctx();
                (Ok(PhysicalPlanRef::Stream(plan)), None, context)
            }

            Statement::FetchCursor {
                stmt: FetchCursorStatement { cursor_name, .. },
            } => {
                // The batch plan for a cursor fetch is produced by the session's cursor manager.
                let cursor_manager = session.clone().get_cursor_manager();
                let plan = cursor_manager
                    .gen_batch_plan_with_subscription_cursor(
                        &cursor_name.real_value(),
                        handler_args,
                    )
                    .await
                    .map(|x| x.plan)?;
                let context = plan.ctx();
                (Ok(PhysicalPlanRef::Batch(plan)), None, context)
            }

            // For other queries without `await` point, we can keep a copy of reference to the
            // `OptimizerContext` even if the planning fails. This enables us to log the partial
            // traces for better debugging experience.
            _ => {
                let context: OptimizerContextRef =
                    OptimizerContext::new(handler_args, explain_options).into();
                let (plan, table) = match stmt {
                    // -- Streaming DDLs --
                    Statement::CreateView {
                        or_replace: false,
                        materialized: true,
                        query,
                        name,
                        columns,
                        emit_mode,
                        ..
                    } => gen_create_mv_plan(
                        &session,
                        context.clone(),
                        *query,
                        name,
                        columns,
                        emit_mode,
                    )
                    .map(|(plan, table)| (PhysicalPlanRef::Stream(plan), Some(table))),
                    Statement::CreateView {
                        materialized: false,
                        ..
                    } => {
                        return Err(ErrorCode::NotSupported(
                            "EXPLAIN CREATE VIEW".into(),
                            "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
                        ).into());
                    }

                    Statement::CreateSubscription { .. } => {
                        return Err(ErrorCode::NotSupported(
                            "EXPLAIN CREATE SUBSCRIPTION".into(),
                            "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
                        ).into());
                    }
                    Statement::CreateIndex {
                        name,
                        table_name,
                        columns,
                        include,
                        distributed_by,
                        ..
                    } => {
                        let (schema_name, table, index_table_name) =
                            resolve_index_schema(&session, name, table_name)?;
                        gen_create_index_plan(
                            &session,
                            context.clone(),
                            schema_name,
                            table,
                            index_table_name,
                            columns,
                            include,
                            distributed_by,
                        )
                    }
                    .map(|(plan, index_table, _index)| {
                        (PhysicalPlanRef::Stream(plan), Some(index_table))
                    }),

                    // -- Batch Queries --
                    Statement::Insert { .. }
                    | Statement::Delete { .. }
                    | Statement::Update { .. }
                    | Statement::Query { .. } => {
                        gen_batch_plan_by_statement(&session, context, stmt)
                            .map(|x| (PhysicalPlanRef::Batch(x.plan), None))
                    }

                    _ => bail_not_implemented!("unsupported statement for EXPLAIN: {stmt}"),
                }?;

                // Take the context back from the plan: the batch arm above moved `context` into
                // the planner, so the original binding may no longer be available here.
                let context = match &plan {
                    PhysicalPlanRef::Stream(plan) => plan.ctx(),
                    PhysicalPlanRef::Batch(plan) => plan.ctx(),
                };

                (Ok(plan) as Result<_>, table, context)
            }
        };

        // Read the explain settings back from the context attached to the plan.
        let explain_trace = context.is_explain_trace();
        let explain_verbose = context.is_explain_verbose();
        let explain_backfill = context.is_explain_backfill();
        let explain_type = context.explain_type();
        let explain_format = context.explain_format();

        // The trace, when requested, is always emitted first.
        if explain_trace {
            let trace = context.take_trace();
            blocks.extend(trace);
        }

        match explain_type {
            ExplainType::DistSql => {
                if let Ok(plan) = &plan {
                    match plan {
                        PhysicalPlanRef::Batch(plan) => {
                            // Only prepare the fragmenter here; the stage graph is generated and
                            // rendered after this block, where awaiting is done.
                            let worker_node_manager_reader = WorkerNodeSelector::new(
                                session.env().worker_node_manager_ref(),
                                session.is_barrier_read(),
                            );
                            batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
                                worker_node_manager_reader,
                                session.env().catalog_reader().clone(),
                                session.config().batch_parallelism().0,
                                session.config().timezone().to_owned(),
                                plan.clone(),
                            )?);
                            // Any format other than DOT falls back to JSON for the stage graph.
                            batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
                                ExplainFormat::Dot
                            } else {
                                ExplainFormat::Json
                            }
                        }
                        PhysicalPlanRef::Stream(plan) => {
                            // Build the stream fragment graph and render it as DOT or as the
                            // default graph text.
                            let graph = build_graph(plan.clone(), None)?;
                            let table = table.map(|x| x.to_prost());
                            if explain_format == ExplainFormat::Dot {
                                blocks.push(explain_stream_graph_as_dot(
                                    &graph,
                                    table,
                                    explain_verbose,
                                ))
                            } else {
                                blocks.push(explain_stream_graph(&graph, table, explain_verbose));
                            }
                        }
                    }
                }
            }
            ExplainType::Physical => {
                // if explain trace is on, the plan has been in the rows
                if !explain_trace && let Ok(physical_plan) = &plan {
                    // Both plan kinds are rendered through the common `Explain` trait object.
                    let plan = match &physical_plan {
                        PhysicalPlanRef::Stream(plan) => plan as &dyn Explain,
                        PhysicalPlanRef::Batch(plan) => plan as &dyn Explain,
                    };
                    match explain_format {
                        ExplainFormat::Text => {
                            blocks.push(plan.explain_to_string());
                        }
                        ExplainFormat::Json => blocks.push(plan.explain_to_json()),
                        ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
                        ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
                        ExplainFormat::Dot => {
                            // With backfill explain enabled on a stream plan, the backfill order
                            // graph is emitted instead of the plan graph.
                            if explain_backfill && let PhysicalPlanRef::Stream(plan) = physical_plan
                            {
                                let dot_formatted_backfill_order =
                                    explain_backfill_order_in_dot_format(
                                        &session,
                                        context.with_options().backfill_order_strategy(),
                                        plan.clone(),
                                    )?;
                                blocks.push(dot_formatted_backfill_order);
                            } else {
                                blocks.push(plan.explain_to_dot());
                            }
                        }
                    }
                }
            }
            ExplainType::Logical => {
                // if explain trace is on, the plan has been in the rows
                if !explain_trace {
                    // The logical plan text is taken from the context, not from `plan`.
                    let output = context.take_logical().ok_or_else(|| {
                        ErrorCode::InternalError("Logical plan not found for query".into())
                    })?;
                    blocks.push(output);
                }
            }
        }

        // Throw the error.
        // NOTE(review): every arm of the `match stmt` above already propagates planning errors
        // with `?`, so `plan` appears to always be `Ok` at this point and partial traces are not
        // emitted when planning fails — confirm whether that is intended.
        plan?;
    }

    // Generate the complete batch query now and render its stage graph in the chosen format.
    if let Some(fragmenter) = batch_plan_fragmenter {
        let query = fragmenter.generate_complete_query().await?;
        let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
            let graph = query.stage_graph.to_petgraph();
            let dot = Dot::new(&graph);
            dot.to_string()
        } else {
            serde_json::to_string_pretty(&query.stage_graph).unwrap()
        };
        blocks.push(stage_graph);
    }

    Ok(())
}
322
323pub async fn handle_explain(
324    handler_args: HandlerArgs,
325    stmt: Statement,
326    options: ExplainOptions,
327    analyze: bool,
328) -> Result<RwPgResponse> {
329    if analyze {
330        // NOTE(kwannoel): This path is for explain analyze on stream and batch queries.
331        // For existing stream jobs, see the handler module `explain_analyze` instead.
332        bail_not_implemented!(issue = 4856, "explain analyze");
333    }
334    if options.trace && options.explain_format == ExplainFormat::Json {
335        return Err(ErrorCode::NotSupported(
336            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
337            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
338        )
339        .into());
340    }
341    if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
342    {
343        return Err(ErrorCode::NotSupported(
344            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
345            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
346        )
347        .into());
348    }
349
350    let mut blocks = Vec::new();
351    let result = do_handle_explain(handler_args, options.clone(), stmt, &mut blocks).await;
352
353    if let Err(e) = result {
354        if options.trace {
355            // If `trace` is on, we include the error in the output with partial traces.
356            blocks.push(if options.verbose {
357                format!("ERROR: {:?}", e.as_report())
358            } else {
359                format!("ERROR: {}", e.as_report())
360            });
361        } else {
362            // Else, directly return the error.
363            return Err(e);
364        }
365    }
366
367    let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
368        query_plan: l.into(),
369    });
370
371    Ok(PgResponse::builder(StatementType::EXPLAIN)
372        .rows(rows)
373        .into())
374}
375
/// One output row of an `EXPLAIN` response, holding a single line of the explain text.
// NOTE(review): the `TITLE CASE` field style presumably renders the column name as
// `QUERY PLAN` — confirm against the `Fields` derive.
#[derive(Fields)]
#[fields(style = "TITLE CASE")]
pub(crate) struct ExplainRow {
    /// A single line of the explain output.
    pub query_plan: String,
}