risingwave_frontend/handler/
explain.rs1use petgraph::dot::Dot;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::types::Fields;
20use risingwave_sqlparser::ast::{
21 ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
22};
23use thiserror_ext::AsReport;
24
25use super::create_index::{gen_create_index_plan, resolve_index_schema};
26use super::create_mv::gen_create_mv_plan;
27use super::create_sink::gen_sink_plan;
28use super::query::gen_batch_plan_by_statement;
29use super::util::SourceSchemaCompatExt;
30use super::{RwPgResponse, RwPgResponseBuilderExt};
31use crate::OptimizerContextRef;
32use crate::error::{ErrorCode, Result};
33use crate::handler::HandlerArgs;
34use crate::handler::create_table::handle_create_table_plan;
35use crate::optimizer::OptimizerContext;
36use crate::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
37use crate::optimizer::plan_node::generic::GenericPlanRef;
38use crate::optimizer::plan_node::{Convention, Explain};
39use crate::scheduler::BatchPlanFragmenter;
40use crate::stream_fragmenter::build_graph;
41use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
42
43pub async fn do_handle_explain(
44 handler_args: HandlerArgs,
45 explain_options: ExplainOptions,
46 stmt: Statement,
47 blocks: &mut Vec<String>,
48) -> Result<()> {
49 let mut batch_plan_fragmenter = None;
51 let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
52
53 let session = handler_args.session.clone();
54
55 {
56 let (plan, table, context) = match stmt {
57 Statement::CreateTable {
61 name,
62 columns,
63 constraints,
64 format_encode,
65 source_watermarks,
66 append_only,
67 on_conflict,
68 with_version_column,
69 cdc_table_info,
70 include_column_options,
71 wildcard_idx,
72 webhook_info,
73 ..
74 } => {
75 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
76
77 let (plan, _source, table, _job_type, _) = handle_create_table_plan(
78 handler_args,
79 explain_options,
80 format_encode,
81 cdc_table_info,
82 name.clone(),
83 columns,
84 wildcard_idx,
85 constraints,
86 source_watermarks,
87 append_only,
88 on_conflict,
89 with_version_column.map(|x| x.real_value()),
90 include_column_options,
91 webhook_info,
92 risingwave_common::catalog::Engine::Hummock,
93 )
94 .await?;
95 let context = plan.ctx();
96 (Ok(plan), Some(table), context)
97 }
98 Statement::CreateSink { stmt } => {
99 let plan = gen_sink_plan(handler_args, stmt, Some(explain_options), false)
100 .await
101 .map(|plan| plan.sink_plan)?;
102 let context = plan.ctx();
103 (Ok(plan), None, context)
104 }
105
106 Statement::FetchCursor {
107 stmt: FetchCursorStatement { cursor_name, .. },
108 } => {
109 let cursor_manager = session.clone().get_cursor_manager();
110 let plan = cursor_manager
111 .gen_batch_plan_with_subscription_cursor(
112 &cursor_name.real_value(),
113 handler_args,
114 )
115 .await
116 .map(|x| x.plan)?;
117 let context = plan.ctx();
118 (Ok(plan), None, context)
119 }
120
121 _ => {
125 let context: OptimizerContextRef =
126 OptimizerContext::new(handler_args, explain_options).into();
127 let (plan, table) = match stmt {
128 Statement::CreateView {
130 or_replace: false,
131 materialized: true,
132 query,
133 name,
134 columns,
135 emit_mode,
136 ..
137 } => gen_create_mv_plan(
138 &session,
139 context.clone(),
140 *query,
141 name,
142 columns,
143 emit_mode,
144 )
145 .map(|(plan, table)| (plan, Some(table))),
146 Statement::CreateView {
147 materialized: false,
148 ..
149 } => {
150 return Err(ErrorCode::NotSupported(
151 "EXPLAIN CREATE VIEW".into(),
152 "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
153 ).into());
154 }
155
156 Statement::CreateSubscription { .. } => {
157 return Err(ErrorCode::NotSupported(
158 "EXPLAIN CREATE SUBSCRIPTION".into(),
159 "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
160 ).into());
161 }
162 Statement::CreateIndex {
163 name,
164 table_name,
165 columns,
166 include,
167 distributed_by,
168 ..
169 } => {
170 let (schema_name, table, index_table_name) =
171 resolve_index_schema(&session, name, table_name)?;
172 gen_create_index_plan(
173 &session,
174 context.clone(),
175 schema_name,
176 table,
177 index_table_name,
178 columns,
179 include,
180 distributed_by,
181 )
182 }
183 .map(|(plan, index_table, _index)| (plan, Some(index_table))),
184
185 Statement::Insert { .. }
187 | Statement::Delete { .. }
188 | Statement::Update { .. }
189 | Statement::Query { .. } => {
190 gen_batch_plan_by_statement(&session, context, stmt).map(|x| (x.plan, None))
191 }
192
193 _ => bail_not_implemented!("unsupported statement for EXPLAIN: {stmt}"),
194 }?;
195
196 let context = plan.ctx().clone();
197
198 (Ok(plan) as Result<_>, table, context)
199 }
200 };
201
202 let explain_trace = context.is_explain_trace();
203 let explain_verbose = context.is_explain_verbose();
204 let explain_backfill = context.is_explain_backfill();
205 let explain_type = context.explain_type();
206 let explain_format = context.explain_format();
207
208 if explain_trace {
209 let trace = context.take_trace();
210 blocks.extend(trace);
211 }
212
213 match explain_type {
214 ExplainType::DistSql => {
215 if let Ok(plan) = &plan {
216 match plan.convention() {
217 Convention::Logical => unreachable!(),
218 Convention::Batch => {
219 let worker_node_manager_reader = WorkerNodeSelector::new(
220 session.env().worker_node_manager_ref(),
221 session.is_barrier_read(),
222 );
223 batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
224 worker_node_manager_reader,
225 session.env().catalog_reader().clone(),
226 session.config().batch_parallelism().0,
227 session.config().timezone().to_owned(),
228 plan.clone(),
229 )?);
230 batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
231 ExplainFormat::Dot
232 } else {
233 ExplainFormat::Json
234 }
235 }
236 Convention::Stream => {
237 let graph = build_graph(plan.clone(), None)?;
238 let table = table.map(|x| x.to_prost());
239 if explain_format == ExplainFormat::Dot {
240 blocks.push(explain_stream_graph_as_dot(
241 &graph,
242 table,
243 explain_verbose,
244 ))
245 } else {
246 blocks.push(explain_stream_graph(&graph, table, explain_verbose));
247 }
248 }
249 }
250 }
251 }
252 ExplainType::Physical => {
253 if !explain_trace && let Ok(plan) = &plan {
255 match explain_format {
256 ExplainFormat::Text => {
257 blocks.push(plan.explain_to_string());
258 }
259 ExplainFormat::Json => blocks.push(plan.explain_to_json()),
260 ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
261 ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
262 ExplainFormat::Dot => {
263 if explain_backfill {
264 let dot_formatted_backfill_order =
265 explain_backfill_order_in_dot_format(
266 &session,
267 context.with_options().backfill_order_strategy(),
268 plan.clone(),
269 )?;
270 blocks.push(dot_formatted_backfill_order);
271 } else {
272 blocks.push(plan.explain_to_dot());
273 }
274 }
275 }
276 }
277 }
278 ExplainType::Logical => {
279 if !explain_trace {
281 let output = context.take_logical().ok_or_else(|| {
282 ErrorCode::InternalError("Logical plan not found for query".into())
283 })?;
284 blocks.push(output);
285 }
286 }
287 }
288
289 plan?;
291 }
292
293 if let Some(fragmenter) = batch_plan_fragmenter {
294 let query = fragmenter.generate_complete_query().await?;
295 let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
296 let graph = query.stage_graph.to_petgraph();
297 let dot = Dot::new(&graph);
298 dot.to_string()
299 } else {
300 serde_json::to_string_pretty(&query.stage_graph).unwrap()
301 };
302 blocks.push(stage_graph);
303 }
304
305 Ok(())
306}
307
308pub async fn handle_explain(
309 handler_args: HandlerArgs,
310 stmt: Statement,
311 options: ExplainOptions,
312 analyze: bool,
313) -> Result<RwPgResponse> {
314 if analyze {
315 bail_not_implemented!(issue = 4856, "explain analyze");
318 }
319 if options.trace && options.explain_format == ExplainFormat::Json {
320 return Err(ErrorCode::NotSupported(
321 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
322 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
323 )
324 .into());
325 }
326 if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
327 {
328 return Err(ErrorCode::NotSupported(
329 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
330 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
331 )
332 .into());
333 }
334
335 let mut blocks = Vec::new();
336 let result = do_handle_explain(handler_args, options.clone(), stmt, &mut blocks).await;
337
338 if let Err(e) = result {
339 if options.trace {
340 blocks.push(if options.verbose {
342 format!("ERROR: {:?}", e.as_report())
343 } else {
344 format!("ERROR: {}", e.as_report())
345 });
346 } else {
347 return Err(e);
349 }
350 }
351
352 let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
353 query_plan: l.into(),
354 });
355
356 Ok(PgResponse::builder(StatementType::EXPLAIN)
357 .rows(rows)
358 .into())
359}
360
361#[derive(Fields)]
362#[fields(style = "TITLE CASE")]
363pub(crate) struct ExplainRow {
364 pub query_plan: String,
365}