1use std::collections::HashSet;
16use std::sync::Arc;
17use std::time::Instant;
18
19use itertools::Itertools;
20use pgwire::pg_field_descriptor::PgFieldDescriptor;
21use pgwire::pg_response::{PgResponse, StatementType};
22use pgwire::types::Format;
23use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
24use risingwave_common::bail_not_implemented;
25use risingwave_common::catalog::{FunctionId, Schema, SecretId};
26use risingwave_common::id::ObjectId;
27use risingwave_common::session_config::QueryMode;
28use risingwave_common::types::{DataType, Datum};
29use risingwave_sqlparser::ast::{SetExpr, Statement};
30
31use super::extended_handle::{PortalResult, PrepareStatement, PreparedResult};
32use super::{PgResponseStream, RwPgResponse, create_mv, declare_cursor};
33use crate::binder::{Binder, BoundCreateView, BoundStatement};
34#[cfg(feature = "datafusion")]
35use crate::datafusion::DfBatchQueryPlanResult;
36use crate::error::{ErrorCode, Result, RwError};
37use crate::handler::HandlerArgs;
38use crate::handler::flush::do_flush;
39use crate::handler::util::{DataChunkToRowSetAdapter, to_pg_field};
40use crate::optimizer::plan_node::{BatchPlanRef, Explain};
41use crate::optimizer::{
42 BatchPlanRoot, ExecutionModeDecider, OptimizerContext, OptimizerContextRef,
43 RelationCollectorVisitor, SysTableVisitor,
44};
45use crate::planner::Planner;
46use crate::scheduler::plan_fragmenter::Query;
47use crate::scheduler::{
48 BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef,
49 LocalQueryExecution, LocalQueryStream,
50};
51use crate::session::SessionImpl;
52
/// Outcome of batch planning: either a native RisingWave batch plan or,
/// when the `datafusion` feature is enabled, a DataFusion plan.
pub enum BatchPlanChoice {
    Rw(RwBatchQueryPlanResult),
    #[cfg(feature = "datafusion")]
    Df(DfBatchQueryPlanResult),
}
59
60impl BatchPlanChoice {
61 pub fn unwrap_rw(self) -> Result<RwBatchQueryPlanResult> {
62 match self {
63 BatchPlanChoice::Rw(result) => Ok(result),
64 #[cfg(feature = "datafusion")]
65 BatchPlanChoice::Df { .. } => {
66 risingwave_common::bail!(
67 "Expected RisingWave plan in BatchPlanChoice, but got DataFusion plan"
68 )
69 }
70 }
71 }
72}
73
/// Handle a batch statement end-to-end: bind, plan, fragment, and execute,
/// returning the response whose rows are streamed back to the client.
pub async fn handle_query(
    handler_args: HandlerArgs,
    stmt: Statement,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let context = OptimizerContext::from_handler_args(handler_args);

    #[cfg(feature = "datafusion")]
    {
        use futures::FutureExt;

        use crate::datafusion::execute_datafusion_plan;

        // Planning may yield either a RisingWave or a DataFusion plan;
        // `left_future`/`right_future` unify the two execution futures into a
        // single awaitable type.
        let future = match gen_batch_plan_by_statement(&session, context.into(), stmt)? {
            BatchPlanChoice::Rw(plan_result) => {
                // Fragment the plan under the session's time zone context.
                let plan_fragmenter_result = risingwave_expr::expr_context::TIME_ZONE::sync_scope(
                    session.config().timezone(),
                    || gen_batch_plan_fragmenter(&session, plan_result),
                )?;
                execute_risingwave_plan(session, plan_fragmenter_result, formats).left_future()
            }
            BatchPlanChoice::Df(plan_result) => {
                execute_datafusion_plan(session, plan_result, formats).right_future()
            }
        };
        future.await
    }

    #[cfg(not(feature = "datafusion"))]
    {
        // Without the `datafusion` feature, planning only produces RisingWave plans.
        let future = match gen_batch_plan_by_statement(&session, context.into(), stmt)? {
            BatchPlanChoice::Rw(plan_result) => {
                let plan_fragmenter_result = risingwave_expr::expr_context::TIME_ZONE::sync_scope(
                    session.config().timezone(),
                    || gen_batch_plan_fragmenter(&session, plan_result),
                )?;
                execute_risingwave_plan(session, plan_fragmenter_result, formats)
            }
        };
        future.await
    }
}
120
121fn handle_parse_inner(binder: Binder, statement: Statement) -> Result<PrepareStatement> {
122 let bound_result = gen_bound(binder, statement.clone())?;
123
124 Ok(PrepareStatement::Prepared(PreparedResult {
125 statement,
126 bound_result,
127 }))
128}
129
130pub fn handle_parse_for_batch(
131 handler_args: HandlerArgs,
132 statement: Statement,
133 specified_param_types: Vec<Option<DataType>>,
134) -> Result<PrepareStatement> {
135 let binder = Binder::new_for_batch(&handler_args.session)
136 .with_specified_params_types(specified_param_types);
137 handle_parse_inner(binder, statement)
138}
139
140pub fn handle_parse_for_stream(
141 handler_args: HandlerArgs,
142 statement: Statement,
143 specified_param_types: Vec<Option<DataType>>,
144) -> Result<PrepareStatement> {
145 let binder = Binder::new_for_stream(&handler_args.session)
146 .with_specified_params_types(specified_param_types);
147 handle_parse_inner(binder, statement)
148}
149
/// Execute a previously bound portal (extended query protocol).
///
/// Dispatches on the portal's statement kind: batch queries and DML are
/// planned and executed here, while bound `CREATE MATERIALIZED VIEW` and
/// `DECLARE CURSOR` statements are forwarded to their dedicated handlers.
pub async fn handle_execute(
    handler_args: HandlerArgs,
    portal: PortalResult,
) -> Result<RwPgResponse> {
    let PortalResult {
        bound_result,
        result_formats,
        statement,
    } = portal;
    match statement {
        Statement::Query(_)
        | Statement::Insert { .. }
        | Statement::Delete { .. }
        | Statement::Update { .. } => {
            let session = handler_args.session.clone();
            let plan_fragmenter_result = {
                let context = OptimizerContext::from_handler_args(handler_args);
                // Only RisingWave plans are supported on this path; a
                // DataFusion plan is rejected by `unwrap_rw`.
                let plan_result =
                    gen_batch_query_plan(&session, context.into(), bound_result)?.unwrap_rw()?;
                // Fragment the plan under the session's time zone context.
                risingwave_expr::expr_context::TIME_ZONE::sync_scope(
                    session.config().timezone(),
                    || gen_batch_plan_fragmenter(&session, plan_result),
                )?
            };
            execute_risingwave_plan(session, plan_fragmenter_result, result_formats).await
        }
        Statement::CreateView { materialized, .. } if materialized => {
            // Unpack the binder outputs needed to create the MV.
            let BoundResult {
                bound,
                dependent_relations,
                dependent_udfs,
                dependent_secrets,
                ..
            } = bound_result;
            let create_mv = if let BoundStatement::CreateView(create_mv) = bound {
                create_mv
            } else {
                unreachable!("expect a BoundStatement::CreateView")
            };
            let BoundCreateView {
                or_replace,
                materialized: _,
                if_not_exists,
                name,
                columns,
                query,
                emit_mode,
                with_options,
            } = *create_mv;
            if or_replace {
                bail_not_implemented!("CREATE OR REPLACE VIEW");
            }

            // Rebuild the handler args so the MV handler sees the view's own
            // WITH options rather than those of the EXECUTE statement.
            let handler_args = HandlerArgs {
                session: handler_args.session.clone(),
                sql: handler_args.sql.clone(),
                normalized_sql: handler_args.normalized_sql.clone(),
                with_options: crate::WithOptions::try_from(with_options.as_slice())?,
            };

            create_mv::handle_create_mv_bound(
                handler_args,
                if_not_exists,
                name,
                *query,
                dependent_relations,
                dependent_udfs,
                dependent_secrets,
                columns,
                emit_mode,
            )
            .await
        }
        Statement::DeclareCursor { stmt } => match stmt.declare_cursor {
            risingwave_sqlparser::ast::DeclareCursor::Query(_) => {
                let session = handler_args.session.clone();
                let plan_fragmenter_result = {
                    let context = OptimizerContext::from_handler_args(handler_args.clone());
                    let plan_result = gen_batch_query_plan(&session, context.into(), bound_result)?
                        .unwrap_rw()?;
                    gen_batch_plan_fragmenter(&session, plan_result)?
                };
                declare_cursor::handle_bound_declare_query_cursor(
                    handler_args,
                    stmt.cursor_name,
                    plan_fragmenter_result,
                )
                .await
            }
            risingwave_sqlparser::ast::DeclareCursor::Subscription(sub_name, rw_timestamp) => {
                declare_cursor::handle_declare_subscription_cursor(
                    handler_args,
                    sub_name,
                    stmt.cursor_name,
                    rw_timestamp,
                )
                .await
            }
        },
        // Other statement kinds are routed elsewhere before reaching this point.
        _ => unreachable!(),
    }
}
257
258pub fn gen_batch_plan_by_statement(
259 session: &SessionImpl,
260 context: OptimizerContextRef,
261 stmt: Statement,
262) -> Result<BatchPlanChoice> {
263 let binder = Binder::new_for_batch(session);
264 let bound_result = gen_bound(binder, stmt)?;
265 gen_batch_query_plan(session, context, bound_result)
266}
267
/// Result of binding a statement, carrying everything needed for later
/// planning and for dependency tracking.
#[derive(Clone)]
pub struct BoundResult {
    // Statement type inferred from the AST (SELECT/INSERT/...).
    pub(crate) stmt_type: StatementType,
    // Whether the statement must run in distributed query mode.
    pub(crate) must_dist: bool,
    // The bound statement.
    pub(crate) bound: BoundStatement,
    // Parameter types exported from the binder.
    pub(crate) param_types: Vec<DataType>,
    // Concrete parameter values; `None` right after binding (filled later in
    // the extended-protocol flow — see `extended_handle`).
    pub(crate) parsed_params: Option<Vec<Datum>>,
    // Relations referenced by the statement, as collected by the binder.
    pub(crate) dependent_relations: HashSet<ObjectId>,
    // UDFs referenced by the statement, as collected by the binder.
    pub(crate) dependent_udfs: HashSet<FunctionId>,
    // Secrets referenced by the statement, as collected by the binder.
    pub(crate) dependent_secrets: HashSet<SecretId>,
}
280
281fn gen_bound(mut binder: Binder, stmt: Statement) -> Result<BoundResult> {
282 let stmt_type = StatementType::infer_from_statement(&stmt)
283 .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;
284 let must_dist = must_run_in_distributed_mode(&stmt)?;
285
286 let bound = binder.bind(stmt)?;
287
288 Ok(BoundResult {
289 stmt_type,
290 must_dist,
291 bound,
292 param_types: binder.export_param_types()?,
293 parsed_params: None,
294 dependent_relations: binder.included_relations().clone(),
295 dependent_udfs: binder.included_udfs().clone(),
296 dependent_secrets: binder.included_secrets().clone(),
297 })
298}
299
/// A fully planned RisingWave batch query, ready for plan fragmentation.
pub struct RwBatchQueryPlanResult {
    // The physical batch plan (local or distributed form, per `query_mode`).
    pub(crate) plan: BatchPlanRef,
    // Execution mode the plan was generated for.
    pub(crate) query_mode: QueryMode,
    // Output schema of the query.
    pub(crate) schema: Schema,
    // Statement type, carried through for the response.
    pub(crate) stmt_type: StatementType,
    // Relations the plan depends on (binder set augmented by plan traversal).
    pub(crate) dependent_relations: Vec<ObjectId>,
    // Secrets the plan depends on.
    pub(crate) dependent_secrets: Vec<SecretId>,
}
311
/// Plan a bound statement into a physical batch plan, also deciding the query
/// execution mode (local vs. distributed).
///
/// With the `datafusion` feature enabled and `enable_datafusion_engine` set in
/// the session config, the optimized logical plan is first offered to
/// DataFusion; on failure, planning falls back to the RisingWave batch planner.
fn gen_batch_query_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    bind_result: BoundResult,
) -> Result<BatchPlanChoice> {
    let BoundResult {
        stmt_type,
        must_dist,
        bound,
        dependent_relations,
        dependent_secrets,
        ..
    } = bind_result;

    // Queries (DQL) get a dedicated planner entry point; other statements use
    // the general batch planner.
    let mut planner = if matches!(bound, BoundStatement::Query(_)) {
        Planner::new_for_batch_dql(context)
    } else {
        Planner::new_for_batch(context)
    };

    let logical = planner.plan(bound)?;
    let schema = logical.schema();
    let optimized_logical = logical.gen_optimized_logical_plan_for_batch()?;

    #[cfg(feature = "datafusion")]
    {
        if session.config().enable_datafusion_engine() {
            use thiserror_ext::AsReport;

            use crate::datafusion::{GenDataFusionPlanError, try_gen_datafusion_plan};

            match try_gen_datafusion_plan(&optimized_logical) {
                Ok(plan) => {
                    return Ok(BatchPlanChoice::Df(DfBatchQueryPlanResult {
                        plan,
                        schema,
                        stmt_type,
                    }));
                }
                // Plan has no Iceberg scan: fall back to RisingWave silently.
                Err(GenDataFusionPlanError::MissingIcebergScan) => {}
                Err(err) => {
                    tracing::warn!(
                        "Failed to generate DataFusion plan, fallback to RisingWave plan: {}",
                        err.as_report()
                    );
                }
            }
        }
    }

    let batch_plan = optimized_logical.gen_batch_plan()?;

    // Augment the binder-collected dependencies with relations discovered by
    // traversing the batch plan.
    let dependent_relations =
        RelationCollectorVisitor::collect_with(dependent_relations, batch_plan.plan.clone());

    let must_local = must_run_in_local_mode(&batch_plan);

    // Resolve the execution mode from the optimizer's constraints; only when
    // neither mode is forced does the session's `query_mode` setting apply.
    let query_mode = match (must_dist, must_local) {
        (true, true) => {
            return Err(ErrorCode::InternalError(
                "the query is forced to both local and distributed mode by optimizer".to_owned(),
            )
            .into());
        }
        (true, false) => QueryMode::Distributed,
        (false, true) => QueryMode::Local,
        (false, false) => match session.config().query_mode() {
            QueryMode::Auto => determine_query_mode(&batch_plan),
            QueryMode::Local => QueryMode::Local,
            QueryMode::Distributed => QueryMode::Distributed,
        },
    };

    let physical = match query_mode {
        QueryMode::Auto => unreachable!(),
        QueryMode::Local => batch_plan.gen_batch_local_plan()?,
        QueryMode::Distributed => batch_plan.gen_batch_distributed_plan()?,
    };

    let result = RwBatchQueryPlanResult {
        plan: physical,
        query_mode,
        schema,
        stmt_type,
        dependent_relations: dependent_relations.into_iter().collect_vec(),
        dependent_secrets: dependent_secrets.into_iter().collect_vec(),
    };
    Ok(BatchPlanChoice::Rw(result))
}
401
402fn must_run_in_distributed_mode(stmt: &Statement) -> Result<bool> {
403 fn is_insert_using_select(stmt: &Statement) -> bool {
404 fn has_select_query(set_expr: &SetExpr) -> bool {
405 match set_expr {
406 SetExpr::Select(_) => true,
407 SetExpr::Query(query) => has_select_query(&query.body),
408 SetExpr::SetOperation { left, right, .. } => {
409 has_select_query(left) || has_select_query(right)
410 }
411 SetExpr::Values(_) => false,
412 }
413 }
414
415 matches!(
416 stmt,
417 Statement::Insert {source, ..} if has_select_query(&source.body)
418 )
419 }
420
421 let stmt_type = StatementType::infer_from_statement(stmt)
422 .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;
423
424 Ok(matches!(
425 stmt_type,
426 StatementType::UPDATE
427 | StatementType::DELETE
428 | StatementType::UPDATE_RETURNING
429 | StatementType::DELETE_RETURNING
430 ) | is_insert_using_select(stmt))
431}
432
/// Whether the plan is forced into local execution mode — currently the case
/// when the plan contains a system-table scan.
fn must_run_in_local_mode(batch_plan: &BatchPlanRoot) -> bool {
    SysTableVisitor::has_sys_table(batch_plan)
}
436
437fn determine_query_mode(batch_plan: &BatchPlanRoot) -> QueryMode {
438 if ExecutionModeDecider::run_in_local_mode(batch_plan) {
439 QueryMode::Local
440 } else {
441 QueryMode::Distributed
442 }
443}
444
/// Output of [`gen_batch_plan_fragmenter`]: the fragmenter itself plus the
/// metadata needed to execute the query and present its results.
pub struct BatchPlanFragmenterResult {
    // Fragmenter that splits the plan into executable stages.
    pub(crate) plan_fragmenter: BatchPlanFragmenter,
    // Execution mode decided during planning.
    pub(crate) query_mode: QueryMode,
    // Output schema of the query.
    pub(crate) schema: Schema,
    // Statement type, carried through for the response.
    pub(crate) stmt_type: StatementType,
}
451
452pub fn gen_batch_plan_fragmenter(
453 session: &SessionImpl,
454 plan_result: RwBatchQueryPlanResult,
455) -> Result<BatchPlanFragmenterResult> {
456 let RwBatchQueryPlanResult {
457 plan,
458 query_mode,
459 schema,
460 stmt_type,
461 ..
462 } = plan_result;
463
464 tracing::trace!(
465 "Generated query plan: {:?}, query_mode:{:?}",
466 plan.explain_to_string(),
467 query_mode
468 );
469 let worker_node_manager_reader = WorkerNodeSelector::new(
470 session.env().worker_node_manager_ref(),
471 session.is_barrier_read(),
472 );
473 let plan_fragmenter = BatchPlanFragmenter::new(
474 worker_node_manager_reader,
475 session.env().catalog_reader().clone(),
476 session.config().batch_parallelism().0,
477 plan,
478 )?;
479
480 Ok(BatchPlanFragmenterResult {
481 plan_fragmenter,
482 query_mode,
483 schema,
484 stmt_type,
485 })
486}
487
488pub async fn create_stream(
489 session: Arc<SessionImpl>,
490 plan_fragmenter_result: BatchPlanFragmenterResult,
491 formats: Vec<Format>,
492) -> Result<(PgResponseStream, Vec<PgFieldDescriptor>)> {
493 let BatchPlanFragmenterResult {
494 plan_fragmenter,
495 query_mode,
496 schema,
497 stmt_type,
498 ..
499 } = plan_fragmenter_result;
500
501 let mut can_timeout_cancel = true;
502 match stmt_type {
504 StatementType::INSERT
505 | StatementType::INSERT_RETURNING
506 | StatementType::DELETE
507 | StatementType::DELETE_RETURNING
508 | StatementType::UPDATE
509 | StatementType::UPDATE_RETURNING => {
510 session.txn_write_guard()?;
511 can_timeout_cancel = false;
512 }
513 _ => {}
514 }
515
516 let query = plan_fragmenter.generate_complete_query().await?;
517 tracing::trace!("Generated query after plan fragmenter: {:?}", &query);
518
519 let pg_descs = schema
520 .fields()
521 .iter()
522 .map(to_pg_field)
523 .collect::<Vec<PgFieldDescriptor>>();
524 let column_types = schema.fields().iter().map(|f| f.data_type()).collect_vec();
525
526 let row_stream = match query_mode {
527 QueryMode::Auto => unreachable!(),
528 QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
529 local_execute(session.clone(), query, can_timeout_cancel).await?,
530 column_types,
531 formats,
532 session.clone(),
533 )),
534 QueryMode::Distributed => {
536 PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
537 distribute_execute(session.clone(), query, can_timeout_cancel).await?,
538 column_types,
539 formats,
540 session.clone(),
541 ))
542 }
543 };
544
545 Ok((row_stream, pg_descs))
546}
547
/// Execute a fragmented RisingWave batch plan and assemble the `PgResponse`,
/// attaching a callback that performs an implicit flush for DML (when enabled)
/// and records per-mode query metrics.
async fn execute_risingwave_plan(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    // Format used for the response's row-count field; defaults to text.
    let first_field_format = formats.first().copied().unwrap_or(Format::Text);
    let query_mode = plan_fragmenter_result.query_mode;
    let stmt_type = plan_fragmenter_result.stmt_type;

    let query_start_time = Instant::now();
    let (row_stream, pg_descs) =
        create_stream(session.clone(), plan_fragmenter_result, formats).await?;

    // Post-execution bookkeeping, attached to the response as a callback
    // (invocation timing is owned by `PgResponse`).
    let callback = async move {
        // Implicitly flush DML writes if the session asks for it.
        if session.config().implicit_flush() && stmt_type.is_dml() {
            do_flush(&session).await?;
        }

        // Record latency and completion counters, keyed by execution mode.
        match query_mode {
            QueryMode::Auto => unreachable!(),
            QueryMode::Local => {
                session
                    .env()
                    .frontend_metrics
                    .latency_local_execution
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .frontend_metrics
                    .query_counter_local_execution
                    .inc();
            }
            QueryMode::Distributed => {
                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .query_latency
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .completed_query_counter
                    .inc();
            }
        }

        Ok(())
    };

    Ok(PgResponse::builder(stmt_type)
        .row_cnt_format_opt(Some(first_field_format))
        .values(row_stream, pg_descs)
        .callback(callback)
        .into())
}
612
613pub async fn distribute_execute(
614 session: Arc<SessionImpl>,
615 query: Query,
616 can_timeout_cancel: bool,
617) -> Result<DistributedQueryStream> {
618 let timeout = if cfg!(madsim) {
619 None
620 } else if can_timeout_cancel {
621 Some(session.statement_timeout())
622 } else {
623 None
624 };
625 let execution_context: ExecutionContextRef =
626 ExecutionContext::new(session.clone(), timeout).into();
627 let query_manager = session.env().query_manager().clone();
628
629 query_manager
630 .schedule(execution_context, query)
631 .await
632 .map_err(|err| err.into())
633}
634
635pub async fn local_execute(
636 session: Arc<SessionImpl>,
637 mut query: Query,
638 can_timeout_cancel: bool,
639) -> Result<LocalQueryStream> {
640 let timeout = if cfg!(madsim) {
641 None
642 } else if can_timeout_cancel {
643 Some(session.statement_timeout())
644 } else {
645 None
646 };
647 let front_env = session.env();
648
649 let snapshot = session.pinned_snapshot();
650
651 snapshot.fill_batch_query_epoch(&mut query)?;
652
653 let execution = LocalQueryExecution::new(
654 query,
655 front_env.clone(),
656 snapshot.support_barrier_read(),
657 session,
658 timeout,
659 );
660
661 Ok(execution.stream_rows())
662}