1#[cfg(feature = "datafusion")]
16use datafusion::physical_plan::{ExecutionPlan, displayable};
17use petgraph::dot::Dot;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::types::Fields;
22use risingwave_sqlparser::ast::{
23 ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
24};
25use thiserror_ext::AsReport;
26
27use super::create_index::{gen_create_index_plan, resolve_index_schema};
28use super::create_mv::gen_create_mv_plan;
29use super::create_sink::gen_sink_plan;
30use super::query::{BatchPlanChoice, gen_batch_plan_by_statement};
31use super::util::SourceSchemaCompatExt;
32use super::{RwPgResponse, RwPgResponseBuilderExt};
33use crate::OptimizerContextRef;
34#[cfg(feature = "datafusion")]
35use crate::datafusion::{
36 DfBatchQueryPlanResult, build_datafusion_physical_plan, create_datafusion_context,
37};
38use crate::error::{ErrorCode, Result};
39use crate::handler::HandlerArgs;
40use crate::handler::create_table::handle_create_table_plan;
41use crate::optimizer::OptimizerContext;
42use crate::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
43use crate::optimizer::plan_node::generic::GenericPlanRef;
44use crate::optimizer::plan_node::{BatchPlanRef, Explain, StreamPlanRef};
45use crate::scheduler::BatchPlanFragmenter;
46use crate::stream_fragmenter::build_graph;
47use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
48
49pub async fn do_handle_explain(
50 handler_args: HandlerArgs,
51 explain_options: ExplainOptions,
52 stmt: Statement,
53 blocks: &mut Vec<String>,
54) -> Result<()> {
55 let mut batch_plan_fragmenter = None;
57 let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
58 #[cfg(feature = "datafusion")]
59 let mut datafusion_physical_plan_request: Option<(
60 DfBatchQueryPlanResult,
61 ExplainFormat,
62 bool,
63 )> = None;
64
65 let session = handler_args.session.clone();
66
67 enum PhysicalPlanRef {
68 Stream(StreamPlanRef),
69 Batch(BatchPlanRef),
70 }
71 enum PlanToExplain {
72 Rw(PhysicalPlanRef),
73 #[cfg(feature = "datafusion")]
74 Df(DfBatchQueryPlanResult),
75 }
76
77 {
78 let (plan, table, context) = match stmt {
79 Statement::CreateTable {
83 name,
84 columns,
85 constraints,
86 format_encode,
87 source_watermarks,
88 append_only,
89 on_conflict,
90 with_version_columns,
91 cdc_table_info,
92 include_column_options,
93 wildcard_idx,
94 webhook_info,
95 ..
96 } => {
97 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
98
99 let (plan, _source, table, _job_type, _) = handle_create_table_plan(
100 handler_args,
101 explain_options,
102 format_encode,
103 cdc_table_info,
104 &name,
105 columns,
106 wildcard_idx,
107 constraints,
108 source_watermarks,
109 append_only,
110 on_conflict,
111 with_version_columns
112 .iter()
113 .map(|col| col.real_value())
114 .collect(),
115 include_column_options,
116 webhook_info,
117 risingwave_common::catalog::Engine::Hummock,
118 )
119 .await?;
120 let context = plan.ctx();
121 (
122 Ok(PlanToExplain::Rw(PhysicalPlanRef::Stream(plan))),
123 Some(table),
124 context,
125 )
126 }
127 Statement::CreateSink { stmt } => {
128 let plan = gen_sink_plan(handler_args, stmt, Some(explain_options), false)
129 .await
130 .map(|plan| plan.sink_plan)?;
131 let context = plan.ctx();
132 (
133 Ok(PlanToExplain::Rw(PhysicalPlanRef::Stream(plan))),
134 None,
135 context,
136 )
137 }
138
139 Statement::FetchCursor {
140 stmt: FetchCursorStatement { cursor_name, .. },
141 } => {
142 let cursor_manager = session.clone().get_cursor_manager();
143 let plan = cursor_manager
144 .gen_batch_plan_with_subscription_cursor(
145 &cursor_name.real_value(),
146 handler_args,
147 )
148 .await
149 .map(|x| x.plan)?;
150 let context = plan.ctx();
151 (
152 Ok(PlanToExplain::Rw(PhysicalPlanRef::Batch(plan))),
153 None,
154 context,
155 )
156 }
157
158 _ => {
162 let context: OptimizerContextRef =
163 OptimizerContext::new(handler_args, explain_options).into();
164 let context_clone = context.clone();
165
166 let res = match stmt {
167 Statement::CreateView {
169 or_replace: false,
170 materialized: true,
171 query,
172 name,
173 columns,
174 emit_mode,
175 ..
176 } => gen_create_mv_plan(&session, context, *query, name, columns, emit_mode)
177 .map(|(plan, table)| {
178 (
179 PlanToExplain::Rw(PhysicalPlanRef::Stream(plan)),
180 Some(table),
181 )
182 }),
183 Statement::CreateView {
184 materialized: false,
185 ..
186 } => {
187 return Err(ErrorCode::NotSupported(
188 "EXPLAIN CREATE VIEW".into(),
189 "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
190 ).into());
191 }
192
193 Statement::CreateSubscription { .. } => {
194 return Err(ErrorCode::NotSupported(
195 "EXPLAIN CREATE SUBSCRIPTION".into(),
196 "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
197 ).into());
198 }
199 Statement::CreateIndex {
200 name,
201 table_name,
202 method,
203 columns,
204 include,
205 distributed_by,
206 ..
207 } => {
208 let (schema_name, table, index_table_name) =
209 resolve_index_schema(&session, name, table_name)?;
210 gen_create_index_plan(
211 &session,
212 context,
213 schema_name,
214 table,
215 index_table_name,
216 method,
217 columns,
218 include,
219 distributed_by,
220 )
221 }
222 .map(|(plan, index_table, _index)| {
223 (
224 PlanToExplain::Rw(PhysicalPlanRef::Stream(plan)),
225 Some(index_table),
226 )
227 }),
228
229 Statement::Insert { .. }
231 | Statement::Delete { .. }
232 | Statement::Update { .. }
233 | Statement::Query { .. } => {
234 match gen_batch_plan_by_statement(&session, context, stmt)? {
235 BatchPlanChoice::Rw(plan_result) => Ok((
236 PlanToExplain::Rw(PhysicalPlanRef::Batch(plan_result.plan)),
237 None,
238 )),
239 #[cfg(feature = "datafusion")]
240 BatchPlanChoice::Df(plan_result) => {
241 Ok((PlanToExplain::Df(plan_result), None))
242 }
243 }
244 }
245
246 _ => bail_not_implemented!("unsupported statement for EXPLAIN: {stmt}"),
247 };
248
249 match res {
250 Ok((plan, table)) => (Ok(plan), table, context_clone),
251 Err(e) => (Err(e), None, context_clone),
252 }
253 }
254 };
255
256 let explain_trace = explain_options.trace;
257 let explain_verbose = explain_options.verbose;
258 let explain_backfill = explain_options.backfill;
259 let explain_type = explain_options.explain_type;
260 let explain_format = explain_options.explain_format;
261
262 if explain_trace {
263 let trace = context.take_trace();
264 blocks.extend(trace);
265 }
266
267 match explain_type {
268 ExplainType::DistSql => {
269 if let Ok(plan) = &plan {
270 match plan {
271 PlanToExplain::Rw(PhysicalPlanRef::Batch(plan)) => {
272 let worker_node_manager_reader = WorkerNodeSelector::new(
273 session.env().worker_node_manager_ref(),
274 session.is_barrier_read(),
275 );
276 batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
277 worker_node_manager_reader,
278 session.env().catalog_reader().clone(),
279 session.config().batch_parallelism().0,
280 plan.clone(),
281 )?);
282 batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
283 ExplainFormat::Dot
284 } else {
285 ExplainFormat::Json
286 }
287 }
288 PlanToExplain::Rw(PhysicalPlanRef::Stream(plan)) => {
289 let graph = build_graph(plan.clone(), None)?;
290 let table = table.map(|x| x.to_prost());
291 if explain_format == ExplainFormat::Dot {
292 blocks.push(explain_stream_graph_as_dot(
293 &graph,
294 table,
295 explain_verbose,
296 ))
297 } else {
298 blocks.push(explain_stream_graph(&graph, table, explain_verbose));
299 }
300 }
301 #[cfg(feature = "datafusion")]
302 PlanToExplain::Df(_) => {
303 return Err(ErrorCode::NotSupported(
304 "EXPLAIN DISTRIBUTED for DataFusion plan".into(),
305 "Distributed explain is only available for RisingWave batch plans."
306 .into(),
307 )
308 .into());
309 }
310 }
311 }
312 }
313 ExplainType::Physical => {
314 if !explain_trace && let Ok(physical_plan) = &plan {
316 match physical_plan {
317 PlanToExplain::Rw(physical_plan) => {
318 let plan = match &physical_plan {
319 PhysicalPlanRef::Stream(plan) => plan as &dyn Explain,
320 PhysicalPlanRef::Batch(plan) => plan as &dyn Explain,
321 };
322 match explain_format {
323 ExplainFormat::Text => {
324 blocks.push(plan.explain_to_string());
325 }
326 ExplainFormat::Json => blocks.push(plan.explain_to_json()),
327 ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
328 ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
329 ExplainFormat::Dot => {
330 if explain_backfill
331 && let PhysicalPlanRef::Stream(plan) = physical_plan
332 {
333 let dot_formatted_backfill_order =
334 explain_backfill_order_in_dot_format(
335 &session,
336 context.with_options().backfill_order_strategy(),
337 plan.clone(),
338 )?;
339 blocks.push(dot_formatted_backfill_order);
340 } else {
341 blocks.push(plan.explain_to_dot());
342 }
343 }
344 }
345 }
346 #[cfg(feature = "datafusion")]
347 PlanToExplain::Df(plan) => {
348 datafusion_physical_plan_request =
349 Some((plan.clone(), explain_format, explain_verbose));
350 }
351 }
352 }
353 }
354 ExplainType::Logical => {
355 if !explain_trace {
357 let output = context.take_logical().ok_or_else(|| {
358 ErrorCode::InternalError("Logical plan not found for query".into())
359 })?;
360 blocks.push(output);
361 }
362 }
363 }
364
365 plan?;
367 }
368
369 #[cfg(feature = "datafusion")]
371 if let Some((plan, explain_format, explain_verbose)) = datafusion_physical_plan_request {
372 let df_ctx = create_datafusion_context(session.as_ref());
373 let physical_plan = build_datafusion_physical_plan(&df_ctx, &plan).await?;
374 explain_datafusion_plan(
375 physical_plan.as_ref(),
376 explain_format,
377 explain_verbose,
378 blocks,
379 )?;
380 }
381
382 if let Some(fragmenter) = batch_plan_fragmenter {
383 let query = fragmenter.generate_complete_query().await?;
384 let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
385 let graph = query.stage_graph.to_petgraph();
386 let dot = Dot::new(&graph);
387 dot.to_string()
388 } else {
389 serde_json::to_string_pretty(&query.stage_graph).unwrap()
390 };
391 blocks.push(stage_graph);
392 }
393
394 Ok(())
395}
396
/// Renders a DataFusion execution plan and appends it to `blocks` as a single entry.
///
/// Only [`ExplainFormat::Text`] can be rendered; the plan is printed in DataFusion's indented
/// display form, with `verbose` forwarded to the renderer.
///
/// # Errors
///
/// Returns `NotSupported` for every format other than text. Nothing is appended in that case.
#[cfg(feature = "datafusion")]
fn explain_datafusion_plan(
    plan: &dyn ExecutionPlan,
    explain_format: ExplainFormat,
    verbose: bool,
    blocks: &mut Vec<String>,
) -> Result<()> {
    // Guard clause: reject anything but TEXT before doing any rendering work.
    if !matches!(explain_format, ExplainFormat::Text) {
        let feature = format!("EXPLAIN ... {:?} for DataFusion plan", explain_format);
        let hint = "Only TEXT format is supported for DataFusion plans.".to_owned();
        return Err(ErrorCode::NotSupported(feature, hint).into());
    }

    let rendered = displayable(plan).indent(verbose).to_string();
    blocks.push(rendered);
    Ok(())
}
417
418pub async fn handle_explain(
419 handler_args: HandlerArgs,
420 stmt: Statement,
421 options: ExplainOptions,
422 analyze: bool,
423) -> Result<RwPgResponse> {
424 if analyze {
425 bail_not_implemented!(issue = 4856, "explain analyze");
428 }
429 if options.trace && options.explain_format == ExplainFormat::Json {
430 return Err(ErrorCode::NotSupported(
431 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
432 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
433 )
434 .into());
435 }
436 if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
437 {
438 return Err(ErrorCode::NotSupported(
439 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
440 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
441 )
442 .into());
443 }
444
445 let mut blocks = Vec::new();
446 let result = do_handle_explain(handler_args, options, stmt, &mut blocks).await;
447
448 if let Err(e) = result {
449 if options.trace {
450 blocks.push(if options.verbose {
452 format!("ERROR: {:?}", e.as_report())
453 } else {
454 format!("ERROR: {}", e.as_report())
455 });
456 } else {
457 return Err(e);
459 }
460 }
461
462 let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
463 query_plan: l.into(),
464 });
465
466 Ok(PgResponse::builder(StatementType::EXPLAIN)
467 .rows(rows)
468 .into())
469}
470
471#[derive(Fields)]
472#[fields(style = "TITLE CASE")]
473pub(crate) struct ExplainRow {
474 pub query_plan: String,
475}