use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;

use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Format;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{FunctionId, Schema};
use risingwave_common::id::ObjectId;
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::{DataType, Datum};
use risingwave_sqlparser::ast::{SetExpr, Statement};

use super::extended_handle::{PortalResult, PrepareStatement, PreparedResult};
use super::{PgResponseStream, RwPgResponse, create_mv, declare_cursor};
use crate::binder::{Binder, BoundCreateView, BoundStatement};
#[cfg(feature = "datafusion")]
use crate::datafusion::DfBatchQueryPlanResult;
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::HandlerArgs;
use crate::handler::flush::do_flush;
use crate::handler::util::{DataChunkToRowSetAdapter, to_pg_field};
use crate::optimizer::plan_node::{BatchPlanRef, Explain};
use crate::optimizer::{
    BatchPlanRoot, ExecutionModeDecider, OptimizerContext, OptimizerContextRef,
    RelationCollectorVisitor, SysTableVisitor,
};
use crate::planner::Planner;
use crate::scheduler::plan_fragmenter::Query;
use crate::scheduler::{
    BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef,
    LocalQueryExecution, LocalQueryStream,
};
use crate::session::SessionImpl;

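/// The outcome of batch planning: either a native RisingWave batch plan, or a
/// DataFusion logical plan when the `datafusion` feature is enabled and the
/// optimizer has decided the query can be offloaded to DataFusion.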
pub enum BatchPlanChoice {
    Rw(RwBatchQueryPlanResult),
    #[cfg(feature = "datafusion")]
    Df(DfBatchQueryPlanResult),
}

impl BatchPlanChoice {
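    /// Returns the native RisingWave plan, or bails if this is a DataFusion
    /// plan. Code paths that cannot execute via DataFusion (the extended
    /// protocol and cursor handling below) use this to insist on a native plan.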
    pub fn unwrap_rw(self) -> Result<RwBatchQueryPlanResult> {
        match self {
            BatchPlanChoice::Rw(result) => Ok(result),
            #[cfg(feature = "datafusion")]
            BatchPlanChoice::Df(_) => {
                risingwave_common::bail!(
                    "Expected RisingWave plan in BatchPlanChoice, but got DataFusion plan"
                )
            }
        }
    }
}

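/// Entry point for the simple query protocol: binds and plans the statement,
/// fragments the resulting plan, and executes it, streaming rows back to the
/// client. With the `datafusion` feature enabled, plans that qualify are
/// offloaded to DataFusion instead.
///
/// An illustrative call shape (a sketch, not a compiled doctest):
///
/// ```ignore
/// let response = handle_query(handler_args, stmt, vec![Format::Text]).await?;
/// ```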
pub async fn handle_query(
    handler_args: HandlerArgs,
    stmt: Statement,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let context = OptimizerContext::from_handler_args(handler_args);

    #[cfg(feature = "datafusion")]
    {
        use futures::FutureExt;

        use crate::datafusion::execute_datafusion_plan;

        let future = match gen_batch_plan_by_statement(&session, context.into(), stmt)? {
            BatchPlanChoice::Rw(plan_result) => {
                let plan_fragmenter_result = risingwave_expr::expr_context::TIME_ZONE::sync_scope(
                    session.config().timezone(),
                    || gen_batch_plan_fragmenter(&session, plan_result),
                )?;
                execute_risingwave_plan(session, plan_fragmenter_result, formats).left_future()
            }
            BatchPlanChoice::Df(plan_result) => {
                execute_datafusion_plan(session, plan_result, formats).right_future()
            }
        };
        future.await
    }

    #[cfg(not(feature = "datafusion"))]
    {
        let future = match gen_batch_plan_by_statement(&session, context.into(), stmt)? {
            BatchPlanChoice::Rw(plan_result) => {
                let plan_fragmenter_result = risingwave_expr::expr_context::TIME_ZONE::sync_scope(
                    session.config().timezone(),
                    || gen_batch_plan_fragmenter(&session, plan_result),
                )?;
                execute_risingwave_plan(session, plan_fragmenter_result, formats)
            }
        };
        future.await
    }
}

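/// Shared `Parse` logic: binds the statement and packages it as a prepared
/// statement for later `Bind`/`Execute`.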
fn handle_parse_inner(binder: Binder, statement: Statement) -> Result<PrepareStatement> {
    let bound_result = gen_bound(binder, statement.clone())?;

    Ok(PrepareStatement::Prepared(PreparedResult {
        statement,
        bound_result,
    }))
}

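/// Handles `Parse` for batch queries, seeding the binder with any parameter
/// types the client specified.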
pub fn handle_parse_for_batch(
    handler_args: HandlerArgs,
    statement: Statement,
    specified_param_types: Vec<Option<DataType>>,
) -> Result<PrepareStatement> {
    let binder = Binder::new_for_batch(&handler_args.session)
        .with_specified_params_types(specified_param_types);
    handle_parse_inner(binder, statement)
}

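/// Handles `Parse` for streaming statements (e.g. a prepared
/// `CREATE MATERIALIZED VIEW`), using the stream-mode binder.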
pub fn handle_parse_for_stream(
    handler_args: HandlerArgs,
    statement: Statement,
    specified_param_types: Vec<Option<DataType>>,
) -> Result<PrepareStatement> {
    let binder = Binder::new_for_stream(&handler_args.session)
        .with_specified_params_types(specified_param_types);
    handle_parse_inner(binder, statement)
}

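/// Handles `Execute` in the extended query protocol: runs an already-bound
/// portal. Queries and DML go through the batch pipeline; bound
/// `CREATE MATERIALIZED VIEW` and `DECLARE CURSOR` statements are dispatched
/// to their dedicated handlers. Other statement kinds never reach this point.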
pub async fn handle_execute(
    handler_args: HandlerArgs,
    portal: PortalResult,
) -> Result<RwPgResponse> {
    let PortalResult {
        bound_result,
        result_formats,
        statement,
    } = portal;
    match statement {
        Statement::Query(_)
        | Statement::Insert { .. }
        | Statement::Delete { .. }
        | Statement::Update { .. } => {
            let session = handler_args.session.clone();
            let plan_fragmenter_result = {
                let context = OptimizerContext::from_handler_args(handler_args);
                let plan_result =
                    gen_batch_query_plan(&session, context.into(), bound_result)?.unwrap_rw()?;
                risingwave_expr::expr_context::TIME_ZONE::sync_scope(
                    session.config().timezone(),
                    || gen_batch_plan_fragmenter(&session, plan_result),
                )?
            };
            execute_risingwave_plan(session, plan_fragmenter_result, result_formats).await
        }
        Statement::CreateView { materialized, .. } if materialized => {
            let BoundResult {
                bound,
                dependent_relations,
                dependent_udfs,
                ..
            } = bound_result;
            let create_mv = if let BoundStatement::CreateView(create_mv) = bound {
                create_mv
            } else {
                unreachable!("expect a BoundStatement::CreateView")
            };
            let BoundCreateView {
                or_replace,
                materialized: _,
                if_not_exists,
                name,
                columns,
                query,
                emit_mode,
                with_options,
            } = *create_mv;
            if or_replace {
                bail_not_implemented!("CREATE OR REPLACE VIEW");
            }

            let handler_args = HandlerArgs {
                session: handler_args.session.clone(),
                sql: handler_args.sql.clone(),
                normalized_sql: handler_args.normalized_sql.clone(),
                with_options: crate::WithOptions::try_from(with_options.as_slice())?,
            };

            create_mv::handle_create_mv_bound(
                handler_args,
                if_not_exists,
                name,
                *query,
                dependent_relations,
                dependent_udfs,
                columns,
                emit_mode,
            )
            .await
        }
        Statement::DeclareCursor { stmt } => match stmt.declare_cursor {
            risingwave_sqlparser::ast::DeclareCursor::Query(_) => {
                let session = handler_args.session.clone();
                let plan_fragmenter_result = {
                    let context = OptimizerContext::from_handler_args(handler_args.clone());
                    let plan_result = gen_batch_query_plan(&session, context.into(), bound_result)?
                        .unwrap_rw()?;
                    gen_batch_plan_fragmenter(&session, plan_result)?
                };
                declare_cursor::handle_bound_declare_query_cursor(
                    handler_args,
                    stmt.cursor_name,
                    plan_fragmenter_result,
                )
                .await
            }
            risingwave_sqlparser::ast::DeclareCursor::Subscription(sub_name, rw_timestamp) => {
                declare_cursor::handle_declare_subscription_cursor(
                    handler_args,
                    sub_name,
                    stmt.cursor_name,
                    rw_timestamp,
                )
                .await
            }
        },
        _ => unreachable!(),
    }
}

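/// Convenience wrapper that binds a statement and immediately plans it for
/// batch execution, as used by `handle_query` above. A sketch of the call
/// shape (illustrative only):
///
/// ```ignore
/// let choice = gen_batch_plan_by_statement(&session, context.into(), stmt)?;
/// let plan = choice.unwrap_rw()?;
/// ```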
pub fn gen_batch_plan_by_statement(
    session: &SessionImpl,
    context: OptimizerContextRef,
    stmt: Statement,
) -> Result<BatchPlanChoice> {
    let binder = Binder::new_for_batch(session);
    let bound_result = gen_bound(binder, stmt)?;
    gen_batch_query_plan(session, context, bound_result)
}

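/// The output of binding: the bound statement plus the metadata needed to plan
/// and schedule it (statement type, parameter types, and the relations and
/// UDFs it depends on).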
#[derive(Clone)]
pub struct BoundResult {
    pub(crate) stmt_type: StatementType,
    pub(crate) must_dist: bool,
    pub(crate) bound: BoundStatement,
    pub(crate) param_types: Vec<DataType>,
    pub(crate) parsed_params: Option<Vec<Datum>>,
    pub(crate) dependent_relations: HashSet<ObjectId>,
    pub(crate) dependent_udfs: HashSet<FunctionId>,
}

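/// Binds a statement and collects the metadata later planning stages need,
/// including whether the statement must run in distributed mode.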
fn gen_bound(mut binder: Binder, stmt: Statement) -> Result<BoundResult> {
    let stmt_type = StatementType::infer_from_statement(&stmt)
        .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;
    let must_dist = must_run_in_distributed_mode(&stmt)?;

    let bound = binder.bind(stmt)?;

    Ok(BoundResult {
        stmt_type,
        must_dist,
        bound,
        param_types: binder.export_param_types()?,
        parsed_params: None,
        dependent_relations: binder.included_relations().clone(),
        dependent_udfs: binder.included_udfs().clone(),
    })
}

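/// A batch query plan produced by the native RisingWave optimizer, together
/// with the execution mode and output schema needed to run it.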
pub struct RwBatchQueryPlanResult {
    pub(crate) plan: BatchPlanRef,
    pub(crate) query_mode: QueryMode,
    pub(crate) schema: Schema,
    pub(crate) stmt_type: StatementType,
    pub(crate) dependent_relations: Vec<ObjectId>,
}

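/// Plans a bound statement for batch execution: optimizes the logical plan,
/// optionally hands it off to DataFusion (feature-gated), and otherwise picks
/// a local or distributed physical plan based on what the statement and plan
/// require and on the session's `query_mode` setting.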
fn gen_batch_query_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    bind_result: BoundResult,
) -> Result<BatchPlanChoice> {
    let BoundResult {
        stmt_type,
        must_dist,
        bound,
        dependent_relations,
        ..
    } = bind_result;

    let mut planner = if matches!(bound, BoundStatement::Query(_)) {
        Planner::new_for_batch_dql(context)
    } else {
        Planner::new_for_batch(context)
    };

    let logical = planner.plan(bound)?;
    let schema = logical.schema();
    let optimized_logical = logical.gen_optimized_logical_plan_for_batch()?;

    #[cfg(feature = "datafusion")]
    {
        use crate::optimizer::DataFusionExecuteCheckerExt;

        let execute_by_datafusion = if session.config().enable_datafusion_engine() {
            let check_result = optimized_logical.plan.check_for_datafusion();
            if !check_result.supported && check_result.have_iceberg_scan {
                tracing::warn!(
                    "DataFusion execution is disabled because the logical plan contains unsupported plan nodes; performance may be degraded."
                );
            }
            check_result.supported && check_result.have_iceberg_scan
        } else {
            false
        };

        if execute_by_datafusion {
            let plan = optimized_logical.gen_datafusion_logical_plan()?;
            return Ok(BatchPlanChoice::Df(DfBatchQueryPlanResult {
                plan,
                schema,
                stmt_type,
            }));
        }
    }

    let batch_plan = optimized_logical.gen_batch_plan()?;

    let dependent_relations =
        RelationCollectorVisitor::collect_with(dependent_relations, batch_plan.plan.clone());

    let must_local = must_run_in_local_mode(&batch_plan);

    let query_mode = match (must_dist, must_local) {
        (true, true) => {
            return Err(ErrorCode::InternalError(
                "the query is forced into both local and distributed mode by the optimizer"
                    .to_owned(),
            )
            .into());
        }
        (true, false) => QueryMode::Distributed,
        (false, true) => QueryMode::Local,
        (false, false) => match session.config().query_mode() {
            QueryMode::Auto => determine_query_mode(&batch_plan),
            QueryMode::Local => QueryMode::Local,
            QueryMode::Distributed => QueryMode::Distributed,
        },
    };

    let physical = match query_mode {
        QueryMode::Auto => unreachable!(),
        QueryMode::Local => batch_plan.gen_batch_local_plan()?,
        QueryMode::Distributed => batch_plan.gen_batch_distributed_plan()?,
    };

    let result = RwBatchQueryPlanResult {
        plan: physical,
        query_mode,
        schema,
        stmt_type,
        dependent_relations: dependent_relations.into_iter().collect_vec(),
    };
    Ok(BatchPlanChoice::Rw(result))
}

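/// Returns whether the statement must run in distributed mode. UPDATE and
/// DELETE (with or without RETURNING) and `INSERT ... SELECT` are forced to
/// distributed execution; for example, `INSERT INTO t SELECT * FROM s`
/// qualifies while `INSERT INTO t VALUES (1)` does not.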
fn must_run_in_distributed_mode(stmt: &Statement) -> Result<bool> {
    fn is_insert_using_select(stmt: &Statement) -> bool {
        fn has_select_query(set_expr: &SetExpr) -> bool {
            match set_expr {
                SetExpr::Select(_) => true,
                SetExpr::Query(query) => has_select_query(&query.body),
                SetExpr::SetOperation { left, right, .. } => {
                    has_select_query(left) || has_select_query(right)
                }
                SetExpr::Values(_) => false,
            }
        }

        matches!(
            stmt,
            Statement::Insert { source, .. } if has_select_query(&source.body)
        )
    }

    let stmt_type = StatementType::infer_from_statement(stmt)
        .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;

    Ok(matches!(
        stmt_type,
        StatementType::UPDATE
            | StatementType::DELETE
            | StatementType::UPDATE_RETURNING
            | StatementType::DELETE_RETURNING
    ) | is_insert_using_select(stmt))
}

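/// Plans that read system tables must run in local mode, since system tables
/// are served from the frontend rather than from compute nodes.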
fn must_run_in_local_mode(batch_plan: &BatchPlanRoot) -> bool {
    SysTableVisitor::has_sys_table(batch_plan)
}

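/// Resolves `query_mode = auto`: runs the plan locally if
/// [`ExecutionModeDecider`] decides it qualifies, and distributed otherwise.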
fn determine_query_mode(batch_plan: &BatchPlanRoot) -> QueryMode {
    if ExecutionModeDecider::run_in_local_mode(batch_plan) {
        QueryMode::Local
    } else {
        QueryMode::Distributed
    }
}

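/// The fragmented plan plus the metadata needed to execute it and build the
/// client response.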
pub struct BatchPlanFragmenterResult {
    pub(crate) plan_fragmenter: BatchPlanFragmenter,
    pub(crate) query_mode: QueryMode,
    pub(crate) schema: Schema,
    pub(crate) stmt_type: StatementType,
}

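/// Wraps a batch plan in a [`BatchPlanFragmenter`], which will split it into
/// stages for scheduling across worker nodes.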
pub fn gen_batch_plan_fragmenter(
    session: &SessionImpl,
    plan_result: RwBatchQueryPlanResult,
) -> Result<BatchPlanFragmenterResult> {
    let RwBatchQueryPlanResult {
        plan,
        query_mode,
        schema,
        stmt_type,
        ..
    } = plan_result;

    tracing::trace!(
        "Generated query plan: {:?}, query_mode: {:?}",
        plan.explain_to_string(),
        query_mode
    );
    let worker_node_manager_reader = WorkerNodeSelector::new(
        session.env().worker_node_manager_ref(),
        session.is_barrier_read(),
    );
    let plan_fragmenter = BatchPlanFragmenter::new(
        worker_node_manager_reader,
        session.env().catalog_reader().clone(),
        session.config().batch_parallelism().0,
        plan,
    )?;

    Ok(BatchPlanFragmenterResult {
        plan_fragmenter,
        query_mode,
        schema,
        stmt_type,
    })
}

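/// Schedules the fragmented plan and returns the resulting row stream together
/// with the `pgwire` field descriptors for the client. DML statements take the
/// transaction write guard and are exempted from statement-timeout
/// cancellation.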
pub async fn create_stream(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
    formats: Vec<Format>,
) -> Result<(PgResponseStream, Vec<PgFieldDescriptor>)> {
    let BatchPlanFragmenterResult {
        plan_fragmenter,
        query_mode,
        schema,
        stmt_type,
        ..
    } = plan_fragmenter_result;

    let mut can_timeout_cancel = true;
    match stmt_type {
        StatementType::INSERT
        | StatementType::INSERT_RETURNING
        | StatementType::DELETE
        | StatementType::DELETE_RETURNING
        | StatementType::UPDATE
        | StatementType::UPDATE_RETURNING => {
            session.txn_write_guard()?;
            can_timeout_cancel = false;
        }
        _ => {}
    }

    let query = plan_fragmenter.generate_complete_query().await?;
    tracing::trace!("Generated query after plan fragmenter: {:?}", &query);

    let pg_descs = schema
        .fields()
        .iter()
        .map(to_pg_field)
        .collect::<Vec<PgFieldDescriptor>>();
    let column_types = schema.fields().iter().map(|f| f.data_type()).collect_vec();

    let row_stream = match query_mode {
        QueryMode::Auto => unreachable!(),
        QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
            local_execute(session.clone(), query, can_timeout_cancel).await?,
            column_types,
            formats,
            session.clone(),
        )),
        QueryMode::Distributed => {
            PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
                distribute_execute(session.clone(), query, can_timeout_cancel).await?,
                column_types,
                formats,
                session.clone(),
            ))
        }
    };

    Ok((row_stream, pg_descs))
}

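/// Executes a fragmented plan on the native batch engine and assembles the
/// `PgResponse`. The completion callback performs an implicit flush for DML
/// (when `implicit_flush` is set) and records per-mode latency and query-count
/// metrics.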
async fn execute_risingwave_plan(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    let first_field_format = formats.first().copied().unwrap_or(Format::Text);
    let query_mode = plan_fragmenter_result.query_mode;
    let stmt_type = plan_fragmenter_result.stmt_type;

    let query_start_time = Instant::now();
    let (row_stream, pg_descs) =
        create_stream(session.clone(), plan_fragmenter_result, formats).await?;

    let callback = async move {
        if session.config().implicit_flush() && stmt_type.is_dml() {
            do_flush(&session).await?;
        }

        match query_mode {
            QueryMode::Auto => unreachable!(),
            QueryMode::Local => {
                session
                    .env()
                    .frontend_metrics
                    .latency_local_execution
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .frontend_metrics
                    .query_counter_local_execution
                    .inc();
            }
            QueryMode::Distributed => {
                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .query_latency
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .completed_query_counter
                    .inc();
            }
        }

        Ok(())
    };

    Ok(PgResponse::builder(stmt_type)
        .row_cnt_format_opt(Some(first_field_format))
        .values(row_stream, pg_descs)
        .callback(callback)
        .into())
}

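/// Submits the query to the distributed query manager. The statement timeout
/// is applied only when the statement can be safely cancelled (i.e. it is not
/// DML) and the build is not the `madsim` deterministic test runtime.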
pub async fn distribute_execute(
    session: Arc<SessionImpl>,
    query: Query,
    can_timeout_cancel: bool,
) -> Result<DistributedQueryStream> {
    let timeout = if cfg!(madsim) {
        None
    } else if can_timeout_cancel {
        Some(session.statement_timeout())
    } else {
        None
    };
    let execution_context: ExecutionContextRef =
        ExecutionContext::new(session.clone(), timeout).into();
    let query_manager = session.env().query_manager().clone();

    query_manager
        .schedule(execution_context, query)
        .await
        .map_err(|err| err.into())
}

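/// Runs the query on the frontend itself, pinning a snapshot to fill in the
/// batch query epoch before execution. Timeout handling mirrors
/// [`distribute_execute`].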
pub async fn local_execute(
    session: Arc<SessionImpl>,
    mut query: Query,
    can_timeout_cancel: bool,
) -> Result<LocalQueryStream> {
    let timeout = if cfg!(madsim) {
        None
    } else if can_timeout_cancel {
        Some(session.statement_timeout())
    } else {
        None
    };
    let front_env = session.env();

    let snapshot = session.pinned_snapshot();

    snapshot.fill_batch_query_epoch(&mut query)?;

    let execution = LocalQueryExecution::new(
        query,
        front_env.clone(),
        snapshot.support_barrier_read(),
        session,
        timeout,
    );

    Ok(execution.stream_rows())
}