use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;

use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Format;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{FunctionId, Schema};
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::{DataType, Datum};
use risingwave_sqlparser::ast::{SetExpr, Statement};

use super::extended_handle::{PortalResult, PrepareStatement, PreparedResult};
use super::{PgResponseStream, RwPgResponse, create_mv, declare_cursor};
use crate::PlanRef;
use crate::binder::{Binder, BoundCreateView, BoundStatement};
use crate::catalog::TableId;
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::HandlerArgs;
use crate::handler::flush::do_flush;
use crate::handler::util::{DataChunkToRowSetAdapter, to_pg_field};
use crate::optimizer::plan_node::Explain;
use crate::optimizer::{
    ExecutionModeDecider, OptimizerContext, OptimizerContextRef, ReadStorageTableVisitor,
    RelationCollectorVisitor, SysTableVisitor,
};
use crate::planner::Planner;
use crate::scheduler::plan_fragmenter::Query;
use crate::scheduler::{
    BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef,
    LocalQueryExecution, LocalQueryStream,
};
use crate::session::SessionImpl;

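/// Handles a SQL statement in the simple query protocol: binds and plans it as a
/// batch query, fragments the plan, and executes it.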
pub async fn handle_query(
    handler_args: HandlerArgs,
    stmt: Statement,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let plan_fragmenter_result = {
        let context = OptimizerContext::from_handler_args(handler_args);
        let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?;
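        // Run plan fragmentation with the session's time zone set in the
        // expression context.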
        risingwave_expr::expr_context::TIME_ZONE::sync_scope(
            session.config().timezone().to_owned(),
            || gen_batch_plan_fragmenter(&session, plan_result),
        )?
    };
    execute(session, plan_fragmenter_result, formats).await
}

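/// Handles `Parse` in the extended query protocol: binds the statement with the
/// client-specified parameter types and wraps it as a prepared statement.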
pub fn handle_parse(
    handler_args: HandlerArgs,
    statement: Statement,
    specific_param_types: Vec<Option<DataType>>,
) -> Result<PrepareStatement> {
    let session = handler_args.session;
    let bound_result = gen_bound(&session, statement.clone(), specific_param_types)?;

    Ok(PrepareStatement::Prepared(PreparedResult {
        statement,
        bound_result,
    }))
}

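/// Handles `Execute` in the extended query protocol: plans and runs the
/// already-bound statement carried by the portal.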
pub async fn handle_execute(
    handler_args: HandlerArgs,
    portal: PortalResult,
) -> Result<RwPgResponse> {
    let PortalResult {
        bound_result,
        result_formats,
        statement,
    } = portal;
    match statement {
        Statement::Query(_)
        | Statement::Insert { .. }
        | Statement::Delete { .. }
        | Statement::Update { .. } => {
            let session = handler_args.session.clone();
            let plan_fragmenter_result = {
                let context = OptimizerContext::from_handler_args(handler_args);
                let plan_result = gen_batch_query_plan(&session, context.into(), bound_result)?;
                risingwave_expr::expr_context::TIME_ZONE::sync_scope(
                    session.config().timezone().to_owned(),
                    || gen_batch_plan_fragmenter(&session, plan_result),
                )?
            };
            execute(session, plan_fragmenter_result, result_formats).await
        }
        Statement::CreateView { materialized, .. } if materialized => {
            let BoundResult {
                bound,
                dependent_relations,
                dependent_udfs,
                ..
            } = bound_result;
            let create_mv = if let BoundStatement::CreateView(create_mv) = bound {
                create_mv
            } else {
                unreachable!("expect a BoundStatement::CreateView")
            };
            let BoundCreateView {
                or_replace,
                materialized: _,
                if_not_exists,
                name,
                columns,
                query,
                emit_mode,
                with_options,
            } = *create_mv;
            if or_replace {
                bail_not_implemented!("CREATE OR REPLACE VIEW");
            }

            let handler_args = HandlerArgs {
                session: handler_args.session.clone(),
                sql: handler_args.sql.clone(),
                normalized_sql: handler_args.normalized_sql.clone(),
                with_options: crate::WithOptions::try_from(with_options.as_slice())?,
            };

            create_mv::handle_create_mv_bound(
                handler_args,
                if_not_exists,
                name,
                *query,
                dependent_relations,
                dependent_udfs,
                columns,
                emit_mode,
            )
            .await
        }
        Statement::DeclareCursor { stmt } => match stmt.declare_cursor {
            risingwave_sqlparser::ast::DeclareCursor::Query(_) => {
                let session = handler_args.session.clone();
                let plan_fragmenter_result = {
                    let context = OptimizerContext::from_handler_args(handler_args.clone());
                    let plan_result =
                        gen_batch_query_plan(&session, context.into(), bound_result)?;
                    gen_batch_plan_fragmenter(&session, plan_result)?
                };
                declare_cursor::handle_bound_declare_query_cursor(
                    handler_args,
                    stmt.cursor_name,
                    plan_fragmenter_result,
                )
                .await
            }
            risingwave_sqlparser::ast::DeclareCursor::Subscription(sub_name, rw_timestamp) => {
                declare_cursor::handle_declare_subscription_cursor(
                    handler_args,
                    sub_name,
                    stmt.cursor_name,
                    rw_timestamp,
                )
                .await
            }
        },
        _ => unreachable!(),
    }
}

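/// Binds a statement and generates a batch query plan for it in one step.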
pub fn gen_batch_plan_by_statement(
    session: &SessionImpl,
    context: OptimizerContextRef,
    stmt: Statement,
) -> Result<BatchQueryPlanResult> {
    let bound_result = gen_bound(session, stmt, vec![])?;
    gen_batch_query_plan(session, context, bound_result)
}

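/// The result of binding a statement, together with the metadata needed to plan
/// and execute it later.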
#[derive(Clone)]
pub struct BoundResult {
    pub(crate) stmt_type: StatementType,
    pub(crate) must_dist: bool,
    pub(crate) bound: BoundStatement,
    pub(crate) param_types: Vec<DataType>,
    pub(crate) parsed_params: Option<Vec<Datum>>,
    pub(crate) dependent_relations: HashSet<TableId>,
    pub(crate) dependent_udfs: HashSet<FunctionId>,
}

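/// Infers the statement type, decides whether the statement must run in
/// distributed mode, and binds it with the given parameter types.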
fn gen_bound(
    session: &SessionImpl,
    stmt: Statement,
    specific_param_types: Vec<Option<DataType>>,
) -> Result<BoundResult> {
    let stmt_type = StatementType::infer_from_statement(&stmt)
        .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;
    let must_dist = must_run_in_distributed_mode(&stmt)?;

    let mut binder = Binder::new_with_param_types(session, specific_param_types);
    let bound = binder.bind(stmt)?;

    Ok(BoundResult {
        stmt_type,
        must_dist,
        bound,
        param_types: binder.export_param_types()?,
        parsed_params: None,
        dependent_relations: binder.included_relations().clone(),
        dependent_udfs: binder.included_udfs().clone(),
    })
}

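/// A physical batch plan, plus everything needed to fragment and execute it.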
pub struct BatchQueryPlanResult {
    pub(crate) plan: PlanRef,
    pub(crate) query_mode: QueryMode,
    pub(crate) schema: Schema,
    pub(crate) stmt_type: StatementType,
    pub(crate) dependent_relations: Vec<TableId>,
    pub(crate) read_storage_tables: HashSet<TableId>,
}

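/// Plans a bound statement as a batch query: builds the logical plan, picks the
/// query mode (local or distributed), and generates the physical plan for that
/// mode.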
fn gen_batch_query_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    bind_result: BoundResult,
) -> Result<BatchQueryPlanResult> {
    let BoundResult {
        stmt_type,
        must_dist,
        bound,
        dependent_relations,
        ..
    } = bind_result;

    let mut planner = if matches!(bound, BoundStatement::Query(_)) {
        Planner::new_for_batch_dql(context)
    } else {
        Planner::new_for_batch(context)
    };

    let mut logical = planner.plan(bound)?;
    let schema = logical.schema();
    let batch_plan = logical.gen_batch_plan()?;

    let dependent_relations =
        RelationCollectorVisitor::collect_with(dependent_relations, batch_plan.clone());

    let read_storage_tables = ReadStorageTableVisitor::collect(batch_plan.clone());

    let must_local = must_run_in_local_mode(batch_plan.clone());

    let query_mode = match (must_dist, must_local) {
        (true, true) => {
            return Err(ErrorCode::InternalError(
                "the query is forced to run in both local and distributed mode by the optimizer"
                    .to_owned(),
            )
            .into());
        }
        (true, false) => QueryMode::Distributed,
        (false, true) => QueryMode::Local,
        (false, false) => match session.config().query_mode() {
            QueryMode::Auto => determine_query_mode(batch_plan.clone()),
            QueryMode::Local => QueryMode::Local,
            QueryMode::Distributed => QueryMode::Distributed,
        },
    };

    let physical = match query_mode {
        QueryMode::Auto => unreachable!(),
        QueryMode::Local => logical.gen_batch_local_plan()?,
        QueryMode::Distributed => logical.gen_batch_distributed_plan()?,
    };

    Ok(BatchQueryPlanResult {
        plan: physical,
        query_mode,
        schema,
        stmt_type,
        dependent_relations: dependent_relations.into_iter().collect_vec(),
        read_storage_tables,
    })
}

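/// Returns whether the statement is forced into distributed mode: UPDATE and
/// DELETE (with or without RETURNING), as well as `INSERT ... SELECT`, always
/// run distributed.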
fn must_run_in_distributed_mode(stmt: &Statement) -> Result<bool> {
    fn is_insert_using_select(stmt: &Statement) -> bool {
        fn has_select_query(set_expr: &SetExpr) -> bool {
            match set_expr {
                SetExpr::Select(_) => true,
                SetExpr::Query(query) => has_select_query(&query.body),
                SetExpr::SetOperation { left, right, .. } => {
                    has_select_query(left) || has_select_query(right)
                }
                SetExpr::Values(_) => false,
            }
        }

        matches!(
            stmt,
            Statement::Insert { source, .. } if has_select_query(&source.body)
        )
    }

    let stmt_type = StatementType::infer_from_statement(stmt)
        .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;

    Ok(matches!(
        stmt_type,
        StatementType::UPDATE
            | StatementType::DELETE
            | StatementType::UPDATE_RETURNING
            | StatementType::DELETE_RETURNING
    ) || is_insert_using_select(stmt))
}

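/// Queries that scan system tables must run in local mode.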
fn must_run_in_local_mode(batch_plan: PlanRef) -> bool {
    SysTableVisitor::has_sys_table(batch_plan)
}

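/// Resolves `QueryMode::Auto` based on [`ExecutionModeDecider`]'s analysis of
/// the batch plan.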
fn determine_query_mode(batch_plan: PlanRef) -> QueryMode {
    if ExecutionModeDecider::run_in_local_mode(batch_plan) {
        QueryMode::Local
    } else {
        QueryMode::Distributed
    }
}

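/// The output of plan fragmentation, ready to be scheduled.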
pub struct BatchPlanFragmenterResult {
    pub(crate) plan_fragmenter: BatchPlanFragmenter,
    pub(crate) query_mode: QueryMode,
    pub(crate) schema: Schema,
    pub(crate) stmt_type: StatementType,
    pub(crate) read_storage_tables: HashSet<TableId>,
}

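/// Builds a [`BatchPlanFragmenter`] that will split the physical plan into
/// stages for scheduling.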
pub fn gen_batch_plan_fragmenter(
    session: &SessionImpl,
    plan_result: BatchQueryPlanResult,
) -> Result<BatchPlanFragmenterResult> {
    let BatchQueryPlanResult {
        plan,
        query_mode,
        schema,
        stmt_type,
        read_storage_tables,
        ..
    } = plan_result;

    tracing::trace!(
        "Generated query plan: {:?}, query_mode: {:?}",
        plan.explain_to_string(),
        query_mode
    );
    let worker_node_manager_reader = WorkerNodeSelector::new(
        session.env().worker_node_manager_ref(),
        session.is_barrier_read(),
    );
    let plan_fragmenter = BatchPlanFragmenter::new(
        worker_node_manager_reader,
        session.env().catalog_reader().clone(),
        session.config().batch_parallelism().0,
        session.config().timezone().to_owned(),
        plan,
    )?;

    Ok(BatchPlanFragmenterResult {
        plan_fragmenter,
        query_mode,
        schema,
        stmt_type,
        read_storage_tables,
    })
}

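/// Schedules the fragmented plan in the chosen query mode and returns the row
/// stream together with the `pgwire` field descriptors of the output schema.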
pub async fn create_stream(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
    formats: Vec<Format>,
) -> Result<(PgResponseStream, Vec<PgFieldDescriptor>)> {
    let BatchPlanFragmenterResult {
        plan_fragmenter,
        query_mode,
        schema,
        stmt_type,
        read_storage_tables,
    } = plan_fragmenter_result;

    let mut can_timeout_cancel = true;
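    // DML statements acquire the transaction write guard and must not be
    // cancelled by the statement timeout.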
    match stmt_type {
        StatementType::INSERT
        | StatementType::INSERT_RETURNING
        | StatementType::DELETE
        | StatementType::DELETE_RETURNING
        | StatementType::UPDATE
        | StatementType::UPDATE_RETURNING => {
            session.txn_write_guard()?;
            can_timeout_cancel = false;
        }
        _ => {}
    }

    let query = plan_fragmenter.generate_complete_query().await?;
    tracing::trace!("Generated query after plan fragmenter: {:?}", &query);

    let pg_descs = schema
        .fields()
        .iter()
        .map(to_pg_field)
        .collect::<Vec<PgFieldDescriptor>>();
    let column_types = schema.fields().iter().map(|f| f.data_type()).collect_vec();

    let row_stream = match query_mode {
        QueryMode::Auto => unreachable!(),
        QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
            local_execute(
                session.clone(),
                query,
                can_timeout_cancel,
                &read_storage_tables,
            )
            .await?,
            column_types,
            formats,
            session.clone(),
        )),
        QueryMode::Distributed => {
            PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
                distribute_execute(
                    session.clone(),
                    query,
                    can_timeout_cancel,
                    read_storage_tables,
                )
                .await?,
                column_types,
                formats,
                session.clone(),
            ))
        }
    };

    Ok((row_stream, pg_descs))
}

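/// Executes the fragmented plan and builds the [`PgResponse`], with a callback
/// that flushes DML writes and reports execution metrics.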
async fn execute(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    let first_field_format = formats.first().copied().unwrap_or(Format::Text);
    let query_mode = plan_fragmenter_result.query_mode;
    let stmt_type = plan_fragmenter_result.stmt_type;

    let query_start_time = Instant::now();
    let (row_stream, pg_descs) =
        create_stream(session.clone(), plan_fragmenter_result, formats).await?;

    let callback = async move {
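        // Implicitly flush DML writes if the session is configured to do so.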
        if session.config().implicit_flush() && stmt_type.is_dml() {
            do_flush(&session).await?;
        }

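        // Record latency and query counters for the execution mode that was used.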
        match query_mode {
            QueryMode::Auto => unreachable!(),
            QueryMode::Local => {
                session
                    .env()
                    .frontend_metrics
                    .latency_local_execution
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .frontend_metrics
                    .query_counter_local_execution
                    .inc();
            }
            QueryMode::Distributed => {
                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .query_latency
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .completed_query_counter
                    .inc();
            }
        }

        Ok(())
    };

    Ok(PgResponse::builder(stmt_type)
        .row_cnt_format_opt(Some(first_field_format))
        .values(row_stream, pg_descs)
        .callback(callback)
        .into())
}

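/// Schedules the query on the distributed query manager. The statement timeout
/// is disabled under madsim and for statements that must not be cancelled.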
pub async fn distribute_execute(
    session: Arc<SessionImpl>,
    query: Query,
    can_timeout_cancel: bool,
    read_storage_tables: HashSet<TableId>,
) -> Result<DistributedQueryStream> {
    let timeout = if cfg!(madsim) {
        None
    } else if can_timeout_cancel {
        Some(session.statement_timeout())
    } else {
        None
    };
    let execution_context: ExecutionContextRef =
        ExecutionContext::new(session.clone(), timeout).into();
    let query_manager = session.env().query_manager().clone();

    query_manager
        .schedule(execution_context, query, read_storage_tables)
        .await
        .map_err(|err| err.into())
}

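/// Executes the query locally on the frontend, using the pinned snapshot to
/// derive a consistent batch query epoch for the tables being read.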
pub async fn local_execute(
    session: Arc<SessionImpl>,
    query: Query,
    can_timeout_cancel: bool,
    read_storage_tables: &HashSet<TableId>,
) -> Result<LocalQueryStream> {
    let timeout = if cfg!(madsim) {
        None
    } else if can_timeout_cancel {
        Some(session.statement_timeout())
    } else {
        None
    };
    let front_env = session.env();

    let snapshot = session.pinned_snapshot();

    let execution = LocalQueryExecution::new(
        query,
        front_env.clone(),
        snapshot.support_barrier_read(),
        snapshot.batch_query_epoch(read_storage_tables)?,
        session,
        timeout,
    );

    Ok(execution.stream_rows())
}