risingwave_frontend/handler/
explain.rs1use petgraph::dot::Dot;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::types::Fields;
20use risingwave_sqlparser::ast::{
21 ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
22};
23use thiserror_ext::AsReport;
24
25use super::create_index::{gen_create_index_plan, resolve_index_schema};
26use super::create_mv::gen_create_mv_plan;
27use super::create_sink::gen_sink_plan;
28use super::query::gen_batch_plan_by_statement;
29use super::util::SourceSchemaCompatExt;
30use super::{RwPgResponse, RwPgResponseBuilderExt};
31use crate::OptimizerContextRef;
32use crate::error::{ErrorCode, Result};
33use crate::handler::HandlerArgs;
34use crate::handler::create_table::handle_create_table_plan;
35use crate::optimizer::OptimizerContext;
36use crate::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
37use crate::optimizer::plan_node::generic::GenericPlanRef;
38use crate::optimizer::plan_node::{BatchPlanRef, Explain, StreamPlanRef};
39use crate::scheduler::BatchPlanFragmenter;
40use crate::stream_fragmenter::build_graph;
41use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
42
43pub async fn do_handle_explain(
44 handler_args: HandlerArgs,
45 explain_options: ExplainOptions,
46 stmt: Statement,
47 blocks: &mut Vec<String>,
48) -> Result<()> {
49 let mut batch_plan_fragmenter = None;
51 let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
52
53 let session = handler_args.session.clone();
54
55 enum PhysicalPlanRef {
56 Stream(StreamPlanRef),
57 Batch(BatchPlanRef),
58 }
59
60 {
61 let (plan, table, context) = match stmt {
62 Statement::CreateTable {
66 name,
67 columns,
68 constraints,
69 format_encode,
70 source_watermarks,
71 append_only,
72 on_conflict,
73 with_version_columns,
74 cdc_table_info,
75 include_column_options,
76 wildcard_idx,
77 webhook_info,
78 ..
79 } => {
80 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
81
82 let (plan, _source, table, _job_type, _) = handle_create_table_plan(
83 handler_args,
84 explain_options,
85 format_encode,
86 cdc_table_info,
87 &name,
88 columns,
89 wildcard_idx,
90 constraints,
91 source_watermarks,
92 append_only,
93 on_conflict,
94 with_version_columns
95 .iter()
96 .map(|col| col.real_value())
97 .collect(),
98 include_column_options,
99 webhook_info,
100 risingwave_common::catalog::Engine::Hummock,
101 )
102 .await?;
103 let context = plan.ctx();
104 (Ok(PhysicalPlanRef::Stream(plan)), Some(table), context)
105 }
106 Statement::CreateSink { stmt } => {
107 let plan = gen_sink_plan(handler_args, stmt, Some(explain_options), false)
108 .await
109 .map(|plan| plan.sink_plan)?;
110 let context = plan.ctx();
111 (Ok(PhysicalPlanRef::Stream(plan)), None, context)
112 }
113
114 Statement::FetchCursor {
115 stmt: FetchCursorStatement { cursor_name, .. },
116 } => {
117 let cursor_manager = session.clone().get_cursor_manager();
118 let plan = cursor_manager
119 .gen_batch_plan_with_subscription_cursor(
120 &cursor_name.real_value(),
121 handler_args,
122 )
123 .await
124 .map(|x| x.plan)?;
125 let context = plan.ctx();
126 (Ok(PhysicalPlanRef::Batch(plan)), None, context)
127 }
128
129 _ => {
133 let context: OptimizerContextRef =
134 OptimizerContext::new(handler_args, explain_options).into();
135 let (plan, table) = match stmt {
136 Statement::CreateView {
138 or_replace: false,
139 materialized: true,
140 query,
141 name,
142 columns,
143 emit_mode,
144 ..
145 } => gen_create_mv_plan(
146 &session,
147 context.clone(),
148 *query,
149 name,
150 columns,
151 emit_mode,
152 )
153 .map(|(plan, table)| (PhysicalPlanRef::Stream(plan), Some(table))),
154 Statement::CreateView {
155 materialized: false,
156 ..
157 } => {
158 return Err(ErrorCode::NotSupported(
159 "EXPLAIN CREATE VIEW".into(),
160 "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
161 ).into());
162 }
163
164 Statement::CreateSubscription { .. } => {
165 return Err(ErrorCode::NotSupported(
166 "EXPLAIN CREATE SUBSCRIPTION".into(),
167 "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
168 ).into());
169 }
170 Statement::CreateIndex {
171 name,
172 table_name,
173 method,
174 columns,
175 include,
176 distributed_by,
177 ..
178 } => {
179 let (schema_name, table, index_table_name) =
180 resolve_index_schema(&session, name, table_name)?;
181 gen_create_index_plan(
182 &session,
183 context.clone(),
184 schema_name,
185 table,
186 index_table_name,
187 method,
188 columns,
189 include,
190 distributed_by,
191 )
192 }
193 .map(|(plan, index_table, _index)| {
194 (PhysicalPlanRef::Stream(plan), Some(index_table))
195 }),
196
197 Statement::Insert { .. }
199 | Statement::Delete { .. }
200 | Statement::Update { .. }
201 | Statement::Query { .. } => {
202 gen_batch_plan_by_statement(&session, context, stmt)
203 .map(|x| (PhysicalPlanRef::Batch(x.plan), None))
204 }
205
206 _ => bail_not_implemented!("unsupported statement for EXPLAIN: {stmt}"),
207 }?;
208
209 let context = match &plan {
210 PhysicalPlanRef::Stream(plan) => plan.ctx(),
211 PhysicalPlanRef::Batch(plan) => plan.ctx(),
212 };
213
214 (Ok(plan) as Result<_>, table, context)
215 }
216 };
217
218 let explain_trace = context.is_explain_trace();
219 let explain_verbose = context.is_explain_verbose();
220 let explain_backfill = context.is_explain_backfill();
221 let explain_type = context.explain_type();
222 let explain_format = context.explain_format();
223
224 if explain_trace {
225 let trace = context.take_trace();
226 blocks.extend(trace);
227 }
228
229 match explain_type {
230 ExplainType::DistSql => {
231 if let Ok(plan) = &plan {
232 match plan {
233 PhysicalPlanRef::Batch(plan) => {
234 let worker_node_manager_reader = WorkerNodeSelector::new(
235 session.env().worker_node_manager_ref(),
236 session.is_barrier_read(),
237 );
238 batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
239 worker_node_manager_reader,
240 session.env().catalog_reader().clone(),
241 session.config().batch_parallelism().0,
242 session.config().timezone().to_owned(),
243 plan.clone(),
244 )?);
245 batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
246 ExplainFormat::Dot
247 } else {
248 ExplainFormat::Json
249 }
250 }
251 PhysicalPlanRef::Stream(plan) => {
252 let graph = build_graph(plan.clone(), None)?;
253 let table = table.map(|x| x.to_prost());
254 if explain_format == ExplainFormat::Dot {
255 blocks.push(explain_stream_graph_as_dot(
256 &graph,
257 table,
258 explain_verbose,
259 ))
260 } else {
261 blocks.push(explain_stream_graph(&graph, table, explain_verbose));
262 }
263 }
264 }
265 }
266 }
267 ExplainType::Physical => {
268 if !explain_trace && let Ok(physical_plan) = &plan {
270 let plan = match &physical_plan {
271 PhysicalPlanRef::Stream(plan) => plan as &dyn Explain,
272 PhysicalPlanRef::Batch(plan) => plan as &dyn Explain,
273 };
274 match explain_format {
275 ExplainFormat::Text => {
276 blocks.push(plan.explain_to_string());
277 }
278 ExplainFormat::Json => blocks.push(plan.explain_to_json()),
279 ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
280 ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
281 ExplainFormat::Dot => {
282 if explain_backfill && let PhysicalPlanRef::Stream(plan) = physical_plan
283 {
284 let dot_formatted_backfill_order =
285 explain_backfill_order_in_dot_format(
286 &session,
287 context.with_options().backfill_order_strategy(),
288 plan.clone(),
289 )?;
290 blocks.push(dot_formatted_backfill_order);
291 } else {
292 blocks.push(plan.explain_to_dot());
293 }
294 }
295 }
296 }
297 }
298 ExplainType::Logical => {
299 if !explain_trace {
301 let output = context.take_logical().ok_or_else(|| {
302 ErrorCode::InternalError("Logical plan not found for query".into())
303 })?;
304 blocks.push(output);
305 }
306 }
307 }
308
309 plan?;
311 }
312
313 if let Some(fragmenter) = batch_plan_fragmenter {
314 let query = fragmenter.generate_complete_query().await?;
315 let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
316 let graph = query.stage_graph.to_petgraph();
317 let dot = Dot::new(&graph);
318 dot.to_string()
319 } else {
320 serde_json::to_string_pretty(&query.stage_graph).unwrap()
321 };
322 blocks.push(stage_graph);
323 }
324
325 Ok(())
326}
327
328pub async fn handle_explain(
329 handler_args: HandlerArgs,
330 stmt: Statement,
331 options: ExplainOptions,
332 analyze: bool,
333) -> Result<RwPgResponse> {
334 if analyze {
335 bail_not_implemented!(issue = 4856, "explain analyze");
338 }
339 if options.trace && options.explain_format == ExplainFormat::Json {
340 return Err(ErrorCode::NotSupported(
341 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
342 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
343 )
344 .into());
345 }
346 if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
347 {
348 return Err(ErrorCode::NotSupported(
349 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
350 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
351 )
352 .into());
353 }
354
355 let mut blocks = Vec::new();
356 let result = do_handle_explain(handler_args, options.clone(), stmt, &mut blocks).await;
357
358 if let Err(e) = result {
359 if options.trace {
360 blocks.push(if options.verbose {
362 format!("ERROR: {:?}", e.as_report())
363 } else {
364 format!("ERROR: {}", e.as_report())
365 });
366 } else {
367 return Err(e);
369 }
370 }
371
372 let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
373 query_plan: l.into(),
374 });
375
376 Ok(PgResponse::builder(StatementType::EXPLAIN)
377 .rows(rows)
378 .into())
379}
380
381#[derive(Fields)]
382#[fields(style = "TITLE CASE")]
383pub(crate) struct ExplainRow {
384 pub query_plan: String,
385}