risingwave_frontend/handler/
explain.rs1use petgraph::dot::Dot;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
18use risingwave_common::bail_not_implemented;
19use risingwave_common::types::Fields;
20use risingwave_sqlparser::ast::{
21 ExplainFormat, ExplainOptions, ExplainType, FetchCursorStatement, Statement,
22};
23use thiserror_ext::AsReport;
24
25use super::create_index::{gen_create_index_plan, resolve_index_schema};
26use super::create_mv::gen_create_mv_plan;
27use super::create_sink::gen_sink_plan;
28use super::query::gen_batch_plan_by_statement;
29use super::util::SourceSchemaCompatExt;
30use super::{RwPgResponse, RwPgResponseBuilderExt};
31use crate::OptimizerContextRef;
32use crate::error::{ErrorCode, Result};
33use crate::handler::HandlerArgs;
34use crate::handler::create_table::handle_create_table_plan;
35use crate::optimizer::OptimizerContext;
36use crate::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
37use crate::optimizer::plan_node::generic::GenericPlanRef;
38use crate::optimizer::plan_node::{BatchPlanRef, Explain, StreamPlanRef};
39use crate::scheduler::BatchPlanFragmenter;
40use crate::stream_fragmenter::build_graph;
41use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot};
42
43pub async fn do_handle_explain(
44 handler_args: HandlerArgs,
45 explain_options: ExplainOptions,
46 stmt: Statement,
47 blocks: &mut Vec<String>,
48) -> Result<()> {
49 let mut batch_plan_fragmenter = None;
51 let mut batch_plan_fragmenter_fmt = ExplainFormat::Json;
52
53 let session = handler_args.session.clone();
54
55 enum PhysicalPlanRef {
56 Stream(StreamPlanRef),
57 Batch(BatchPlanRef),
58 }
59
60 {
61 let (plan, table, context) = match stmt {
62 Statement::CreateTable {
66 name,
67 columns,
68 constraints,
69 format_encode,
70 source_watermarks,
71 append_only,
72 on_conflict,
73 with_version_columns,
74 cdc_table_info,
75 include_column_options,
76 wildcard_idx,
77 webhook_info,
78 ..
79 } => {
80 let format_encode = format_encode.map(|s| s.into_v2_with_warning());
81
82 let (plan, _source, table, _job_type, _) = handle_create_table_plan(
83 handler_args,
84 explain_options,
85 format_encode,
86 cdc_table_info,
87 &name,
88 columns,
89 wildcard_idx,
90 constraints,
91 source_watermarks,
92 append_only,
93 on_conflict,
94 with_version_columns
95 .iter()
96 .map(|col| col.real_value())
97 .collect(),
98 include_column_options,
99 webhook_info,
100 risingwave_common::catalog::Engine::Hummock,
101 )
102 .await?;
103 let context = plan.ctx();
104 (Ok(PhysicalPlanRef::Stream(plan)), Some(table), context)
105 }
106 Statement::CreateSink { stmt } => {
107 let plan = gen_sink_plan(handler_args, stmt, Some(explain_options), false)
108 .await
109 .map(|plan| plan.sink_plan)?;
110 let context = plan.ctx();
111 (Ok(PhysicalPlanRef::Stream(plan)), None, context)
112 }
113
114 Statement::FetchCursor {
115 stmt: FetchCursorStatement { cursor_name, .. },
116 } => {
117 let cursor_manager = session.clone().get_cursor_manager();
118 let plan = cursor_manager
119 .gen_batch_plan_with_subscription_cursor(
120 &cursor_name.real_value(),
121 handler_args,
122 )
123 .await
124 .map(|x| x.plan)?;
125 let context = plan.ctx();
126 (Ok(PhysicalPlanRef::Batch(plan)), None, context)
127 }
128
129 _ => {
133 let context: OptimizerContextRef =
134 OptimizerContext::new(handler_args, explain_options).into();
135 let context_clone = context.clone();
136
137 let res = match stmt {
138 Statement::CreateView {
140 or_replace: false,
141 materialized: true,
142 query,
143 name,
144 columns,
145 emit_mode,
146 ..
147 } => gen_create_mv_plan(&session, context, *query, name, columns, emit_mode)
148 .map(|(plan, table)| (PhysicalPlanRef::Stream(plan), Some(table))),
149 Statement::CreateView {
150 materialized: false,
151 ..
152 } => {
153 return Err(ErrorCode::NotSupported(
154 "EXPLAIN CREATE VIEW".into(),
155 "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into()
156 ).into());
157 }
158
159 Statement::CreateSubscription { .. } => {
160 return Err(ErrorCode::NotSupported(
161 "EXPLAIN CREATE SUBSCRIPTION".into(),
162 "A created SUBSCRIPTION only incremental data queries on the table, not supported EXPLAIN".into()
163 ).into());
164 }
165 Statement::CreateIndex {
166 name,
167 table_name,
168 method,
169 columns,
170 include,
171 distributed_by,
172 ..
173 } => {
174 let (schema_name, table, index_table_name) =
175 resolve_index_schema(&session, name, table_name)?;
176 gen_create_index_plan(
177 &session,
178 context,
179 schema_name,
180 table,
181 index_table_name,
182 method,
183 columns,
184 include,
185 distributed_by,
186 )
187 }
188 .map(|(plan, index_table, _index)| {
189 (PhysicalPlanRef::Stream(plan), Some(index_table))
190 }),
191
192 Statement::Insert { .. }
194 | Statement::Delete { .. }
195 | Statement::Update { .. }
196 | Statement::Query { .. } => {
197 gen_batch_plan_by_statement(&session, context, stmt)
198 .map(|x| (PhysicalPlanRef::Batch(x.plan), None))
199 }
200
201 _ => bail_not_implemented!("unsupported statement for EXPLAIN: {stmt}"),
202 };
203
204 match res {
205 Ok((plan, table)) => (Ok(plan), table, context_clone),
206 Err(e) => (Err(e), None, context_clone),
207 }
208 }
209 };
210
211 let explain_trace = explain_options.trace;
212 let explain_verbose = explain_options.verbose;
213 let explain_backfill = explain_options.backfill;
214 let explain_type = explain_options.explain_type;
215 let explain_format = explain_options.explain_format;
216
217 if explain_trace {
218 let trace = context.take_trace();
219 blocks.extend(trace);
220 }
221
222 match explain_type {
223 ExplainType::DistSql => {
224 if let Ok(plan) = &plan {
225 match plan {
226 PhysicalPlanRef::Batch(plan) => {
227 let worker_node_manager_reader = WorkerNodeSelector::new(
228 session.env().worker_node_manager_ref(),
229 session.is_barrier_read(),
230 );
231 batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
232 worker_node_manager_reader,
233 session.env().catalog_reader().clone(),
234 session.config().batch_parallelism().0,
235 session.config().timezone(),
236 plan.clone(),
237 )?);
238 batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot {
239 ExplainFormat::Dot
240 } else {
241 ExplainFormat::Json
242 }
243 }
244 PhysicalPlanRef::Stream(plan) => {
245 let graph = build_graph(plan.clone(), None)?;
246 let table = table.map(|x| x.to_prost());
247 if explain_format == ExplainFormat::Dot {
248 blocks.push(explain_stream_graph_as_dot(
249 &graph,
250 table,
251 explain_verbose,
252 ))
253 } else {
254 blocks.push(explain_stream_graph(&graph, table, explain_verbose));
255 }
256 }
257 }
258 }
259 }
260 ExplainType::Physical => {
261 if !explain_trace && let Ok(physical_plan) = &plan {
263 let plan = match &physical_plan {
264 PhysicalPlanRef::Stream(plan) => plan as &dyn Explain,
265 PhysicalPlanRef::Batch(plan) => plan as &dyn Explain,
266 };
267 match explain_format {
268 ExplainFormat::Text => {
269 blocks.push(plan.explain_to_string());
270 }
271 ExplainFormat::Json => blocks.push(plan.explain_to_json()),
272 ExplainFormat::Xml => blocks.push(plan.explain_to_xml()),
273 ExplainFormat::Yaml => blocks.push(plan.explain_to_yaml()),
274 ExplainFormat::Dot => {
275 if explain_backfill && let PhysicalPlanRef::Stream(plan) = physical_plan
276 {
277 let dot_formatted_backfill_order =
278 explain_backfill_order_in_dot_format(
279 &session,
280 context.with_options().backfill_order_strategy(),
281 plan.clone(),
282 )?;
283 blocks.push(dot_formatted_backfill_order);
284 } else {
285 blocks.push(plan.explain_to_dot());
286 }
287 }
288 }
289 }
290 }
291 ExplainType::Logical => {
292 if !explain_trace {
294 let output = context.take_logical().ok_or_else(|| {
295 ErrorCode::InternalError("Logical plan not found for query".into())
296 })?;
297 blocks.push(output);
298 }
299 }
300 }
301
302 plan?;
304 }
305
306 if let Some(fragmenter) = batch_plan_fragmenter {
307 let query = fragmenter.generate_complete_query().await?;
308 let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot {
309 let graph = query.stage_graph.to_petgraph();
310 let dot = Dot::new(&graph);
311 dot.to_string()
312 } else {
313 serde_json::to_string_pretty(&query.stage_graph).unwrap()
314 };
315 blocks.push(stage_graph);
316 }
317
318 Ok(())
319}
320
321pub async fn handle_explain(
322 handler_args: HandlerArgs,
323 stmt: Statement,
324 options: ExplainOptions,
325 analyze: bool,
326) -> Result<RwPgResponse> {
327 if analyze {
328 bail_not_implemented!(issue = 4856, "explain analyze");
331 }
332 if options.trace && options.explain_format == ExplainFormat::Json {
333 return Err(ErrorCode::NotSupported(
334 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
335 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
336 )
337 .into());
338 }
339 if options.explain_type == ExplainType::DistSql && options.explain_format == ExplainFormat::Json
340 {
341 return Err(ErrorCode::NotSupported(
342 "EXPLAIN (TRACE, JSON FORMAT)".to_owned(),
343 "Only EXPLAIN (LOGICAL | PHYSICAL, JSON FORMAT) is supported.".to_owned(),
344 )
345 .into());
346 }
347
348 let mut blocks = Vec::new();
349 let result = do_handle_explain(handler_args, options, stmt, &mut blocks).await;
350
351 if let Err(e) = result {
352 if options.trace {
353 blocks.push(if options.verbose {
355 format!("ERROR: {:?}", e.as_report())
356 } else {
357 format!("ERROR: {}", e.as_report())
358 });
359 } else {
360 return Err(e);
362 }
363 }
364
365 let rows = blocks.iter().flat_map(|b| b.lines()).map(|l| ExplainRow {
366 query_plan: l.into(),
367 });
368
369 Ok(PgResponse::builder(StatementType::EXPLAIN)
370 .rows(rows)
371 .into())
372}
373
374#[derive(Fields)]
375#[fields(style = "TITLE CASE")]
376pub(crate) struct ExplainRow {
377 pub query_plan: String,
378}