// risingwave_frontend/handler/explain.rs
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::bail_not_implemented;
use risingwave_common::types::Fields;
use risingwave_sqlparser::ast::{
ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
};
use thiserror_ext::AsReport;
use super::create_index::{gen_create_index_plan, resolve_index_schema};
use super::create_mv::gen_create_mv_plan;
use super::create_sink::gen_sink_plan;
use super::query::gen_batch_plan_by_statement;
use super::util::SourceSchemaCompatExt;
use super::{RwPgResponse, RwPgResponseBuilderExt};
use crate::error::{ErrorCode, Result};
use crate::handler::create_table::handle_create_table_plan;
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{Convention, Explain};
use crate::optimizer::OptimizerContext;
use crate::scheduler::BatchPlanFragmenter;
use crate::stream_fragmenter::build_graph;
use crate::utils::explain_stream_graph;
use crate::OptimizerContextRef;
/// Plans `stmt` for `EXPLAIN` and appends the rendered output to `blocks`.
///
/// The statement is planned according to its kind: `CREATE TABLE`, `CREATE SINK`,
/// `FETCH` on a cursor, `CREATE MATERIALIZED VIEW`, `CREATE INDEX`, or
/// `INSERT` / `DELETE` / `UPDATE` / query. Plain `CREATE VIEW` and
/// `CREATE SUBSCRIPTION` are rejected as not supported, and anything else is
/// reported as not implemented.
///
/// The explain type read from the optimizer context decides what is pushed:
/// the stream graph or (for batch plans) the fragmented stage graph as JSON,
/// the physical plan in the requested format, or the logical plan text.
/// When tracing is enabled, the trace taken from the context is appended first.
///
/// # Errors
///
/// Returns an error if planning fails, if the statement is not supported,
/// if fragmenting the plan fails, or if a logical plan was requested but the
/// context does not hold one.
async fn do_handle_explain(
handler_args: HandlerArgs,
explain_options: ExplainOptions,
stmt: Statement,
blocks: &mut Vec<String>,
) -> Result<()> {
// Set only for a DISTSQL explain of a batch-convention plan; consumed after the
// inner scope below has ended.
let mut batch_plan_fragmenter = None;
let session = handler_args.session.clone();
// NOTE(review): this inner scope presumably exists so that `plan` and `context`
// are dropped before the `.await` on the fragmenter at the end — confirm.
{
let (plan, context) = match stmt {
// These first three arms take ownership of `handler_args`, and each reads
// the optimizer context back from the produced plan.
Statement::CreateTable {
name,
columns,
constraints,
format_encode,
source_watermarks,
append_only,
on_conflict,
with_version_column,
cdc_table_info,
include_column_options,
wildcard_idx,
..
} => {
let format_encode = format_encode.map(|s| s.into_v2_with_warning());
let (plan, _source, _table, _job_type) = handle_create_table_plan(
handler_args,
explain_options,
format_encode,
cdc_table_info,
name.clone(),
columns,
wildcard_idx,
constraints,
source_watermarks,
append_only,
on_conflict,
with_version_column,
include_column_options,
)
.await?;
let context = plan.ctx();
(Ok(plan), context)
}
Statement::CreateSink { stmt } => {
let plan = gen_sink_plan(handler_args, stmt, Some(explain_options))
.await
.map(|plan| plan.sink_plan)?;
let context = plan.ctx();
(Ok(plan), context)
}
Statement::FetchCursor {
stmt: FetchCursorStatement { cursor_name, .. },
} => {
let cursor_manager = session.clone().get_cursor_manager();
let plan = cursor_manager
.gen_batch_plan_with_subscription_cursor(cursor_name, handler_args)
.await
.map(|x| x.plan)?;
let context = plan.ctx();
(Ok(plan), context)
}
// All remaining statements are planned against one freshly created
// optimizer context built from `handler_args` and `explain_options`.
_ => {
let context: OptimizerContextRef =
OptimizerContext::new(handler_args, explain_options).into();
let plan = match stmt {
// Only a non-replacing materialized view is matched by this arm.
Statement::CreateView {
or_replace: false,
materialized: true,
query,
name,
columns,
emit_mode,
..
} => gen_create_mv_plan(
&session,
context.clone(),
*query,
name,
columns,
emit_mode,
)
.map(|x| x.0),
Statement::CreateView {
materialized: false,
..
} => {
return Err(ErrorCode::NotSupported(
"EXPLAIN CREATE VIEW".into(),
"A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
).into());
}
Statement::CreateSubscription { .. } => {
return Err(ErrorCode::NotSupported(
"EXPLAIN CREATE SUBSCRIPTION".into(),
"A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
).into());
}
Statement::CreateIndex {
name,
table_name,
columns,
include,
distributed_by,
..
} => {
let (schema_name, table, index_table_name) =
resolve_index_schema(&session, name, table_name)?;
gen_create_index_plan(
&session,
context.clone(),
schema_name,
table,
index_table_name,
columns,
include,
distributed_by,
)
}
.map(|x| x.0),
// `context` is moved into the planner here (no clone), unlike the arms above.
Statement::Insert { .. }
| Statement::Delete { .. }
| Statement::Update { .. }
| Statement::Query { .. } => {
gen_batch_plan_by_statement(&session, context, stmt).map(|x| x.plan)
}
_ => bail_not_implemented!("unsupported statement {:?}", stmt),
};
// A planning error is propagated right here, so the `Result` placed in the
// tuple below is always `Ok`; the context is re-read from the plan.
let plan = plan?;
let context = plan.ctx().clone();
(Ok(plan) as Result<_>, context)
}
};
// Snapshot the explain settings from the context before producing output.
let explain_trace = context.is_explain_trace();
let explain_verbose = context.is_explain_verbose();
let explain_type = context.explain_type();
let explain_format = context.explain_format();
// With tracing enabled, the trace recorded in the context is emitted first.
if explain_trace {
let trace = context.take_trace();
blocks.extend(trace);
}
match explain_type {
ExplainType::DistSql => {
if let Ok(plan) = &plan {
match plan.convention() {
// NOTE(review): a logical-convention plan is treated as impossible at this
// point and would panic — confirm the planners above never return one.
Convention::Logical => unreachable!(),
Convention::Batch => {
// Only build the fragmenter here; the query is generated (and awaited)
// after this scope ends.
let worker_node_manager_reader = WorkerNodeSelector::new(
session.env().worker_node_manager_ref(),
session.is_barrier_read(),
);
batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
worker_node_manager_reader,
session.env().catalog_reader().clone(),
session.config().batch_parallelism().0,
plan.clone(),
)?);
}
Convention::Stream => {
let graph = build_graph(plan.clone())?;
blocks.push(explain_stream_graph(&graph, explain_verbose));
}
}
}
}
ExplainType::Physical => {
// Nothing is pushed here when tracing is on.
// NOTE(review): presumably the trace already contains the plan — confirm.
if !explain_trace && let Ok(plan) = &plan {
match explain_format {
ExplainFormat::Text => blocks.push(plan.explain_to_string()),
ExplainFormat::Json => blocks.push(plan.explain_to_json()),
ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
ExplainFormat::Dot => blocks.push(plan.explain_to_dot()),
}
}
}
ExplainType::Logical => {
// As with the physical case, the logical plan is skipped when tracing is on.
if !explain_trace {
let output = context.take_logical().ok_or_else(|| {
ErrorCode::InternalError("Logical plan not found for query".into())
})?;
blocks.push(output);
}
}
}
// Propagates a planning error if `plan` holds one. Every arm above already
// applied `?` before wrapping the plan in `Ok`, so this cannot fail as written.
plan?;
}
// DISTSQL on a batch plan: generate the complete query and emit its stage graph
// as pretty-printed JSON.
if let Some(fragmenter) = batch_plan_fragmenter {
let query = fragmenter.generate_complete_query().await?;
let stage_graph_json = serde_json::to_string_pretty(&query.stage_graph).unwrap();
blocks.push(stage_graph_json);
}
Ok(())
}
/// Handles an `EXPLAIN` statement and returns its output as rows of plan text.
///
/// The statement is planned by `do_handle_explain`, which fills a list of output
/// blocks; every line of every block becomes one [`ExplainRow`] of the response.
///
/// # Errors
///
/// - `EXPLAIN ANALYZE` is reported as not implemented.
/// - The JSON format is rejected when combined with `TRACE` or with the
///   `DISTSQL` explain type.
/// - An error from planning is returned as-is, unless `TRACE` is enabled. In that
///   case the error is appended to the output as an `ERROR: ...` line (using the
///   debug rendering of the report when `VERBOSE` is set) and the response
///   succeeds, so the trace collected so far is still shown.
pub async fn handle_explain(
    handler_args: HandlerArgs,
    stmt: Statement,
    options: ExplainOptions,
    analyze: bool,
) -> Result<RwPgResponse> {
    if analyze {
        bail_not_implemented!(issue = 4856, "explain analyze");
    }

    // JSON output is only available for plain LOGICAL / PHYSICAL explains.
    if options.explain_format == ExplainFormat::Json {
        if options.trace {
            return Err(ErrorCode::NotSupported(
                "EXPLAIN (TRACE, JSON FORMAT)".to_string(),
                "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_string(),
            )
            .into());
        }
        if options.explain_type == ExplainType::DistSql {
            // Name the option that was actually rejected (DISTSQL, not TRACE).
            return Err(ErrorCode::NotSupported(
                "EXPLAIN (DISTSQL, JSON FORMAT)".to_string(),
                "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_string(),
            )
            .into());
        }
    }

    // Read the flags needed for error reporting up front so that `options` can be
    // moved into the planner instead of being cloned.
    let trace = options.trace;
    let verbose = options.verbose;

    let mut blocks = Vec::new();
    if let Err(e) = do_handle_explain(handler_args, options, stmt, &mut blocks).await {
        if !trace {
            return Err(e);
        }
        // With tracing on, keep whatever was collected and report the failure inline.
        blocks.push(if verbose {
            format!("ERROR: {:?}", e.as_report())
        } else {
            format!("ERROR: {}", e.as_report())
        });
    }

    // One result row per line of output.
    let rows = blocks
        .iter()
        .flat_map(|block| block.lines())
        .map(|line| ExplainRow {
            query_plan: line.into(),
        });

    Ok(PgResponse::builder(StatementType::EXPLAIN)
        .rows(rows)
        .into())
}
/// One row of `EXPLAIN` output, holding a single line of the rendered plan text.
///
/// NOTE(review): the `TITLE CASE` field style presumably makes the result column
/// appear as `Query Plan` — confirm against the `Fields` derive.
#[derive(Fields)]
#[fields(style = "TITLE CASE")]
struct ExplainRow {
// A single line taken from one of the explain output blocks.
query_plan: String,
}