1use petgraph::dot::Dot;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::types::Fields;
20use risingwave_sqlparser::ast::{
21 ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
22};
23use thiserror_ext::AsReport;
24
25use super::create_index::{gen_create_index_plan, resolve_index_schema};
26use super::create_mv::gen_create_mv_plan;
27use super::create_sink::gen_sink_plan;
28use super::query::{BatchPlanChoice, gen_batch_plan_by_statement};
29use super::util::SourceSchemaCompatExt;
30use super::{RwPgResponse, RwPgResponseBuilderExt};
31use crate::OptimizerContextRef;
32#[cfg(feature = "datafusion")]
33use crate::datafusion::DfBatchQueryPlanResult;
34use crate::error::{ErrorCode, Result};
35use crate::handler::HandlerArgs;
36use crate::handler::create_table::handle_create_table_plan;
37use crate::optimizer::OptimizerContext;
38use crate::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
39use crate::optimizer::plan_node::generic::GenericPlanRef;
40use crate::optimizer::plan_node::{BatchPlanRef, Explain, StreamPlanRef};
41use crate::scheduler::BatchPlanFragmenter;
42use crate::stream_fragmenter::build_graph;
43use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
44
45pub async fn do_handle_explain(
46 handler_args: HandlerArgs,
47 explain_options: ExplainOptions,
48 stmt: Statement,
49 blocks: &mut Vec<String>,
50) -> Result<()> {
51 let mut batch_plan_fragmenter = None;
53 let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
54
55 let session = handler_args.session.clone();
56
57 enum PhysicalPlanRef {
58 Stream(StreamPlanRef),
59 Batch(BatchPlanRef),
60 }
61 enum PlanToExplain {
62 Rw(PhysicalPlanRef),
63 #[cfg(feature = "datafusion")]
64 Df(DfBatchQueryPlanResult),
65 }
66
67 {
68 let (plan, table, context) = match stmt {
69 Statement::CreateTable {
73 name,
74 columns,
75 constraints,
76 format_encode,
77 source_watermarks,
78 append_only,
79 on_conflict,
80 with_version_columns,
81 cdc_table_info,
82 include_column_options,
83 wildcard_idx,
84 webhook_info,
85 ..
86 } => {
87 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
88
89 let (plan, _source, table, _job_type, _) = handle_create_table_plan(
90 handler_args,
91 explain_options,
92 format_encode,
93 cdc_table_info,
94 &name,
95 columns,
96 wildcard_idx,
97 constraints,
98 source_watermarks,
99 append_only,
100 on_conflict,
101 with_version_columns
102 .iter()
103 .map(|col| col.real_value())
104 .collect(),
105 include_column_options,
106 webhook_info,
107 risingwave_common::catalog::Engine::Hummock,
108 )
109 .await?;
110 let context = plan.ctx();
111 (
112 Ok(PlanToExplain::Rw(PhysicalPlanRef::Stream(plan))),
113 Some(table),
114 context,
115 )
116 }
117 Statement::CreateSink { stmt } => {
118 let plan = gen_sink_plan(handler_args, stmt, Some(explain_options), false)
119 .await
120 .map(|plan| plan.sink_plan)?;
121 let context = plan.ctx();
122 (
123 Ok(PlanToExplain::Rw(PhysicalPlanRef::Stream(plan))),
124 None,
125 context,
126 )
127 }
128
129 Statement::FetchCursor {
130 stmt: FetchCursorStatement { cursor_name, .. },
131 } => {
132 let cursor_manager = session.clone().get_cursor_manager();
133 let plan = cursor_manager
134 .gen_batch_plan_with_subscription_cursor(
135 &cursor_name.real_value(),
136 handler_args,
137 )
138 .await
139 .map(|x| x.plan)?;
140 let context = plan.ctx();
141 (
142 Ok(PlanToExplain::Rw(PhysicalPlanRef::Batch(plan))),
143 None,
144 context,
145 )
146 }
147
148 _ => {
152 let context: OptimizerContextRef =
153 OptimizerContext::new(handler_args, explain_options).into();
154 let context_clone = context.clone();
155
156 let res = match stmt {
157 Statement::CreateView {
159 or_replace: false,
160 materialized: true,
161 query,
162 name,
163 columns,
164 emit_mode,
165 ..
166 } => gen_create_mv_plan(&session, context, *query, name, columns, emit_mode)
167 .map(|(plan, table)| {
168 (
169 PlanToExplain::Rw(PhysicalPlanRef::Stream(plan)),
170 Some(table),
171 )
172 }),
173 Statement::CreateView {
174 materialized: false,
175 ..
176 } => {
177 return Err(ErrorCode::NotSupported(
178 "EXPLAIN CREATE VIEW".into(),
179 "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
180 ).into());
181 }
182
183 Statement::CreateSubscription { .. } => {
184 return Err(ErrorCode::NotSupported(
185 "EXPLAIN CREATE SUBSCRIPTION".into(),
186 "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
187 ).into());
188 }
189 Statement::CreateIndex {
190 name,
191 table_name,
192 method,
193 columns,
194 include,
195 distributed_by,
196 ..
197 } => {
198 let (schema_name, table, index_table_name) =
199 resolve_index_schema(&session, name, table_name)?;
200 gen_create_index_plan(
201 &session,
202 context,
203 schema_name,
204 table,
205 index_table_name,
206 method,
207 columns,
208 include,
209 distributed_by,
210 )
211 }
212 .map(|(plan, index_table, _index)| {
213 (
214 PlanToExplain::Rw(PhysicalPlanRef::Stream(plan)),
215 Some(index_table),
216 )
217 }),
218
219 Statement::Insert { .. }
221 | Statement::Delete { .. }
222 | Statement::Update { .. }
223 | Statement::Query { .. } => {
224 match gen_batch_plan_by_statement(&session, context, stmt)? {
225 BatchPlanChoice::Rw(plan_result) => Ok((
226 PlanToExplain::Rw(PhysicalPlanRef::Batch(plan_result.plan)),
227 None,
228 )),
229 #[cfg(feature = "datafusion")]
230 BatchPlanChoice::Df(plan_result) => {
231 Ok((PlanToExplain::Df(plan_result), None))
232 }
233 }
234 }
235
236 _ => bail_not_implemented!("unsupported statement for EXPLAIN: {stmt}"),
237 };
238
239 match res {
240 Ok((plan, table)) => (Ok(plan), table, context_clone),
241 Err(e) => (Err(e), None, context_clone),
242 }
243 }
244 };
245
246 let explain_trace = explain_options.trace;
247 let explain_verbose = explain_options.verbose;
248 let explain_backfill = explain_options.backfill;
249 let explain_type = explain_options.explain_type;
250 let explain_format = explain_options.explain_format;
251
252 if explain_trace {
253 let trace = context.take_trace();
254 blocks.extend(trace);
255 }
256
257 match explain_type {
258 ExplainType::DistSql => {
259 if let Ok(plan) = &plan {
260 match plan {
261 PlanToExplain::Rw(PhysicalPlanRef::Batch(plan)) => {
262 let worker_node_manager_reader = WorkerNodeSelector::new(
263 session.env().worker_node_manager_ref(),
264 session.is_barrier_read(),
265 );
266 batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
267 worker_node_manager_reader,
268 session.env().catalog_reader().clone(),
269 session.config().batch_parallelism().0,
270 plan.clone(),
271 )?);
272 batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
273 ExplainFormat::Dot
274 } else {
275 ExplainFormat::Json
276 }
277 }
278 PlanToExplain::Rw(PhysicalPlanRef::Stream(plan)) => {
279 let graph = build_graph(plan.clone(), None)?;
280 let table = table.map(|x| x.to_prost());
281 if explain_format == ExplainFormat::Dot {
282 blocks.push(explain_stream_graph_as_dot(
283 &graph,
284 table,
285 explain_verbose,
286 ))
287 } else {
288 blocks.push(explain_stream_graph(&graph, table, explain_verbose));
289 }
290 }
291 #[cfg(feature = "datafusion")]
292 PlanToExplain::Df(_) => {
293 return Err(ErrorCode::NotSupported(
294 "EXPLAIN DISTRIBUTED for DataFusion plan".into(),
295 "Distributed explain is only available for RisingWave batch plans."
296 .into(),
297 )
298 .into());
299 }
300 }
301 }
302 }
303 ExplainType::Physical => {
304 if !explain_trace && let Ok(physical_plan) = &plan {
306 match physical_plan {
307 PlanToExplain::Rw(physical_plan) => {
308 let plan = match &physical_plan {
309 PhysicalPlanRef::Stream(plan) => plan as &dyn Explain,
310 PhysicalPlanRef::Batch(plan) => plan as &dyn Explain,
311 };
312 match explain_format {
313 ExplainFormat::Text => {
314 blocks.push(plan.explain_to_string());
315 }
316 ExplainFormat::Json => blocks.push(plan.explain_to_json()),
317 ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
318 ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
319 ExplainFormat::Dot => {
320 if explain_backfill
321 && let PhysicalPlanRef::Stream(plan) = physical_plan
322 {
323 let dot_formatted_backfill_order =
324 explain_backfill_order_in_dot_format(
325 &session,
326 context.with_options().backfill_order_strategy(),
327 plan.clone(),
328 )?;
329 blocks.push(dot_formatted_backfill_order);
330 } else {
331 blocks.push(plan.explain_to_dot());
332 }
333 }
334 }
335 }
336 #[cfg(feature = "datafusion")]
337 PlanToExplain::Df(plan) => {
338 explain_datafusion_plan(plan, explain_format, explain_verbose, blocks)?
339 }
340 }
341 }
342 }
343 ExplainType::Logical => {
344 if !explain_trace {
346 let output = context.take_logical().ok_or_else(|| {
347 ErrorCode::InternalError("Logical plan not found for query".into())
348 })?;
349 blocks.push(output);
350 }
351 }
352 }
353
354 plan?;
356 }
357
358 if let Some(fragmenter) = batch_plan_fragmenter {
359 let query = fragmenter.generate_complete_query().await?;
360 let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
361 let graph = query.stage_graph.to_petgraph();
362 let dot = Dot::new(&graph);
363 dot.to_string()
364 } else {
365 serde_json::to_string_pretty(&query.stage_graph).unwrap()
366 };
367 blocks.push(stage_graph);
368 }
369
370 Ok(())
371}
372
/// Renders a DataFusion batch plan and appends it to `blocks` as a single entry.
///
/// Only `ExplainFormat::Text` is accepted. With `verbose` the plan is rendered through
/// `display_indent_schema`, otherwise through `display_indent`.
///
/// # Errors
///
/// Returns `ErrorCode::NotSupported` for every format other than text.
#[cfg(feature = "datafusion")]
fn explain_datafusion_plan(
    plan: &DfBatchQueryPlanResult,
    explain_format: ExplainFormat,
    verbose: bool,
    blocks: &mut Vec<String>,
) -> Result<()> {
    // Guard clause: reject anything that is not plain text before rendering.
    if !matches!(explain_format, ExplainFormat::Text) {
        let feature = format!("EXPLAIN ... {:?} for DataFusion plan", explain_format);
        let hint = "Only TEXT format is supported for DataFusion plans.".to_owned();
        return Err(ErrorCode::NotSupported(feature, hint).into());
    }

    let logical_plan = &plan.plan;
    let rendered = if verbose {
        logical_plan.display_indent_schema().to_string()
    } else {
        logical_plan.display_indent().to_string()
    };
    blocks.push(rendered);

    Ok(())
}
399
400pub async fn handle_explain(
401 handler_args: HandlerArgs,
402 stmt: Statement,
403 options: ExplainOptions,
404 analyze: bool,
405) -> Result<RwPgResponse> {
406 if analyze {
407 bail_not_implemented!(issue = 4856, "explain analyze");
410 }
411 if options.trace && options.explain_format == ExplainFormat::Json {
412 return Err(ErrorCode::NotSupported(
413 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
414 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
415 )
416 .into());
417 }
418 if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
419 {
420 return Err(ErrorCode::NotSupported(
421 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
422 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
423 )
424 .into());
425 }
426
427 let mut blocks = Vec::new();
428 let result = do_handle_explain(handler_args, options, stmt, &mut blocks).await;
429
430 if let Err(e) = result {
431 if options.trace {
432 blocks.push(if options.verbose {
434 format!("ERROR: {:?}", e.as_report())
435 } else {
436 format!("ERROR: {}", e.as_report())
437 });
438 } else {
439 return Err(e);
441 }
442 }
443
444 let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
445 query_plan: l.into(),
446 });
447
448 Ok(PgResponse::builder(StatementType::EXPLAIN)
449 .rows(rows)
450 .into())
451}
452
/// A single row of `EXPLAIN` output.
///
/// `handle_explain` builds one `ExplainRow` for every line of the collected explain text.
// NOTE(review): `style = "TITLE CASE"` presumably controls how the column name is rendered
// (e.g. `QUERY PLAN`) — confirm against the `Fields` derive.
#[derive(Fields)]
#[fields(style = "TITLE CASE")]
pub(crate) struct ExplainRow {
    /// One line of explain text.
    pub query_plan: String,
}