1#[cfg(feature = "datafusion")]
16use datafusion::physical_plan::{ExecutionPlan, displayable};
17use petgraph::dot::Dot;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::types::Fields;
22use risingwave_sqlparser::ast::{
23 ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
24};
25use thiserror_ext::AsReport;
26
27use super::create_index::{gen_create_index_plan, resolve_index_schema};
28use super::create_mv::explain_create_mv_plan;
29use super::create_sink::gen_sink_plan;
30use super::query::{BatchPlanChoice, gen_batch_plan_by_statement};
31use super::util::SourceSchemaCompatExt;
32use super::{RwPgResponse, RwPgResponseBuilderExt};
33use crate::OptimizerContextRef;
34#[cfg(feature = "datafusion")]
35use crate::datafusion::{
36 DfBatchQueryPlanResult, build_datafusion_physical_plan, create_datafusion_context,
37};
38use crate::error::{ErrorCode, Result};
39use crate::handler::HandlerArgs;
40use crate::handler::create_table::handle_create_table_plan;
41use crate::optimizer::OptimizerContext;
42use crate::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
43use crate::optimizer::plan_node::generic::GenericPlanRef;
44use crate::optimizer::plan_node::{BatchPlanRef, Explain, StreamPlanRef};
45use crate::scheduler::BatchPlanFragmenter;
46use crate::stream_fragmenter::{GraphJobType, build_graph};
47use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
48
49pub async fn do_handle_explain(
50 handler_args: HandlerArgs,
51 explain_options: ExplainOptions,
52 stmt: Statement,
53 blocks: &mut Vec<String>,
54) -> Result<()> {
55 let mut batch_plan_fragmenter = None;
57 let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
58 #[cfg(feature = "datafusion")]
59 let mut datafusion_physical_plan_request: Option<(
60 DfBatchQueryPlanResult,
61 ExplainFormat,
62 bool,
63 )> = None;
64
65 let session = handler_args.session.clone();
66
67 enum PhysicalPlanRef {
68 Stream(StreamPlanRef, Option<GraphJobType>),
69 Batch(BatchPlanRef),
70 }
71 enum PlanToExplain {
72 Rw(PhysicalPlanRef),
73 #[cfg(feature = "datafusion")]
74 Df(DfBatchQueryPlanResult),
75 }
76
77 {
78 let (plan, table, context) = match stmt {
79 Statement::CreateTable {
83 name,
84 columns,
85 constraints,
86 format_encode,
87 source_watermarks,
88 append_only,
89 on_conflict,
90 with_version_columns,
91 cdc_table_info,
92 include_column_options,
93 wildcard_idx,
94 webhook_info,
95 ..
96 } => {
97 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
98
99 let (plan, _source, table, _job_type, _) = handle_create_table_plan(
100 handler_args,
101 explain_options,
102 format_encode,
103 cdc_table_info,
104 &name,
105 columns,
106 wildcard_idx,
107 constraints,
108 source_watermarks,
109 append_only,
110 on_conflict,
111 with_version_columns
112 .iter()
113 .map(|col| col.real_value())
114 .collect(),
115 include_column_options,
116 webhook_info,
117 risingwave_common::catalog::Engine::Hummock,
118 )
119 .await?;
120 let context = plan.ctx();
121 (
122 Ok(PlanToExplain::Rw(PhysicalPlanRef::Stream(
123 plan,
124 Some(GraphJobType::Table),
125 ))),
126 Some(table),
127 context,
128 )
129 }
130 Statement::CreateSink { stmt } => {
131 let plan = gen_sink_plan(handler_args, stmt, Some(explain_options), false)
132 .await
133 .map(|plan| plan.sink_plan)?;
134 let context = plan.ctx();
135 (
136 Ok(PlanToExplain::Rw(PhysicalPlanRef::Stream(
137 plan,
138 Some(GraphJobType::Sink),
139 ))),
140 None,
141 context,
142 )
143 }
144
145 Statement::FetchCursor {
146 stmt: FetchCursorStatement { cursor_name, .. },
147 } => {
148 let cursor_manager = session.clone().get_cursor_manager();
149 let plan = cursor_manager
150 .gen_batch_plan_with_subscription_cursor(
151 &cursor_name.real_value(),
152 handler_args,
153 )
154 .await
155 .map(|x| x.plan)?;
156 let context = plan.ctx();
157 (
158 Ok(PlanToExplain::Rw(PhysicalPlanRef::Batch(plan))),
159 None,
160 context,
161 )
162 }
163
164 _ => {
168 let context: OptimizerContextRef =
169 OptimizerContext::new(handler_args, explain_options).into();
170 let context_clone = context.clone();
171
172 let res = match stmt {
173 Statement::CreateView {
175 or_replace: false,
176 materialized: true,
177 query,
178 name,
179 columns,
180 emit_mode,
181 ..
182 } => {
183 explain_create_mv_plan(&session, context, *query, name, columns, emit_mode)
184 .map(|(plan, table)| {
185 (
186 PlanToExplain::Rw(PhysicalPlanRef::Stream(
187 plan,
188 Some(GraphJobType::MaterializedView),
189 )),
190 Some(table),
191 )
192 })
193 }
194 Statement::CreateView {
195 materialized: false,
196 ..
197 } => {
198 return Err(ErrorCode::NotSupported(
199 "EXPLAIN CREATE VIEW".into(),
200 "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
201 ).into());
202 }
203
204 Statement::CreateSubscription { .. } => {
205 return Err(ErrorCode::NotSupported(
206 "EXPLAIN CREATE SUBSCRIPTION".into(),
207 "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
208 ).into());
209 }
210 Statement::CreateIndex {
211 name,
212 table_name,
213 method,
214 columns,
215 include,
216 distributed_by,
217 ..
218 } => {
219 let (schema_name, table, index_table_name) =
220 resolve_index_schema(&session, name, table_name)?;
221 gen_create_index_plan(
222 &session,
223 context,
224 schema_name,
225 table,
226 index_table_name,
227 method,
228 columns,
229 include,
230 distributed_by,
231 )
232 }
233 .map(|(plan, index_table, _index)| {
234 (
235 PlanToExplain::Rw(PhysicalPlanRef::Stream(
236 plan,
237 Some(GraphJobType::Index),
238 )),
239 Some(index_table),
240 )
241 }),
242
243 Statement::Insert { .. }
245 | Statement::Delete { .. }
246 | Statement::Update { .. }
247 | Statement::Query { .. } => {
248 match gen_batch_plan_by_statement(&session, context, stmt)? {
249 BatchPlanChoice::Rw(plan_result) => Ok((
250 PlanToExplain::Rw(PhysicalPlanRef::Batch(plan_result.plan)),
251 None,
252 )),
253 #[cfg(feature = "datafusion")]
254 BatchPlanChoice::Df(plan_result) => {
255 Ok((PlanToExplain::Df(plan_result), None))
256 }
257 }
258 }
259
260 _ => bail_not_implemented!("unsupported statement for EXPLAIN: {stmt}"),
261 };
262
263 match res {
264 Ok((plan, table)) => (Ok(plan), table, context_clone),
265 Err(e) => (Err(e), None, context_clone),
266 }
267 }
268 };
269
270 let explain_trace = explain_options.trace;
271 let explain_verbose = explain_options.verbose;
272 let explain_backfill = explain_options.backfill;
273 let explain_type = explain_options.explain_type;
274 let explain_format = explain_options.explain_format;
275
276 if explain_trace {
277 let trace = context.take_trace();
278 blocks.extend(trace);
279 }
280
281 match explain_type {
282 ExplainType::DistSql => {
283 if let Ok(plan) = &plan {
284 match plan {
285 PlanToExplain::Rw(PhysicalPlanRef::Batch(plan)) => {
286 let worker_node_manager_reader = WorkerNodeSelector::new(
287 session.env().worker_node_manager_ref(),
288 session.is_barrier_read(),
289 );
290 batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
291 worker_node_manager_reader,
292 session.env().catalog_reader().clone(),
293 session.config().batch_parallelism().0,
294 plan.clone(),
295 )?);
296 batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
297 ExplainFormat::Dot
298 } else {
299 ExplainFormat::Json
300 }
301 }
302 PlanToExplain::Rw(PhysicalPlanRef::Stream(plan, job_type)) => {
303 let graph = build_graph(plan.clone(), *job_type)?;
304 let table = table.map(|x| x.to_prost());
305 if explain_format == ExplainFormat::Dot {
306 blocks.push(explain_stream_graph_as_dot(
307 &graph,
308 table,
309 explain_verbose,
310 ))
311 } else {
312 blocks.push(explain_stream_graph(&graph, table, explain_verbose));
313 }
314 }
315 #[cfg(feature = "datafusion")]
316 PlanToExplain::Df(_) => {
317 return Err(ErrorCode::NotSupported(
318 "EXPLAIN DISTRIBUTED for DataFusion plan".into(),
319 "Distributed explain is only available for RisingWave batch plans."
320 .into(),
321 )
322 .into());
323 }
324 }
325 }
326 }
327 ExplainType::Physical => {
328 if !explain_trace && let Ok(physical_plan) = &plan {
330 match physical_plan {
331 PlanToExplain::Rw(physical_plan) => {
332 let plan = match &physical_plan {
333 PhysicalPlanRef::Stream(plan, _) => plan as &dyn Explain,
334 PhysicalPlanRef::Batch(plan) => plan as &dyn Explain,
335 };
336 match explain_format {
337 ExplainFormat::Text => {
338 blocks.push(plan.explain_to_string());
339 }
340 ExplainFormat::Json => blocks.push(plan.explain_to_json()),
341 ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
342 ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
343 ExplainFormat::Dot => {
344 if explain_backfill
345 && let PhysicalPlanRef::Stream(plan, _) = physical_plan
346 {
347 let dot_formatted_backfill_order =
348 explain_backfill_order_in_dot_format(
349 &session,
350 context.with_options().backfill_order_strategy(),
351 plan.clone(),
352 )?;
353 blocks.push(dot_formatted_backfill_order);
354 } else {
355 blocks.push(plan.explain_to_dot());
356 }
357 }
358 }
359 }
360 #[cfg(feature = "datafusion")]
361 PlanToExplain::Df(plan) => {
362 datafusion_physical_plan_request =
363 Some((plan.clone(), explain_format, explain_verbose));
364 }
365 }
366 }
367 }
368 ExplainType::Logical => {
369 if !explain_trace {
371 let output = context.take_logical().ok_or_else(|| {
372 ErrorCode::InternalError("Logical plan not found for query".into())
373 })?;
374 blocks.push(output);
375 }
376 }
377 }
378
379 plan?;
381 }
382
383 #[cfg(feature = "datafusion")]
385 if let Some((plan, explain_format, explain_verbose)) = datafusion_physical_plan_request {
386 let df_ctx = create_datafusion_context(session.as_ref())?;
387 let physical_plan = build_datafusion_physical_plan(&df_ctx, &plan).await?;
388 explain_datafusion_plan(
389 physical_plan.as_ref(),
390 explain_format,
391 explain_verbose,
392 blocks,
393 )?;
394 }
395
396 if let Some(fragmenter) = batch_plan_fragmenter {
397 let query = fragmenter.generate_complete_query().await?;
398 let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
399 let graph = query.stage_graph.to_petgraph();
400 let dot = Dot::new(&graph);
401 dot.to_string()
402 } else {
403 serde_json::to_string_pretty(&query.stage_graph).unwrap()
404 };
405 blocks.push(stage_graph);
406 }
407
408 Ok(())
409}
410
/// Renders a DataFusion physical plan and appends it to `blocks`.
///
/// Only [`ExplainFormat::Text`] is accepted; every other format yields
/// `ErrorCode::NotSupported`. `verbose` is forwarded to the indented plan display.
#[cfg(feature = "datafusion")]
fn explain_datafusion_plan(
    plan: &dyn ExecutionPlan,
    explain_format: ExplainFormat,
    verbose: bool,
    blocks: &mut Vec<String>,
) -> Result<()> {
    // Guard clause: reject anything that is not plain text before doing any rendering.
    if !matches!(explain_format, ExplainFormat::Text) {
        let feature = format!("EXPLAIN ... {:?} for DataFusion plan", explain_format);
        let hint = "Only TEXT format is supported for DataFusion plans.".to_owned();
        return Err(ErrorCode::NotSupported(feature, hint).into());
    }

    let rendered = displayable(plan).indent(verbose).to_string();
    blocks.push(rendered);
    Ok(())
}
431
432pub async fn handle_explain(
433 handler_args: HandlerArgs,
434 stmt: Statement,
435 options: ExplainOptions,
436 analyze: bool,
437) -> Result<RwPgResponse> {
438 if analyze {
439 bail_not_implemented!(issue = 4856, "explain analyze");
442 }
443 if options.trace && options.explain_format == ExplainFormat::Json {
444 return Err(ErrorCode::NotSupported(
445 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
446 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
447 )
448 .into());
449 }
450 if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
451 {
452 return Err(ErrorCode::NotSupported(
453 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
454 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
455 )
456 .into());
457 }
458
459 let mut blocks = Vec::new();
460 let result = Box::pin(do_handle_explain(handler_args, options, stmt, &mut blocks)).await;
461
462 if let Err(e) = result {
463 if options.trace {
464 blocks.push(if options.verbose {
466 format!("ERROR: {:?}", e.as_report())
467 } else {
468 format!("ERROR: {}", e.as_report())
469 });
470 } else {
471 return Err(e);
473 }
474 }
475
476 let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
477 query_plan: l.into(),
478 });
479
480 Ok(PgResponse::builder(StatementType::EXPLAIN)
481 .rows(rows)
482 .into())
483}
484
/// One row of `EXPLAIN` output.
///
/// `handle_explain` emits one `ExplainRow` per line of the rendered output blocks.
// NOTE(review): with the "TITLE CASE" field style the column is presumably exposed as
// `Query Plan` — confirm against the `Fields` derive.
#[derive(Fields)]
#[fields(style = "TITLE CASE")]
pub(crate) struct ExplainRow {
    // A single line of the explain output text.
    pub query_plan: String,
}
489}