risingwave_frontend/handler/
explain.rs1use petgraph::dot::Dot;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::types::Fields;
20use risingwave_sqlparser::ast::{
21 ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
22};
23use thiserror_ext::AsReport;
24
25use super::create_index::{gen_create_index_plan, resolve_index_schema};
26use super::create_mv::gen_create_mv_plan;
27use super::create_sink::gen_sink_plan;
28use super::query::gen_batch_plan_by_statement;
29use super::util::SourceSchemaCompatExt;
30use super::{RwPgResponse, RwPgResponseBuilderExt};
31use crate::OptimizerContextRef;
32use crate::error::{ErrorCode, Result};
33use crate::handler::HandlerArgs;
34use crate::handler::create_table::handle_create_table_plan;
35use crate::optimizer::OptimizerContext;
36use crate::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
37use crate::optimizer::plan_node::generic::GenericPlanRef;
38use crate::optimizer::plan_node::{BatchPlanRef, Explain, StreamPlanRef};
39use crate::scheduler::BatchPlanFragmenter;
40use crate::stream_fragmenter::build_graph;
41use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
42
43pub async fn do_handle_explain(
44 handler_args: HandlerArgs,
45 explain_options: ExplainOptions,
46 stmt: Statement,
47 blocks: &mut Vec<String>,
48) -> Result<()> {
49 let mut batch_plan_fragmenter = None;
51 let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
52
53 let session = handler_args.session.clone();
54
55 enum PhysicalPlanRef {
56 Stream(StreamPlanRef),
57 Batch(BatchPlanRef),
58 }
59
60 {
61 let (plan, table, context) = match stmt {
62 Statement::CreateTable {
66 name,
67 columns,
68 constraints,
69 format_encode,
70 source_watermarks,
71 append_only,
72 on_conflict,
73 with_version_column,
74 cdc_table_info,
75 include_column_options,
76 wildcard_idx,
77 webhook_info,
78 ..
79 } => {
80 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
81
82 let (plan, _source, table, _job_type, _) = handle_create_table_plan(
83 handler_args,
84 explain_options,
85 format_encode,
86 cdc_table_info,
87 &name,
88 columns,
89 wildcard_idx,
90 constraints,
91 source_watermarks,
92 append_only,
93 on_conflict,
94 with_version_column.map(|x| x.real_value()),
95 include_column_options,
96 webhook_info,
97 risingwave_common::catalog::Engine::Hummock,
98 )
99 .await?;
100 let context = plan.ctx();
101 (Ok(PhysicalPlanRef::Stream(plan)), Some(table), context)
102 }
103 Statement::CreateSink { stmt } => {
104 let plan = gen_sink_plan(handler_args, stmt, Some(explain_options), false)
105 .await
106 .map(|plan| plan.sink_plan)?;
107 let context = plan.ctx();
108 (Ok(PhysicalPlanRef::Stream(plan)), None, context)
109 }
110
111 Statement::FetchCursor {
112 stmt: FetchCursorStatement { cursor_name, .. },
113 } => {
114 let cursor_manager = session.clone().get_cursor_manager();
115 let plan = cursor_manager
116 .gen_batch_plan_with_subscription_cursor(
117 &cursor_name.real_value(),
118 handler_args,
119 )
120 .await
121 .map(|x| x.plan)?;
122 let context = plan.ctx();
123 (Ok(PhysicalPlanRef::Batch(plan)), None, context)
124 }
125
126 _ => {
130 let context: OptimizerContextRef =
131 OptimizerContext::new(handler_args, explain_options).into();
132 let (plan, table) = match stmt {
133 Statement::CreateView {
135 or_replace: false,
136 materialized: true,
137 query,
138 name,
139 columns,
140 emit_mode,
141 ..
142 } => gen_create_mv_plan(
143 &session,
144 context.clone(),
145 *query,
146 name,
147 columns,
148 emit_mode,
149 )
150 .map(|(plan, table)| (PhysicalPlanRef::Stream(plan), Some(table))),
151 Statement::CreateView {
152 materialized: false,
153 ..
154 } => {
155 return Err(ErrorCode::NotSupported(
156 "EXPLAIN CREATE VIEW".into(),
157 "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
158 ).into());
159 }
160
161 Statement::CreateSubscription { .. } => {
162 return Err(ErrorCode::NotSupported(
163 "EXPLAIN CREATE SUBSCRIPTION".into(),
164 "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
165 ).into());
166 }
167 Statement::CreateIndex {
168 name,
169 table_name,
170 columns,
171 include,
172 distributed_by,
173 ..
174 } => {
175 let (schema_name, table, index_table_name) =
176 resolve_index_schema(&session, name, table_name)?;
177 gen_create_index_plan(
178 &session,
179 context.clone(),
180 schema_name,
181 table,
182 index_table_name,
183 columns,
184 include,
185 distributed_by,
186 )
187 }
188 .map(|(plan, index_table, _index)| {
189 (PhysicalPlanRef::Stream(plan), Some(index_table))
190 }),
191
192 Statement::Insert { .. }
194 | Statement::Delete { .. }
195 | Statement::Update { .. }
196 | Statement::Query { .. } => {
197 gen_batch_plan_by_statement(&session, context, stmt)
198 .map(|x| (PhysicalPlanRef::Batch(x.plan), None))
199 }
200
201 _ => bail_not_implemented!("unsupported statement for EXPLAIN: {stmt}"),
202 }?;
203
204 let context = match &plan {
205 PhysicalPlanRef::Stream(plan) => plan.ctx(),
206 PhysicalPlanRef::Batch(plan) => plan.ctx(),
207 };
208
209 (Ok(plan) as Result<_>, table, context)
210 }
211 };
212
213 let explain_trace = context.is_explain_trace();
214 let explain_verbose = context.is_explain_verbose();
215 let explain_backfill = context.is_explain_backfill();
216 let explain_type = context.explain_type();
217 let explain_format = context.explain_format();
218
219 if explain_trace {
220 let trace = context.take_trace();
221 blocks.extend(trace);
222 }
223
224 match explain_type {
225 ExplainType::DistSql => {
226 if let Ok(plan) = &plan {
227 match plan {
228 PhysicalPlanRef::Batch(plan) => {
229 let worker_node_manager_reader = WorkerNodeSelector::new(
230 session.env().worker_node_manager_ref(),
231 session.is_barrier_read(),
232 );
233 batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
234 worker_node_manager_reader,
235 session.env().catalog_reader().clone(),
236 session.config().batch_parallelism().0,
237 session.config().timezone().to_owned(),
238 plan.clone(),
239 )?);
240 batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
241 ExplainFormat::Dot
242 } else {
243 ExplainFormat::Json
244 }
245 }
246 PhysicalPlanRef::Stream(plan) => {
247 let graph = build_graph(plan.clone(), None)?;
248 let table = table.map(|x| x.to_prost());
249 if explain_format == ExplainFormat::Dot {
250 blocks.push(explain_stream_graph_as_dot(
251 &graph,
252 table,
253 explain_verbose,
254 ))
255 } else {
256 blocks.push(explain_stream_graph(&graph, table, explain_verbose));
257 }
258 }
259 }
260 }
261 }
262 ExplainType::Physical => {
263 if !explain_trace && let Ok(physical_plan) = &plan {
265 let plan = match &physical_plan {
266 PhysicalPlanRef::Stream(plan) => plan as &dyn Explain,
267 PhysicalPlanRef::Batch(plan) => plan as &dyn Explain,
268 };
269 match explain_format {
270 ExplainFormat::Text => {
271 blocks.push(plan.explain_to_string());
272 }
273 ExplainFormat::Json => blocks.push(plan.explain_to_json()),
274 ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
275 ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
276 ExplainFormat::Dot => {
277 if explain_backfill && let PhysicalPlanRef::Stream(plan) = physical_plan
278 {
279 let dot_formatted_backfill_order =
280 explain_backfill_order_in_dot_format(
281 &session,
282 context.with_options().backfill_order_strategy(),
283 plan.clone(),
284 )?;
285 blocks.push(dot_formatted_backfill_order);
286 } else {
287 blocks.push(plan.explain_to_dot());
288 }
289 }
290 }
291 }
292 }
293 ExplainType::Logical => {
294 if !explain_trace {
296 let output = context.take_logical().ok_or_else(|| {
297 ErrorCode::InternalError("Logical plan not found for query".into())
298 })?;
299 blocks.push(output);
300 }
301 }
302 }
303
304 plan?;
306 }
307
308 if let Some(fragmenter) = batch_plan_fragmenter {
309 let query = fragmenter.generate_complete_query().await?;
310 let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
311 let graph = query.stage_graph.to_petgraph();
312 let dot = Dot::new(&graph);
313 dot.to_string()
314 } else {
315 serde_json::to_string_pretty(&query.stage_graph).unwrap()
316 };
317 blocks.push(stage_graph);
318 }
319
320 Ok(())
321}
322
323pub async fn handle_explain(
324 handler_args: HandlerArgs,
325 stmt: Statement,
326 options: ExplainOptions,
327 analyze: bool,
328) -> Result<RwPgResponse> {
329 if analyze {
330 bail_not_implemented!(issue = 4856, "explain analyze");
333 }
334 if options.trace && options.explain_format == ExplainFormat::Json {
335 return Err(ErrorCode::NotSupported(
336 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
337 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
338 )
339 .into());
340 }
341 if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
342 {
343 return Err(ErrorCode::NotSupported(
344 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
345 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
346 )
347 .into());
348 }
349
350 let mut blocks = Vec::new();
351 let result = do_handle_explain(handler_args, options.clone(), stmt, &mut blocks).await;
352
353 if let Err(e) = result {
354 if options.trace {
355 blocks.push(if options.verbose {
357 format!("ERROR: {:?}", e.as_report())
358 } else {
359 format!("ERROR: {}", e.as_report())
360 });
361 } else {
362 return Err(e);
364 }
365 }
366
367 let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
368 query_plan: l.into(),
369 });
370
371 Ok(PgResponse::builder(StatementType::EXPLAIN)
372 .rows(rows)
373 .into())
374}
375
376#[derive(Fields)]
377#[fields(style = "TITLE CASE")]
378pub(crate) struct ExplainRow {
379 pub query_plan: String,
380}