risingwave_frontend/handler/
explain.rs1use petgraph::dot::Dot;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::types::Fields;
20use risingwave_sqlparser::ast::{
21 ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
22};
23use thiserror_ext::AsReport;
24
25use super::create_index::{gen_create_index_plan, resolve_index_schema};
26use super::create_mv::gen_create_mv_plan;
27use super::create_sink::gen_sink_plan;
28use super::query::gen_batch_plan_by_statement;
29use super::util::SourceSchemaCompatExt;
30use super::{RwPgResponse, RwPgResponseBuilderExt};
31use crate::OptimizerContextRef;
32use crate::error::{ErrorCode, Result};
33use crate::handler::HandlerArgs;
34use crate::handler::create_table::handle_create_table_plan;
35use crate::optimizer::OptimizerContext;
36use crate::optimizer::plan_node::generic::GenericPlanRef;
37use crate::optimizer::plan_node::{Convention, Explain};
38use crate::scheduler::BatchPlanFragmenter;
39use crate::stream_fragmenter::build_graph;
40use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
41
42async fn do_handle_explain(
43 handler_args: HandlerArgs,
44 explain_options: ExplainOptions,
45 stmt: Statement,
46 blocks: &mut Vec<String>,
47) -> Result<()> {
48 let mut batch_plan_fragmenter = None;
50 let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
51
52 let session = handler_args.session.clone();
53
54 {
55 let (plan, context) = match stmt {
56 Statement::CreateTable {
60 name,
61 columns,
62 constraints,
63 format_encode,
64 source_watermarks,
65 append_only,
66 on_conflict,
67 with_version_column,
68 cdc_table_info,
69 include_column_options,
70 wildcard_idx,
71 webhook_info,
72 ..
73 } => {
74 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
75
76 let (plan, _source, _table, _job_type) = handle_create_table_plan(
77 handler_args,
78 explain_options,
79 format_encode,
80 cdc_table_info,
81 name.clone(),
82 columns,
83 wildcard_idx,
84 constraints,
85 source_watermarks,
86 append_only,
87 on_conflict,
88 with_version_column.map(|x| x.real_value()),
89 include_column_options,
90 webhook_info,
91 risingwave_common::catalog::Engine::Hummock,
92 )
93 .await?;
94 let context = plan.ctx();
95 (Ok(plan), context)
96 }
97 Statement::CreateSink { stmt } => {
98 let plan = gen_sink_plan(handler_args, stmt, Some(explain_options))
99 .await
100 .map(|plan| plan.sink_plan)?;
101 let context = plan.ctx();
102 (Ok(plan), context)
103 }
104
105 Statement::FetchCursor {
106 stmt: FetchCursorStatement { cursor_name, .. },
107 } => {
108 let cursor_manager = session.clone().get_cursor_manager();
109 let plan = cursor_manager
110 .gen_batch_plan_with_subscription_cursor(
111 &cursor_name.real_value(),
112 handler_args,
113 )
114 .await
115 .map(|x| x.plan)?;
116 let context = plan.ctx();
117 (Ok(plan), context)
118 }
119
120 _ => {
124 let context: OptimizerContextRef =
125 OptimizerContext::new(handler_args, explain_options).into();
126 let plan = match stmt {
127 Statement::CreateView {
129 or_replace: false,
130 materialized: true,
131 query,
132 name,
133 columns,
134 emit_mode,
135 ..
136 } => gen_create_mv_plan(
137 &session,
138 context.clone(),
139 *query,
140 name,
141 columns,
142 emit_mode,
143 )
144 .map(|x| x.0),
145 Statement::CreateView {
146 materialized: false,
147 ..
148 } => {
149 return Err(ErrorCode::NotSupported(
150 "EXPLAIN CREATE VIEW".into(),
151 "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
152 ).into());
153 }
154
155 Statement::CreateSubscription { .. } => {
156 return Err(ErrorCode::NotSupported(
157 "EXPLAIN CREATE SUBSCRIPTION".into(),
158 "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
159 ).into());
160 }
161 Statement::CreateIndex {
162 name,
163 table_name,
164 columns,
165 include,
166 distributed_by,
167 ..
168 } => {
169 let (schema_name, table, index_table_name) =
170 resolve_index_schema(&session, name, table_name)?;
171 gen_create_index_plan(
172 &session,
173 context.clone(),
174 schema_name,
175 table,
176 index_table_name,
177 columns,
178 include,
179 distributed_by,
180 )
181 }
182 .map(|x| x.0),
183
184 Statement::Insert { .. }
186 | Statement::Delete { .. }
187 | Statement::Update { .. }
188 | Statement::Query { .. } => {
189 gen_batch_plan_by_statement(&session, context, stmt).map(|x| x.plan)
190 }
191
192 _ => bail_not_implemented!("unsupported statement {:?}", stmt),
193 };
194
195 let plan = plan?;
196 let context = plan.ctx().clone();
197
198 (Ok(plan) as Result<_>, context)
199 }
200 };
201
202 let explain_trace = context.is_explain_trace();
203 let explain_verbose = context.is_explain_verbose();
204 let explain_type = context.explain_type();
205 let explain_format = context.explain_format();
206
207 if explain_trace {
208 let trace = context.take_trace();
209 blocks.extend(trace);
210 }
211
212 match explain_type {
213 ExplainType::DistSql => {
214 if let Ok(plan) = &plan {
215 match plan.convention() {
216 Convention::Logical => unreachable!(),
217 Convention::Batch => {
218 let worker_node_manager_reader = WorkerNodeSelector::new(
219 session.env().worker_node_manager_ref(),
220 session.is_barrier_read(),
221 );
222 batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
223 worker_node_manager_reader,
224 session.env().catalog_reader().clone(),
225 session.config().batch_parallelism().0,
226 session.config().timezone().to_owned(),
227 plan.clone(),
228 )?);
229 batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
230 ExplainFormat::Dot
231 } else {
232 ExplainFormat::Json
233 }
234 }
235 Convention::Stream => {
236 let graph = build_graph(plan.clone())?;
237 if explain_format == ExplainFormat::Dot {
238 blocks.push(explain_stream_graph_as_dot(&graph, explain_verbose))
239 } else {
240 blocks.push(explain_stream_graph(&graph, explain_verbose));
241 }
242 }
243 }
244 }
245 }
246 ExplainType::Physical => {
247 if !explain_trace && let Ok(plan) = &plan {
249 match explain_format {
250 ExplainFormat::Text => blocks.push(plan.explain_to_string()),
251 ExplainFormat::Json => blocks.push(plan.explain_to_json()),
252 ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
253 ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
254 ExplainFormat::Dot => blocks.push(plan.explain_to_dot()),
255 }
256 }
257 }
258 ExplainType::Logical => {
259 if !explain_trace {
261 let output = context.take_logical().ok_or_else(|| {
262 ErrorCode::InternalError("Logical plan not found for query".into())
263 })?;
264 blocks.push(output);
265 }
266 }
267 }
268
269 plan?;
271 }
272
273 if let Some(fragmenter) = batch_plan_fragmenter {
274 let query = fragmenter.generate_complete_query().await?;
275 let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
276 let graph = query.stage_graph.to_petgraph();
277 let dot = Dot::new(&graph);
278 dot.to_string()
279 } else {
280 serde_json::to_string_pretty(&query.stage_graph).unwrap()
281 };
282 blocks.push(stage_graph);
283 }
284
285 Ok(())
286}
287
288pub async fn handle_explain(
289 handler_args: HandlerArgs,
290 stmt: Statement,
291 options: ExplainOptions,
292 analyze: bool,
293) -> Result<RwPgResponse> {
294 if analyze {
295 bail_not_implemented!(issue = 4856, "explain analyze");
298 }
299 if options.trace && options.explain_format == ExplainFormat::Json {
300 return Err(ErrorCode::NotSupported(
301 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
302 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
303 )
304 .into());
305 }
306 if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
307 {
308 return Err(ErrorCode::NotSupported(
309 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
310 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
311 )
312 .into());
313 }
314
315 let mut blocks = Vec::new();
316 let result = do_handle_explain(handler_args, options.clone(), stmt, &mut blocks).await;
317
318 if let Err(e) = result {
319 if options.trace {
320 blocks.push(if options.verbose {
322 format!("ERROR: {:?}", e.as_report())
323 } else {
324 format!("ERROR: {}", e.as_report())
325 });
326 } else {
327 return Err(e);
329 }
330 }
331
332 let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
333 query_plan: l.into(),
334 });
335
336 Ok(PgResponse::builder(StatementType::EXPLAIN)
337 .rows(rows)
338 .into())
339}
340
341#[derive(Fields)]
342#[fields(style = "TITLE CASE")]
343struct ExplainRow {
344 query_plan: String,
345}