Skip to main content

risingwave_frontend/handler/
explain.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "datafusion")]
16use datafusion::physical_plan::{ExecutionPlan, displayable};
17use petgraph::dot::Dot;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::types::Fields;
22use risingwave_sqlparser::ast::{
23    ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
24};
25use thiserror_ext::AsReport;
26
27use super::create_index::{gen_create_index_plan, resolve_index_schema};
28use super::create_mv::explain_create_mv_plan;
29use super::create_sink::gen_sink_plan;
30use super::query::{BatchPlanChoice, gen_batch_plan_by_statement};
31use super::util::SourceSchemaCompatExt;
32use super::{RwPgResponse, RwPgResponseBuilderExt};
33use crate::OptimizerContextRef;
34#[cfg(feature = "datafusion")]
35use crate::datafusion::{
36    DfBatchQueryPlanResult, build_datafusion_physical_plan, create_datafusion_context,
37};
38use crate::error::{ErrorCode, Result};
39use crate::handler::HandlerArgs;
40use crate::handler::create_table::handle_create_table_plan;
41use crate::optimizer::OptimizerContext;
42use crate::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
43use crate::optimizer::plan_node::generic::GenericPlanRef;
44use crate::optimizer::plan_node::{BatchPlanRef, Explain, StreamPlanRef};
45use crate::scheduler::BatchPlanFragmenter;
46use crate::stream_fragmenter::{GraphJobType, build_graph};
47use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
48
49pub async fn do_handle_explain(
50    handler_args: HandlerArgs,
51    explain_options: ExplainOptions,
52    stmt: Statement,
53    blocks: &mut Vec<String>,
54) -> Result<()> {
55    // Workaround to avoid `Rc` across `await` point.
56    let mut batch_plan_fragmenter = None;
57    let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
58    #[cfg(feature = "datafusion")]
59    let mut datafusion_physical_plan_request: Option<(
60        DfBatchQueryPlanResult,
61        ExplainFormat,
62        bool,
63    )> = None;
64
65    let session = handler_args.session.clone();
66
67    enum PhysicalPlanRef {
68        Stream(StreamPlanRef, Option<GraphJobType>),
69        Batch(BatchPlanRef),
70    }
71    enum PlanToExplain {
72        Rw(PhysicalPlanRef),
73        #[cfg(feature = "datafusion")]
74        Df(DfBatchQueryPlanResult),
75    }
76
77    {
78        let (plan, table, context) = match stmt {
79            // `CREATE TABLE` takes the ownership of the `OptimizerContext` to avoid `Rc` across
80            // `await` point. We can only take the reference back from the `PlanRef` if it's
81            // successfully planned.
82            Statement::CreateTable {
83                name,
84                columns,
85                constraints,
86                format_encode,
87                source_watermarks,
88                append_only,
89                on_conflict,
90                with_version_columns,
91                cdc_table_info,
92                include_column_options,
93                wildcard_idx,
94                webhook_info,
95                ..
96            } => {
97                let format_encode = format_encode.map(|s| s.into_v2_with_warning());
98
99                let (plan, _source, table, _job_type, _) = handle_create_table_plan(
100                    handler_args,
101                    explain_options,
102                    format_encode,
103                    cdc_table_info,
104                    &name,
105                    columns,
106                    wildcard_idx,
107                    constraints,
108                    source_watermarks,
109                    append_only,
110                    on_conflict,
111                    with_version_columns
112                        .iter()
113                        .map(|col| col.real_value())
114                        .collect(),
115                    include_column_options,
116                    webhook_info,
117                    risingwave_common::catalog::Engine::Hummock,
118                )
119                .await?;
120                let context = plan.ctx();
121                (
122                    Ok(PlanToExplain::Rw(PhysicalPlanRef::Stream(
123                        plan,
124                        Some(GraphJobType::Table),
125                    ))),
126                    Some(table),
127                    context,
128                )
129            }
130            Statement::CreateSink { stmt } => {
131                let plan = gen_sink_plan(handler_args, stmt, Some(explain_options), false)
132                    .await
133                    .map(|plan| plan.sink_plan)?;
134                let context = plan.ctx();
135                (
136                    Ok(PlanToExplain::Rw(PhysicalPlanRef::Stream(
137                        plan,
138                        Some(GraphJobType::Sink),
139                    ))),
140                    None,
141                    context,
142                )
143            }
144
145            Statement::FetchCursor {
146                stmt: FetchCursorStatement { cursor_name, .. },
147            } => {
148                let cursor_manager = session.clone().get_cursor_manager();
149                let plan = cursor_manager
150                    .gen_batch_plan_with_subscription_cursor(
151                        &cursor_name.real_value(),
152                        handler_args,
153                    )
154                    .await
155                    .map(|x| x.plan)?;
156                let context = plan.ctx();
157                (
158                    Ok(PlanToExplain::Rw(PhysicalPlanRef::Batch(plan))),
159                    None,
160                    context,
161                )
162            }
163
164            // For other queries without `await` point, we can keep a copy of reference to the
165            // `OptimizerContext` even if the planning fails. This enables us to log the partial
166            // traces for better debugging experience.
167            _ => {
168                let context: OptimizerContextRef =
169                    OptimizerContext::new(handler_args, explain_options).into();
170                let context_clone = context.clone();
171
172                let res = match stmt {
173                    // -- Streaming DDLs --
174                    Statement::CreateView {
175                        or_replace: false,
176                        materialized: true,
177                        query,
178                        name,
179                        columns,
180                        emit_mode,
181                        ..
182                    } => {
183                        explain_create_mv_plan(&session, context, *query, name, columns, emit_mode)
184                            .map(|(plan, table)| {
185                                (
186                                    PlanToExplain::Rw(PhysicalPlanRef::Stream(
187                                        plan,
188                                        Some(GraphJobType::MaterializedView),
189                                    )),
190                                    Some(table),
191                                )
192                            })
193                    }
194                    Statement::CreateView {
195                        materialized: false,
196                        ..
197                    } => {
198                        return Err(ErrorCode::NotSupported(
199                            "EXPLAIN CREATE VIEW".into(),
200                            "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
201                        ).into());
202                    }
203
204                    Statement::CreateSubscription { .. } => {
205                        return Err(ErrorCode::NotSupported(
206                            "EXPLAIN CREATE SUBSCRIPTION".into(),
207                            "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
208                        ).into());
209                    }
210                    Statement::CreateIndex {
211                        name,
212                        table_name,
213                        method,
214                        columns,
215                        include,
216                        distributed_by,
217                        ..
218                    } => {
219                        let (schema_name, table, index_table_name) =
220                            resolve_index_schema(&session, name, table_name)?;
221                        gen_create_index_plan(
222                            &session,
223                            context,
224                            schema_name,
225                            table,
226                            index_table_name,
227                            method,
228                            columns,
229                            include,
230                            distributed_by,
231                        )
232                    }
233                    .map(|(plan, index_table, _index)| {
234                        (
235                            PlanToExplain::Rw(PhysicalPlanRef::Stream(
236                                plan,
237                                Some(GraphJobType::Index),
238                            )),
239                            Some(index_table),
240                        )
241                    }),
242
243                    // -- Batch Queries --
244                    Statement::Insert { .. }
245                    | Statement::Delete { .. }
246                    | Statement::Update { .. }
247                    | Statement::Query { .. } => {
248                        match gen_batch_plan_by_statement(&session, context, stmt)? {
249                            BatchPlanChoice::Rw(plan_result) => Ok((
250                                PlanToExplain::Rw(PhysicalPlanRef::Batch(plan_result.plan)),
251                                None,
252                            )),
253                            #[cfg(feature = "datafusion")]
254                            BatchPlanChoice::Df(plan_result) => {
255                                Ok((PlanToExplain::Df(plan_result), None))
256                            }
257                        }
258                    }
259
260                    _ => bail_not_implemented!("unsupported statement for EXPLAIN: {stmt}"),
261                };
262
263                match res {
264                    Ok((plan, table)) => (Ok(plan), table, context_clone),
265                    Err(e) => (Err(e), None, context_clone),
266                }
267            }
268        };
269
270        let explain_trace = explain_options.trace;
271        let explain_verbose = explain_options.verbose;
272        let explain_backfill = explain_options.backfill;
273        let explain_type = explain_options.explain_type;
274        let explain_format = explain_options.explain_format;
275
276        if explain_trace {
277            let trace = context.take_trace();
278            blocks.extend(trace);
279        }
280
281        match explain_type {
282            ExplainType::DistSql => {
283                if let Ok(plan) = &plan {
284                    match plan {
285                        PlanToExplain::Rw(PhysicalPlanRef::Batch(plan)) => {
286                            let worker_node_manager_reader = WorkerNodeSelector::new(
287                                session.env().worker_node_manager_ref(),
288                                session.is_barrier_read(),
289                            );
290                            batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
291                                worker_node_manager_reader,
292                                session.env().catalog_reader().clone(),
293                                session.config().batch_parallelism().0,
294                                plan.clone(),
295                            )?);
296                            batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
297                                ExplainFormat::Dot
298                            } else {
299                                ExplainFormat::Json
300                            }
301                        }
302                        PlanToExplain::Rw(PhysicalPlanRef::Stream(plan, job_type)) => {
303                            let graph = build_graph(plan.clone(), *job_type)?;
304                            let table = table.map(|x| x.to_prost());
305                            if explain_format == ExplainFormat::Dot {
306                                blocks.push(explain_stream_graph_as_dot(
307                                    &graph,
308                                    table,
309                                    explain_verbose,
310                                ))
311                            } else {
312                                blocks.push(explain_stream_graph(&graph, table, explain_verbose));
313                            }
314                        }
315                        #[cfg(feature = "datafusion")]
316                        PlanToExplain::Df(_) => {
317                            return Err(ErrorCode::NotSupported(
318                                "EXPLAIN DISTRIBUTED for DataFusion plan".into(),
319                                "Distributed explain is only available for RisingWave batch plans."
320                                    .into(),
321                            )
322                            .into());
323                        }
324                    }
325                }
326            }
327            ExplainType::Physical => {
328                // if explain trace is on, the plan has been in the rows
329                if !explain_trace && let Ok(physical_plan) = &plan {
330                    match physical_plan {
331                        PlanToExplain::Rw(physical_plan) => {
332                            let plan = match &physical_plan {
333                                PhysicalPlanRef::Stream(plan, _) => plan as &dyn Explain,
334                                PhysicalPlanRef::Batch(plan) => plan as &dyn Explain,
335                            };
336                            match explain_format {
337                                ExplainFormat::Text => {
338                                    blocks.push(plan.explain_to_string());
339                                }
340                                ExplainFormat::Json => blocks.push(plan.explain_to_json()),
341                                ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
342                                ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
343                                ExplainFormat::Dot => {
344                                    if explain_backfill
345                                        && let PhysicalPlanRef::Stream(plan, _) = physical_plan
346                                    {
347                                        let dot_formatted_backfill_order =
348                                            explain_backfill_order_in_dot_format(
349                                                &session,
350                                                context.with_options().backfill_order_strategy(),
351                                                plan.clone(),
352                                            )?;
353                                        blocks.push(dot_formatted_backfill_order);
354                                    } else {
355                                        blocks.push(plan.explain_to_dot());
356                                    }
357                                }
358                            }
359                        }
360                        #[cfg(feature = "datafusion")]
361                        PlanToExplain::Df(plan) => {
362                            datafusion_physical_plan_request =
363                                Some((plan.clone(), explain_format, explain_verbose));
364                        }
365                    }
366                }
367            }
368            ExplainType::Logical => {
369                // if explain trace is on, the plan has been in the rows
370                if !explain_trace {
371                    let output = context.take_logical().ok_or_else(|| {
372                        ErrorCode::InternalError("Logical plan not found for query".into())
373                    })?;
374                    blocks.push(output);
375                }
376            }
377        }
378
379        // Throw the error.
380        plan?;
381    }
382
383    // Use explain_datafusion_plan here to avoid `Rc` across `await` point.
384    #[cfg(feature = "datafusion")]
385    if let Some((plan, explain_format, explain_verbose)) = datafusion_physical_plan_request {
386        let df_ctx = create_datafusion_context(session.as_ref())?;
387        let physical_plan = build_datafusion_physical_plan(&df_ctx, &plan).await?;
388        explain_datafusion_plan(
389            physical_plan.as_ref(),
390            explain_format,
391            explain_verbose,
392            blocks,
393        )?;
394    }
395
396    if let Some(fragmenter) = batch_plan_fragmenter {
397        let query = fragmenter.generate_complete_query().await?;
398        let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
399            let graph = query.stage_graph.to_petgraph();
400            let dot = Dot::new(&graph);
401            dot.to_string()
402        } else {
403            serde_json::to_string_pretty(&query.stage_graph).unwrap()
404        };
405        blocks.push(stage_graph);
406    }
407
408    Ok(())
409}
410
/// Renders a DataFusion physical plan and appends the result to `blocks`.
///
/// Only [`ExplainFormat::Text`] is accepted. It produces DataFusion's indented plan display,
/// with `verbose` forwarded to `indent`. Any other format returns a `NotSupported` error and
/// leaves `blocks` unchanged.
#[cfg(feature = "datafusion")]
fn explain_datafusion_plan(
    plan: &dyn ExecutionPlan,
    explain_format: ExplainFormat,
    verbose: bool,
    blocks: &mut Vec<String>,
) -> Result<()> {
    // Guard clause: reject every non-text format up front.
    if explain_format != ExplainFormat::Text {
        return Err(ErrorCode::NotSupported(
            format!("EXPLAIN ... {:?} for DataFusion plan", explain_format),
            "Only TEXT format is supported for DataFusion plans.".to_owned(),
        )
        .into());
    }

    let rendered = displayable(plan).indent(verbose).to_string();
    blocks.push(rendered);
    Ok(())
}
431
432pub async fn handle_explain(
433    handler_args: HandlerArgs,
434    stmt: Statement,
435    options: ExplainOptions,
436    analyze: bool,
437) -> Result<RwPgResponse> {
438    if analyze {
439        // NOTE(kwannoel): This path is for explain analyze on stream and batch queries.
440        // For existing stream jobs, see the handler module `explain_analyze` instead.
441        bail_not_implemented!(issue = 4856, "explain analyze");
442    }
443    if options.trace && options.explain_format == ExplainFormat::Json {
444        return Err(ErrorCode::NotSupported(
445            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
446            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
447        )
448        .into());
449    }
450    if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
451    {
452        return Err(ErrorCode::NotSupported(
453            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
454            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
455        )
456        .into());
457    }
458
459    let mut blocks = Vec::new();
460    let result = Box::pin(do_handle_explain(handler_args, options, stmt, &mut blocks)).await;
461
462    if let Err(e) = result {
463        if options.trace {
464            // If `trace` is on, we include the error in the output with partial traces.
465            blocks.push(if options.verbose {
466                format!("ERROR: {:?}", e.as_report())
467            } else {
468                format!("ERROR: {}", e.as_report())
469            });
470        } else {
471            // Else, directly return the error.
472            return Err(e);
473        }
474    }
475
476    let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
477        query_plan: l.into(),
478    });
479
480    Ok(PgResponse::builder(StatementType::EXPLAIN)
481        .rows(rows)
482        .into())
483}
484
/// One row of `EXPLAIN` output, holding a single line of the rendered plan text.
///
/// The result-set column is derived from the field name by the `Fields` derive with
/// `style = "TITLE CASE"` (presumably rendered as "Query Plan" — confirm against the macro).
/// Renaming the field therefore changes the column header seen by clients.
#[derive(Fields)]
#[fields(style = "TITLE CASE")]
pub(crate) struct ExplainRow {
    /// One line of explain output.
    pub query_plan: String,
}