risingwave_frontend/handler/
explain.rs1use petgraph::dot::Dot;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::types::Fields;
20use risingwave_sqlparser::ast::{
21 ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
22};
23use thiserror_ext::AsReport;
24
25use super::create_index::{gen_create_index_plan, resolve_index_schema};
26use super::create_mv::gen_create_mv_plan;
27use super::create_sink::gen_sink_plan;
28use super::query::gen_batch_plan_by_statement;
29use super::util::SourceSchemaCompatExt;
30use super::{RwPgResponse, RwPgResponseBuilderExt};
31use crate::OptimizerContextRef;
32use crate::error::{ErrorCode, Result};
33use crate::handler::HandlerArgs;
34use crate::handler::create_table::handle_create_table_plan;
35use crate::optimizer::OptimizerContext;
36use crate::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
37use crate::optimizer::plan_node::generic::GenericPlanRef;
38use crate::optimizer::plan_node::{Convention, Explain};
39use crate::scheduler::BatchPlanFragmenter;
40use crate::stream_fragmenter::build_graph;
41use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
42
43pub async fn do_handle_explain(
44 handler_args: HandlerArgs,
45 explain_options: ExplainOptions,
46 stmt: Statement,
47 blocks: &mut Vec<String>,
48) -> Result<()> {
49 let mut batch_plan_fragmenter = None;
51 let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
52
53 let session = handler_args.session.clone();
54
55 {
56 let (plan, context) = match stmt {
57 Statement::CreateTable {
61 name,
62 columns,
63 constraints,
64 format_encode,
65 source_watermarks,
66 append_only,
67 on_conflict,
68 with_version_column,
69 cdc_table_info,
70 include_column_options,
71 wildcard_idx,
72 webhook_info,
73 ..
74 } => {
75 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
76
77 let (plan, _source, _table, _job_type) = handle_create_table_plan(
78 handler_args,
79 explain_options,
80 format_encode,
81 cdc_table_info,
82 name.clone(),
83 columns,
84 wildcard_idx,
85 constraints,
86 source_watermarks,
87 append_only,
88 on_conflict,
89 with_version_column.map(|x| x.real_value()),
90 include_column_options,
91 webhook_info,
92 risingwave_common::catalog::Engine::Hummock,
93 )
94 .await?;
95 let context = plan.ctx();
96 (Ok(plan), context)
97 }
98 Statement::CreateSink { stmt } => {
99 let plan = gen_sink_plan(handler_args, stmt, Some(explain_options), false)
100 .await
101 .map(|plan| plan.sink_plan)?;
102 let context = plan.ctx();
103 (Ok(plan), context)
104 }
105
106 Statement::FetchCursor {
107 stmt: FetchCursorStatement { cursor_name, .. },
108 } => {
109 let cursor_manager = session.clone().get_cursor_manager();
110 let plan = cursor_manager
111 .gen_batch_plan_with_subscription_cursor(
112 &cursor_name.real_value(),
113 handler_args,
114 )
115 .await
116 .map(|x| x.plan)?;
117 let context = plan.ctx();
118 (Ok(plan), context)
119 }
120
121 _ => {
125 let context: OptimizerContextRef =
126 OptimizerContext::new(handler_args, explain_options).into();
127 let plan = match stmt {
128 Statement::CreateView {
130 or_replace: false,
131 materialized: true,
132 query,
133 name,
134 columns,
135 emit_mode,
136 ..
137 } => gen_create_mv_plan(
138 &session,
139 context.clone(),
140 *query,
141 name,
142 columns,
143 emit_mode,
144 )
145 .map(|x| x.0),
146 Statement::CreateView {
147 materialized: false,
148 ..
149 } => {
150 return Err(ErrorCode::NotSupported(
151 "EXPLAIN CREATE VIEW".into(),
152 "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
153 ).into());
154 }
155
156 Statement::CreateSubscription { .. } => {
157 return Err(ErrorCode::NotSupported(
158 "EXPLAIN CREATE SUBSCRIPTION".into(),
159 "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
160 ).into());
161 }
162 Statement::CreateIndex {
163 name,
164 table_name,
165 columns,
166 include,
167 distributed_by,
168 ..
169 } => {
170 let (schema_name, table, index_table_name) =
171 resolve_index_schema(&session, name, table_name)?;
172 gen_create_index_plan(
173 &session,
174 context.clone(),
175 schema_name,
176 table,
177 index_table_name,
178 columns,
179 include,
180 distributed_by,
181 )
182 }
183 .map(|x| x.0),
184
185 Statement::Insert { .. }
187 | Statement::Delete { .. }
188 | Statement::Update { .. }
189 | Statement::Query { .. } => {
190 gen_batch_plan_by_statement(&session, context, stmt).map(|x| x.plan)
191 }
192
193 _ => bail_not_implemented!("unsupported statement for EXPLAIN: {stmt}"),
194 };
195
196 let plan = plan?;
197 let context = plan.ctx().clone();
198
199 (Ok(plan) as Result<_>, context)
200 }
201 };
202
203 let explain_trace = context.is_explain_trace();
204 let explain_verbose = context.is_explain_verbose();
205 let explain_backfill = context.is_explain_backfill();
206 let explain_type = context.explain_type();
207 let explain_format = context.explain_format();
208
209 if explain_trace {
210 let trace = context.take_trace();
211 blocks.extend(trace);
212 }
213
214 match explain_type {
215 ExplainType::DistSql => {
216 if let Ok(plan) = &plan {
217 match plan.convention() {
218 Convention::Logical => unreachable!(),
219 Convention::Batch => {
220 let worker_node_manager_reader = WorkerNodeSelector::new(
221 session.env().worker_node_manager_ref(),
222 session.is_barrier_read(),
223 );
224 batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
225 worker_node_manager_reader,
226 session.env().catalog_reader().clone(),
227 session.config().batch_parallelism().0,
228 session.config().timezone().to_owned(),
229 plan.clone(),
230 )?);
231 batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
232 ExplainFormat::Dot
233 } else {
234 ExplainFormat::Json
235 }
236 }
237 Convention::Stream => {
238 let graph = build_graph(plan.clone(), None)?;
239 if explain_format == ExplainFormat::Dot {
240 blocks.push(explain_stream_graph_as_dot(&graph, explain_verbose))
241 } else {
242 blocks.push(explain_stream_graph(&graph, explain_verbose));
243 }
244 }
245 }
246 }
247 }
248 ExplainType::Physical => {
249 if !explain_trace && let Ok(plan) = &plan {
251 match explain_format {
252 ExplainFormat::Text => {
253 blocks.push(plan.explain_to_string());
254 }
255 ExplainFormat::Json => blocks.push(plan.explain_to_json()),
256 ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
257 ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
258 ExplainFormat::Dot => {
259 if explain_backfill {
260 let dot_formatted_backfill_order =
261 explain_backfill_order_in_dot_format(
262 &session,
263 context.with_options().backfill_order_strategy(),
264 plan.clone(),
265 )?;
266 blocks.push(dot_formatted_backfill_order);
267 } else {
268 blocks.push(plan.explain_to_dot());
269 }
270 }
271 }
272 }
273 }
274 ExplainType::Logical => {
275 if !explain_trace {
277 let output = context.take_logical().ok_or_else(|| {
278 ErrorCode::InternalError("Logical plan not found for query".into())
279 })?;
280 blocks.push(output);
281 }
282 }
283 }
284
285 plan?;
287 }
288
289 if let Some(fragmenter) = batch_plan_fragmenter {
290 let query = fragmenter.generate_complete_query().await?;
291 let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
292 let graph = query.stage_graph.to_petgraph();
293 let dot = Dot::new(&graph);
294 dot.to_string()
295 } else {
296 serde_json::to_string_pretty(&query.stage_graph).unwrap()
297 };
298 blocks.push(stage_graph);
299 }
300
301 Ok(())
302}
303
304pub async fn handle_explain(
305 handler_args: HandlerArgs,
306 stmt: Statement,
307 options: ExplainOptions,
308 analyze: bool,
309) -> Result<RwPgResponse> {
310 if analyze {
311 bail_not_implemented!(issue = 4856, "explain analyze");
314 }
315 if options.trace && options.explain_format == ExplainFormat::Json {
316 return Err(ErrorCode::NotSupported(
317 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
318 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
319 )
320 .into());
321 }
322 if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
323 {
324 return Err(ErrorCode::NotSupported(
325 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
326 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
327 )
328 .into());
329 }
330
331 let mut blocks = Vec::new();
332 let result = do_handle_explain(handler_args, options.clone(), stmt, &mut blocks).await;
333
334 if let Err(e) = result {
335 if options.trace {
336 blocks.push(if options.verbose {
338 format!("ERROR: {:?}", e.as_report())
339 } else {
340 format!("ERROR: {}", e.as_report())
341 });
342 } else {
343 return Err(e);
345 }
346 }
347
348 let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
349 query_plan: l.into(),
350 });
351
352 Ok(PgResponse::builder(StatementType::EXPLAIN)
353 .rows(rows)
354 .into())
355}
356
357#[derive(Fields)]
358#[fields(style = "TITLE CASE")]
359pub(crate) struct ExplainRow {
360 pub query_plan: String,
361}