risingwave_frontend/handler/
explain.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "datafusion")]
16use datafusion::physical_plan::{ExecutionPlan, displayable};
17use petgraph::dot::Dot;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::types::Fields;
22use risingwave_sqlparser::ast::{
23    ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
24};
25use thiserror_ext::AsReport;
26
27use super::create_index::{gen_create_index_plan, resolve_index_schema};
28use super::create_mv::gen_create_mv_plan;
29use super::create_sink::gen_sink_plan;
30use super::query::{BatchPlanChoice, gen_batch_plan_by_statement};
31use super::util::SourceSchemaCompatExt;
32use super::{RwPgResponse, RwPgResponseBuilderExt};
33use crate::OptimizerContextRef;
34#[cfg(feature = "datafusion")]
35use crate::datafusion::{
36    DfBatchQueryPlanResult, build_datafusion_physical_plan, create_datafusion_context,
37};
38use crate::error::{ErrorCode, Result};
39use crate::handler::HandlerArgs;
40use crate::handler::create_table::handle_create_table_plan;
41use crate::optimizer::OptimizerContext;
42use crate::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
43use crate::optimizer::plan_node::generic::GenericPlanRef;
44use crate::optimizer::plan_node::{BatchPlanRef, Explain, StreamPlanRef};
45use crate::scheduler::BatchPlanFragmenter;
46use crate::stream_fragmenter::build_graph;
47use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
48
49pub async fn do_handle_explain(
50    handler_args: HandlerArgs,
51    explain_options: ExplainOptions,
52    stmt: Statement,
53    blocks: &mut Vec<String>,
54) -> Result<()> {
55    // Workaround to avoid `Rc` across `await` point.
56    let mut batch_plan_fragmenter = None;
57    let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
58    #[cfg(feature = "datafusion")]
59    let mut datafusion_physical_plan_request: Option<(
60        DfBatchQueryPlanResult,
61        ExplainFormat,
62        bool,
63    )> = None;
64
65    let session = handler_args.session.clone();
66
67    enum PhysicalPlanRef {
68        Stream(StreamPlanRef),
69        Batch(BatchPlanRef),
70    }
71    enum PlanToExplain {
72        Rw(PhysicalPlanRef),
73        #[cfg(feature = "datafusion")]
74        Df(DfBatchQueryPlanResult),
75    }
76
77    {
78        let (plan, table, context) = match stmt {
79            // `CREATE TABLE` takes the ownership of the `OptimizerContext` to avoid `Rc` across
80            // `await` point. We can only take the reference back from the `PlanRef` if it's
81            // successfully planned.
82            Statement::CreateTable {
83                name,
84                columns,
85                constraints,
86                format_encode,
87                source_watermarks,
88                append_only,
89                on_conflict,
90                with_version_columns,
91                cdc_table_info,
92                include_column_options,
93                wildcard_idx,
94                webhook_info,
95                ..
96            } => {
97                let format_encode = format_encode.map(|s| s.into_v2_with_warning());
98
99                let (plan, _source, table, _job_type, _) = handle_create_table_plan(
100                    handler_args,
101                    explain_options,
102                    format_encode,
103                    cdc_table_info,
104                    &name,
105                    columns,
106                    wildcard_idx,
107                    constraints,
108                    source_watermarks,
109                    append_only,
110                    on_conflict,
111                    with_version_columns
112                        .iter()
113                        .map(|col| col.real_value())
114                        .collect(),
115                    include_column_options,
116                    webhook_info,
117                    risingwave_common::catalog::Engine::Hummock,
118                )
119                .await?;
120                let context = plan.ctx();
121                (
122                    Ok(PlanToExplain::Rw(PhysicalPlanRef::Stream(plan))),
123                    Some(table),
124                    context,
125                )
126            }
127            Statement::CreateSink { stmt } => {
128                let plan = gen_sink_plan(handler_args, stmt, Some(explain_options), false)
129                    .await
130                    .map(|plan| plan.sink_plan)?;
131                let context = plan.ctx();
132                (
133                    Ok(PlanToExplain::Rw(PhysicalPlanRef::Stream(plan))),
134                    None,
135                    context,
136                )
137            }
138
139            Statement::FetchCursor {
140                stmt: FetchCursorStatement { cursor_name, .. },
141            } => {
142                let cursor_manager = session.clone().get_cursor_manager();
143                let plan = cursor_manager
144                    .gen_batch_plan_with_subscription_cursor(
145                        &cursor_name.real_value(),
146                        handler_args,
147                    )
148                    .await
149                    .map(|x| x.plan)?;
150                let context = plan.ctx();
151                (
152                    Ok(PlanToExplain::Rw(PhysicalPlanRef::Batch(plan))),
153                    None,
154                    context,
155                )
156            }
157
158            // For other queries without `await` point, we can keep a copy of reference to the
159            // `OptimizerContext` even if the planning fails. This enables us to log the partial
160            // traces for better debugging experience.
161            _ => {
162                let context: OptimizerContextRef =
163                    OptimizerContext::new(handler_args, explain_options).into();
164                let context_clone = context.clone();
165
166                let res = match stmt {
167                    // -- Streaming DDLs --
168                    Statement::CreateView {
169                        or_replace: false,
170                        materialized: true,
171                        query,
172                        name,
173                        columns,
174                        emit_mode,
175                        ..
176                    } => gen_create_mv_plan(&session, context, *query, name, columns, emit_mode)
177                        .map(|(plan, table)| {
178                            (
179                                PlanToExplain::Rw(PhysicalPlanRef::Stream(plan)),
180                                Some(table),
181                            )
182                        }),
183                    Statement::CreateView {
184                        materialized: false,
185                        ..
186                    } => {
187                        return Err(ErrorCode::NotSupported(
188                            "EXPLAIN CREATE VIEW".into(),
189                            "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
190                        ).into());
191                    }
192
193                    Statement::CreateSubscription { .. } => {
194                        return Err(ErrorCode::NotSupported(
195                            "EXPLAIN CREATE SUBSCRIPTION".into(),
196                            "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
197                        ).into());
198                    }
199                    Statement::CreateIndex {
200                        name,
201                        table_name,
202                        method,
203                        columns,
204                        include,
205                        distributed_by,
206                        ..
207                    } => {
208                        let (schema_name, table, index_table_name) =
209                            resolve_index_schema(&session, name, table_name)?;
210                        gen_create_index_plan(
211                            &session,
212                            context,
213                            schema_name,
214                            table,
215                            index_table_name,
216                            method,
217                            columns,
218                            include,
219                            distributed_by,
220                        )
221                    }
222                    .map(|(plan, index_table, _index)| {
223                        (
224                            PlanToExplain::Rw(PhysicalPlanRef::Stream(plan)),
225                            Some(index_table),
226                        )
227                    }),
228
229                    // -- Batch Queries --
230                    Statement::Insert { .. }
231                    | Statement::Delete { .. }
232                    | Statement::Update { .. }
233                    | Statement::Query { .. } => {
234                        match gen_batch_plan_by_statement(&session, context, stmt)? {
235                            BatchPlanChoice::Rw(plan_result) => Ok((
236                                PlanToExplain::Rw(PhysicalPlanRef::Batch(plan_result.plan)),
237                                None,
238                            )),
239                            #[cfg(feature = "datafusion")]
240                            BatchPlanChoice::Df(plan_result) => {
241                                Ok((PlanToExplain::Df(plan_result), None))
242                            }
243                        }
244                    }
245
246                    _ => bail_not_implemented!("unsupported statement for EXPLAIN: {stmt}"),
247                };
248
249                match res {
250                    Ok((plan, table)) => (Ok(plan), table, context_clone),
251                    Err(e) => (Err(e), None, context_clone),
252                }
253            }
254        };
255
256        let explain_trace = explain_options.trace;
257        let explain_verbose = explain_options.verbose;
258        let explain_backfill = explain_options.backfill;
259        let explain_type = explain_options.explain_type;
260        let explain_format = explain_options.explain_format;
261
262        if explain_trace {
263            let trace = context.take_trace();
264            blocks.extend(trace);
265        }
266
267        match explain_type {
268            ExplainType::DistSql => {
269                if let Ok(plan) = &plan {
270                    match plan {
271                        PlanToExplain::Rw(PhysicalPlanRef::Batch(plan)) => {
272                            let worker_node_manager_reader = WorkerNodeSelector::new(
273                                session.env().worker_node_manager_ref(),
274                                session.is_barrier_read(),
275                            );
276                            batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
277                                worker_node_manager_reader,
278                                session.env().catalog_reader().clone(),
279                                session.config().batch_parallelism().0,
280                                plan.clone(),
281                            )?);
282                            batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
283                                ExplainFormat::Dot
284                            } else {
285                                ExplainFormat::Json
286                            }
287                        }
288                        PlanToExplain::Rw(PhysicalPlanRef::Stream(plan)) => {
289                            let graph = build_graph(plan.clone(), None)?;
290                            let table = table.map(|x| x.to_prost());
291                            if explain_format == ExplainFormat::Dot {
292                                blocks.push(explain_stream_graph_as_dot(
293                                    &graph,
294                                    table,
295                                    explain_verbose,
296                                ))
297                            } else {
298                                blocks.push(explain_stream_graph(&graph, table, explain_verbose));
299                            }
300                        }
301                        #[cfg(feature = "datafusion")]
302                        PlanToExplain::Df(_) => {
303                            return Err(ErrorCode::NotSupported(
304                                "EXPLAIN DISTRIBUTED for DataFusion plan".into(),
305                                "Distributed explain is only available for RisingWave batch plans."
306                                    .into(),
307                            )
308                            .into());
309                        }
310                    }
311                }
312            }
313            ExplainType::Physical => {
314                // if explain trace is on, the plan has been in the rows
315                if !explain_trace && let Ok(physical_plan) = &plan {
316                    match physical_plan {
317                        PlanToExplain::Rw(physical_plan) => {
318                            let plan = match &physical_plan {
319                                PhysicalPlanRef::Stream(plan) => plan as &dyn Explain,
320                                PhysicalPlanRef::Batch(plan) => plan as &dyn Explain,
321                            };
322                            match explain_format {
323                                ExplainFormat::Text => {
324                                    blocks.push(plan.explain_to_string());
325                                }
326                                ExplainFormat::Json => blocks.push(plan.explain_to_json()),
327                                ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
328                                ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
329                                ExplainFormat::Dot => {
330                                    if explain_backfill
331                                        && let PhysicalPlanRef::Stream(plan) = physical_plan
332                                    {
333                                        let dot_formatted_backfill_order =
334                                            explain_backfill_order_in_dot_format(
335                                                &session,
336                                                context.with_options().backfill_order_strategy(),
337                                                plan.clone(),
338                                            )?;
339                                        blocks.push(dot_formatted_backfill_order);
340                                    } else {
341                                        blocks.push(plan.explain_to_dot());
342                                    }
343                                }
344                            }
345                        }
346                        #[cfg(feature = "datafusion")]
347                        PlanToExplain::Df(plan) => {
348                            datafusion_physical_plan_request =
349                                Some((plan.clone(), explain_format, explain_verbose));
350                        }
351                    }
352                }
353            }
354            ExplainType::Logical => {
355                // if explain trace is on, the plan has been in the rows
356                if !explain_trace {
357                    let output = context.take_logical().ok_or_else(|| {
358                        ErrorCode::InternalError("Logical plan not found for query".into())
359                    })?;
360                    blocks.push(output);
361                }
362            }
363        }
364
365        // Throw the error.
366        plan?;
367    }
368
369    // Use explain_datafusion_plan here to avoid `Rc` across `await` point.
370    #[cfg(feature = "datafusion")]
371    if let Some((plan, explain_format, explain_verbose)) = datafusion_physical_plan_request {
372        let df_ctx = create_datafusion_context(session.as_ref());
373        let physical_plan = build_datafusion_physical_plan(&df_ctx, &plan).await?;
374        explain_datafusion_plan(
375            physical_plan.as_ref(),
376            explain_format,
377            explain_verbose,
378            blocks,
379        )?;
380    }
381
382    if let Some(fragmenter) = batch_plan_fragmenter {
383        let query = fragmenter.generate_complete_query().await?;
384        let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
385            let graph = query.stage_graph.to_petgraph();
386            let dot = Dot::new(&graph);
387            dot.to_string()
388        } else {
389            serde_json::to_string_pretty(&query.stage_graph).unwrap()
390        };
391        blocks.push(stage_graph);
392    }
393
394    Ok(())
395}
396
/// Renders a DataFusion physical plan and appends it to `blocks` as a single entry.
///
/// The plan is formatted through `displayable(plan).indent(verbose)`.
///
/// # Errors
///
/// Returns `ErrorCode::NotSupported` for every `explain_format` other than
/// `ExplainFormat::Text`; nothing is appended to `blocks` in that case.
#[cfg(feature = "datafusion")]
fn explain_datafusion_plan(
    plan: &dyn ExecutionPlan,
    explain_format: ExplainFormat,
    verbose: bool,
    blocks: &mut Vec<String>,
) -> Result<()> {
    // Guard clause: reject anything that is not plain text up front.
    if !matches!(explain_format, ExplainFormat::Text) {
        let feature = format!("EXPLAIN ... {:?} for DataFusion plan", explain_format);
        let hint = "Only TEXT format is supported for DataFusion plans.".to_owned();
        return Err(ErrorCode::NotSupported(feature, hint).into());
    }

    let rendered = displayable(plan).indent(verbose).to_string();
    blocks.push(rendered);
    Ok(())
}
417
418pub async fn handle_explain(
419    handler_args: HandlerArgs,
420    stmt: Statement,
421    options: ExplainOptions,
422    analyze: bool,
423) -> Result<RwPgResponse> {
424    if analyze {
425        // NOTE(kwannoel): This path is for explain analyze on stream and batch queries.
426        // For existing stream jobs, see the handler module `explain_analyze` instead.
427        bail_not_implemented!(issue = 4856, "explain analyze");
428    }
429    if options.trace && options.explain_format == ExplainFormat::Json {
430        return Err(ErrorCode::NotSupported(
431            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
432            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
433        )
434        .into());
435    }
436    if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
437    {
438        return Err(ErrorCode::NotSupported(
439            "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
440            "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
441        )
442        .into());
443    }
444
445    let mut blocks = Vec::new();
446    let result = do_handle_explain(handler_args, options, stmt, &mut blocks).await;
447
448    if let Err(e) = result {
449        if options.trace {
450            // If `trace` is on, we include the error in the output with partial traces.
451            blocks.push(if options.verbose {
452                format!("ERROR: {:?}", e.as_report())
453            } else {
454                format!("ERROR: {}", e.as_report())
455            });
456        } else {
457            // Else, directly return the error.
458            return Err(e);
459        }
460    }
461
462    let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
463        query_plan: l.into(),
464    });
465
466    Ok(PgResponse::builder(StatementType::EXPLAIN)
467        .rows(rows)
468        .into())
469}
470
/// One row of `EXPLAIN` output.
///
/// `handle_explain` builds one row for every line of the collected output blocks.
// NOTE(review): `style = "TITLE CASE"` presumably makes the derived column name
// render as "Query Plan" — confirm against the `Fields` derive macro.
#[derive(Fields)]
#[fields(style = "TITLE CASE")]
pub(crate) struct ExplainRow {
    /// A single line of the explain output.
    pub query_plan: String,
}