use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;

use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Format;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{FunctionId, Schema};
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::{DataType, Datum};
use risingwave_sqlparser::ast::{SetExpr, Statement};

use super::extended_handle::{PortalResult, PrepareStatement, PreparedResult};
use super::{PgResponseStream, RwPgResponse, create_mv, declare_cursor};
use crate::binder::{Binder, BoundCreateView, BoundStatement};
use crate::catalog::TableId;
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::HandlerArgs;
use crate::handler::flush::do_flush;
use crate::handler::util::{DataChunkToRowSetAdapter, to_pg_field};
use crate::optimizer::plan_node::{BatchPlanRef, Explain};
use crate::optimizer::{
    BatchPlanRoot, ExecutionModeDecider, OptimizerContext, OptimizerContextRef,
    ReadStorageTableVisitor, RelationCollectorVisitor, SysTableVisitor,
};
use crate::planner::Planner;
use crate::scheduler::plan_fragmenter::Query;
use crate::scheduler::{
    BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef,
    LocalQueryExecution, LocalQueryStream,
};
use crate::session::SessionImpl;

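/// Handles a simple-protocol query: plans the statement, runs plan
/// fragmentation inside the session's time-zone scope, and executes the
/// resulting fragmented plan.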
pub async fn handle_query(
    handler_args: HandlerArgs,
    stmt: Statement,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let plan_fragmenter_result = {
        let context = OptimizerContext::from_handler_args(handler_args);
        let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?;
        risingwave_expr::expr_context::TIME_ZONE::sync_scope(
            session.config().timezone().to_owned(),
            || gen_batch_plan_fragmenter(&session, plan_result),
        )?
    };
    execute(session, plan_fragmenter_result, formats).await
}

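/// Handles the extended-protocol `Parse` message: binds the statement with
/// the client-specified parameter types and returns a prepared statement.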
pub fn handle_parse(
    handler_args: HandlerArgs,
    statement: Statement,
    specific_param_types: Vec<Option<DataType>>,
) -> Result<PrepareStatement> {
    let session = handler_args.session;
    let bound_result = gen_bound(&session, statement.clone(), specific_param_types)?;

    Ok(PrepareStatement::Prepared(PreparedResult {
        statement,
        bound_result,
    }))
}

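/// Handles the extended-protocol `Execute` message by dispatching on the
/// statement kind bound into the portal: queries and DML are planned and
/// executed here, while `CREATE MATERIALIZED VIEW` and `DECLARE CURSOR` are
/// forwarded to their dedicated handlers.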
pub async fn handle_execute(
    handler_args: HandlerArgs,
    portal: PortalResult,
) -> Result<RwPgResponse> {
    let PortalResult {
        bound_result,
        result_formats,
        statement,
    } = portal;
    match statement {
        Statement::Query(_)
        | Statement::Insert { .. }
        | Statement::Delete { .. }
        | Statement::Update { .. } => {
            let session = handler_args.session.clone();
            let plan_fragmenter_result = {
                let context = OptimizerContext::from_handler_args(handler_args);
                let plan_result = gen_batch_query_plan(&session, context.into(), bound_result)?;
                risingwave_expr::expr_context::TIME_ZONE::sync_scope(
                    session.config().timezone().to_owned(),
                    || gen_batch_plan_fragmenter(&session, plan_result),
                )?
            };
            execute(session, plan_fragmenter_result, result_formats).await
        }
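        // An already-bound `CREATE MATERIALIZED VIEW` is unpacked here and
        // handed off to the dedicated `create_mv` handler.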
        Statement::CreateView { materialized, .. } if materialized => {
            let BoundResult {
                bound,
                dependent_relations,
                dependent_udfs,
                ..
            } = bound_result;
            let create_mv = if let BoundStatement::CreateView(create_mv) = bound {
                create_mv
            } else {
                unreachable!("expect a BoundStatement::CreateView")
            };
            let BoundCreateView {
                or_replace,
                materialized: _,
                if_not_exists,
                name,
                columns,
                query,
                emit_mode,
                with_options,
            } = *create_mv;
            if or_replace {
                bail_not_implemented!("CREATE OR REPLACE VIEW");
            }

            let handler_args = HandlerArgs {
                session: handler_args.session.clone(),
                sql: handler_args.sql.clone(),
                normalized_sql: handler_args.normalized_sql.clone(),
                with_options: crate::WithOptions::try_from(with_options.as_slice())?,
            };

            create_mv::handle_create_mv_bound(
                handler_args,
                if_not_exists,
                name,
                *query,
                dependent_relations,
                dependent_udfs,
                columns,
                emit_mode,
            )
            .await
        }
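        // `DECLARE CURSOR` either wraps a plain query or subscribes to a
        // subscription; each case has its own cursor handler.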
        Statement::DeclareCursor { stmt } => match stmt.declare_cursor {
            risingwave_sqlparser::ast::DeclareCursor::Query(_) => {
                let session = handler_args.session.clone();
                let plan_fragmenter_result = {
                    let context = OptimizerContext::from_handler_args(handler_args.clone());
                    let plan_result =
                        gen_batch_query_plan(&session, context.into(), bound_result)?;
                    gen_batch_plan_fragmenter(&session, plan_result)?
                };
                declare_cursor::handle_bound_declare_query_cursor(
                    handler_args,
                    stmt.cursor_name,
                    plan_fragmenter_result,
                )
                .await
            }
            risingwave_sqlparser::ast::DeclareCursor::Subscription(sub_name, rw_timestamp) => {
                declare_cursor::handle_declare_subscription_cursor(
                    handler_args,
                    sub_name,
                    stmt.cursor_name,
                    rw_timestamp,
                )
                .await
            }
        },
        _ => unreachable!(),
    }
}

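/// Binds a statement (without extended-protocol parameters) and generates a
/// batch query plan for it.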
pub fn gen_batch_plan_by_statement(
    session: &SessionImpl,
    context: OptimizerContextRef,
    stmt: Statement,
) -> Result<BatchQueryPlanResult> {
    let bound_result = gen_bound(session, stmt, vec![])?;
    gen_batch_query_plan(session, context, bound_result)
}

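/// The output of binding: the bound statement plus the metadata needed to
/// plan and schedule it.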
#[derive(Clone)]
pub struct BoundResult {
    pub(crate) stmt_type: StatementType,
    pub(crate) must_dist: bool,
    pub(crate) bound: BoundStatement,
    pub(crate) param_types: Vec<DataType>,
    pub(crate) parsed_params: Option<Vec<Datum>>,
    pub(crate) dependent_relations: HashSet<TableId>,
    pub(crate) dependent_udfs: HashSet<FunctionId>,
}

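/// Binds a statement, inferring its type and whether it must run in
/// distributed mode, and collects the relations and UDFs it depends on.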
fn gen_bound(
    session: &SessionImpl,
    stmt: Statement,
    specific_param_types: Vec<Option<DataType>>,
) -> Result<BoundResult> {
    let stmt_type = StatementType::infer_from_statement(&stmt)
        .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;
    let must_dist = must_run_in_distributed_mode(&stmt)?;

    let mut binder = Binder::new_with_param_types(session, specific_param_types);
    let bound = binder.bind(stmt)?;

    Ok(BoundResult {
        stmt_type,
        must_dist,
        bound,
        param_types: binder.export_param_types()?,
        parsed_params: None,
        dependent_relations: binder.included_relations().clone(),
        dependent_udfs: binder.included_udfs().clone(),
    })
}

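/// A physical batch plan together with the query mode it was generated for
/// and the metadata required to schedule it.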
pub struct BatchQueryPlanResult {
    pub(crate) plan: BatchPlanRef,
    pub(crate) query_mode: QueryMode,
    pub(crate) schema: Schema,
    pub(crate) stmt_type: StatementType,
    pub(crate) dependent_relations: Vec<TableId>,
    pub(crate) read_storage_tables: HashSet<TableId>,
}

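/// Plans a bound statement into a physical batch plan, resolving the query
/// mode from optimizer constraints and the session's `query_mode` setting.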
fn gen_batch_query_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    bind_result: BoundResult,
) -> Result<BatchQueryPlanResult> {
    let BoundResult {
        stmt_type,
        must_dist,
        bound,
        dependent_relations,
        ..
    } = bind_result;

    let mut planner = if matches!(bound, BoundStatement::Query(_)) {
        Planner::new_for_batch_dql(context)
    } else {
        Planner::new_for_batch(context)
    };

    let logical = planner.plan(bound)?;
    let schema = logical.schema();
    let batch_plan = logical.gen_batch_plan()?;

    let dependent_relations =
        RelationCollectorVisitor::collect_with(dependent_relations, batch_plan.plan.clone());

    let read_storage_tables = ReadStorageTableVisitor::collect(&batch_plan);

    let must_local = must_run_in_local_mode(&batch_plan);

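    // Conflicting forced modes are an internal error; otherwise an explicit
    // force wins, and `Auto` falls back to a cost-based decision.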
    let query_mode = match (must_dist, must_local) {
        (true, true) => {
            return Err(ErrorCode::InternalError(
                "the query is forced to both local and distributed mode by optimizer".to_owned(),
            )
            .into());
        }
        (true, false) => QueryMode::Distributed,
        (false, true) => QueryMode::Local,
        (false, false) => match session.config().query_mode() {
            QueryMode::Auto => determine_query_mode(&batch_plan),
            QueryMode::Local => QueryMode::Local,
            QueryMode::Distributed => QueryMode::Distributed,
        },
    };

    let physical = match query_mode {
        QueryMode::Auto => unreachable!(),
        QueryMode::Local => batch_plan.gen_batch_local_plan()?,
        QueryMode::Distributed => batch_plan.gen_batch_distributed_plan()?,
    };

    Ok(BatchQueryPlanResult {
        plan: physical,
        query_mode,
        schema,
        stmt_type,
        dependent_relations: dependent_relations.into_iter().collect_vec(),
        read_storage_tables,
    })
}

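/// Returns `true` for statements that must run in distributed mode:
/// UPDATE/DELETE (with or without RETURNING) and `INSERT ... SELECT`.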
fn must_run_in_distributed_mode(stmt: &Statement) -> Result<bool> {
    fn is_insert_using_select(stmt: &Statement) -> bool {
        fn has_select_query(set_expr: &SetExpr) -> bool {
            match set_expr {
                SetExpr::Select(_) => true,
                SetExpr::Query(query) => has_select_query(&query.body),
                SetExpr::SetOperation { left, right, .. } => {
                    has_select_query(left) || has_select_query(right)
                }
                SetExpr::Values(_) => false,
            }
        }

        matches!(
            stmt,
            Statement::Insert { source, .. } if has_select_query(&source.body)
        )
    }

    let stmt_type = StatementType::infer_from_statement(stmt)
        .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;

    Ok(matches!(
        stmt_type,
        StatementType::UPDATE
            | StatementType::DELETE
            | StatementType::UPDATE_RETURNING
            | StatementType::DELETE_RETURNING
    ) | is_insert_using_select(stmt))
}

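/// Queries that scan system tables can only be executed in local mode.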
fn must_run_in_local_mode(batch_plan: &BatchPlanRoot) -> bool {
    SysTableVisitor::has_sys_table(batch_plan)
}

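/// For `QueryMode::Auto`, lets the optimizer's execution-mode decider pick
/// between local and distributed execution.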
fn determine_query_mode(batch_plan: &BatchPlanRoot) -> QueryMode {
    if ExecutionModeDecider::run_in_local_mode(batch_plan) {
        QueryMode::Local
    } else {
        QueryMode::Distributed
    }
}

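/// The fragmented batch plan plus the metadata carried through from planning.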
pub struct BatchPlanFragmenterResult {
    pub(crate) plan_fragmenter: BatchPlanFragmenter,
    pub(crate) query_mode: QueryMode,
    pub(crate) schema: Schema,
    pub(crate) stmt_type: StatementType,
    pub(crate) read_storage_tables: HashSet<TableId>,
}

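/// Wraps a generated batch plan in a `BatchPlanFragmenter`, which will later
/// split it into stages for scheduling.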
pub fn gen_batch_plan_fragmenter(
    session: &SessionImpl,
    plan_result: BatchQueryPlanResult,
) -> Result<BatchPlanFragmenterResult> {
    let BatchQueryPlanResult {
        plan,
        query_mode,
        schema,
        stmt_type,
        read_storage_tables,
        ..
    } = plan_result;

    tracing::trace!(
        "Generated query plan: {:?}, query_mode:{:?}",
        plan.explain_to_string(),
        query_mode
    );
    let worker_node_manager_reader = WorkerNodeSelector::new(
        session.env().worker_node_manager_ref(),
        session.is_barrier_read(),
    );
    let plan_fragmenter = BatchPlanFragmenter::new(
        worker_node_manager_reader,
        session.env().catalog_reader().clone(),
        session.config().batch_parallelism().0,
        session.config().timezone().to_owned(),
        plan,
    )?;

    Ok(BatchPlanFragmenterResult {
        plan_fragmenter,
        query_mode,
        schema,
        stmt_type,
        read_storage_tables,
    })
}

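/// Builds the response row stream for a fragmented plan: completes the query,
/// derives the `pgwire` field descriptors from the schema, and dispatches to
/// local or distributed execution.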
pub async fn create_stream(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
    formats: Vec<Format>,
) -> Result<(PgResponseStream, Vec<PgFieldDescriptor>)> {
    let BatchPlanFragmenterResult {
        plan_fragmenter,
        query_mode,
        schema,
        stmt_type,
        read_storage_tables,
    } = plan_fragmenter_result;

    let mut can_timeout_cancel = true;
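    // DML takes the transaction write guard and is excluded from
    // statement-timeout cancellation.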
    match stmt_type {
        StatementType::INSERT
        | StatementType::INSERT_RETURNING
        | StatementType::DELETE
        | StatementType::DELETE_RETURNING
        | StatementType::UPDATE
        | StatementType::UPDATE_RETURNING => {
            session.txn_write_guard()?;
            can_timeout_cancel = false;
        }
        _ => {}
    }

    let query = plan_fragmenter.generate_complete_query().await?;
    tracing::trace!("Generated query after plan fragmenter: {:?}", &query);

    let pg_descs = schema
        .fields()
        .iter()
        .map(to_pg_field)
        .collect::<Vec<PgFieldDescriptor>>();
    let column_types = schema.fields().iter().map(|f| f.data_type()).collect_vec();

    let row_stream = match query_mode {
        QueryMode::Auto => unreachable!(),
        QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
            local_execute(
                session.clone(),
                query,
                can_timeout_cancel,
                &read_storage_tables,
            )
            .await?,
            column_types,
            formats,
            session.clone(),
        )),
        QueryMode::Distributed => {
            PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
                distribute_execute(
                    session.clone(),
                    query,
                    can_timeout_cancel,
                    read_storage_tables,
                )
                .await?,
                column_types,
                formats,
                session.clone(),
            ))
        }
    };

    Ok((row_stream, pg_descs))
}

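/// Executes a fragmented plan and assembles the `PgResponse`, attaching a
/// callback that flushes implicit-flush DML and records per-mode query
/// metrics once the stream completes.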
async fn execute(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    let first_field_format = formats.first().copied().unwrap_or(Format::Text);
    let query_mode = plan_fragmenter_result.query_mode;
    let stmt_type = plan_fragmenter_result.stmt_type;

    let query_start_time = Instant::now();
    let (row_stream, pg_descs) =
        create_stream(session.clone(), plan_fragmenter_result, formats).await?;

    let callback = async move {
        if session.config().implicit_flush() && stmt_type.is_dml() {
            do_flush(&session).await?;
        }

        match query_mode {
            QueryMode::Auto => unreachable!(),
            QueryMode::Local => {
                session
                    .env()
                    .frontend_metrics
                    .latency_local_execution
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .frontend_metrics
                    .query_counter_local_execution
                    .inc();
            }
            QueryMode::Distributed => {
                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .query_latency
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .completed_query_counter
                    .inc();
            }
        }

        Ok(())
    };

    Ok(PgResponse::builder(stmt_type)
        .row_cnt_format_opt(Some(first_field_format))
        .values(row_stream, pg_descs)
        .callback(callback)
        .into())
}

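/// Schedules a query on the distributed batch query manager, applying the
/// session's statement timeout when cancellation is allowed (and never under
/// madsim simulation).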
pub async fn distribute_execute(
    session: Arc<SessionImpl>,
    query: Query,
    can_timeout_cancel: bool,
    read_storage_tables: HashSet<TableId>,
) -> Result<DistributedQueryStream> {
    let timeout = if cfg!(madsim) {
        None
    } else if can_timeout_cancel {
        Some(session.statement_timeout())
    } else {
        None
    };
    let execution_context: ExecutionContextRef =
        ExecutionContext::new(session.clone(), timeout).into();
    let query_manager = session.env().query_manager().clone();

    query_manager
        .schedule(execution_context, query, read_storage_tables)
        .await
        .map_err(|err| err.into())
}

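/// Runs a query locally on the frontend against the session's pinned
/// snapshot, using the snapshot's batch query epoch for the tables it reads.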
pub async fn local_execute(
    session: Arc<SessionImpl>,
    query: Query,
    can_timeout_cancel: bool,
    read_storage_tables: &HashSet<TableId>,
) -> Result<LocalQueryStream> {
    let timeout = if cfg!(madsim) {
        None
    } else if can_timeout_cancel {
        Some(session.statement_timeout())
    } else {
        None
    };
    let front_env = session.env();

    let snapshot = session.pinned_snapshot();

    let execution = LocalQueryExecution::new(
        query,
        front_env.clone(),
        snapshot.support_barrier_read(),
        snapshot.batch_query_epoch(read_storage_tables)?,
        session,
        timeout,
    );

    Ok(execution.stream_rows())
}