use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;

use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Format;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{FunctionId, Schema};
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::{DataType, Datum};
use risingwave_sqlparser::ast::{SetExpr, Statement};

use super::extended_handle::{PortalResult, PrepareStatement, PreparedResult};
use super::{PgResponseStream, RwPgResponse, create_mv, declare_cursor};
use crate::binder::{Binder, BoundCreateView, BoundStatement};
use crate::catalog::TableId;
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::HandlerArgs;
use crate::handler::flush::do_flush;
use crate::handler::util::{DataChunkToRowSetAdapter, to_pg_field};
use crate::optimizer::plan_node::{BatchPlanRef, Explain};
use crate::optimizer::{
    BatchPlanRoot, ExecutionModeDecider, OptimizerContext, OptimizerContextRef,
    RelationCollectorVisitor, SysTableVisitor,
};
use crate::planner::Planner;
use crate::scheduler::plan_fragmenter::Query;
use crate::scheduler::{
    BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef,
    LocalQueryExecution, LocalQueryStream,
};
use crate::session::SessionImpl;

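/// Entry point for running a SQL statement as a batch query: binds and plans
/// the statement, fragments the plan under the session's time zone, and
/// executes it.
///
/// A sketch of a call site (not a compilable doctest; `handler_args` and
/// `stmt` are assumed to be produced by the surrounding handler):
///
/// ```ignore
/// let response = handle_query(handler_args, stmt, vec![Format::Text]).await?;
/// ```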
pub async fn handle_query(
    handler_args: HandlerArgs,
    stmt: Statement,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let plan_fragmenter_result = {
        let context = OptimizerContext::from_handler_args(handler_args);
        let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?;
        risingwave_expr::expr_context::TIME_ZONE::sync_scope(session.config().timezone(), || {
            gen_batch_plan_fragmenter(&session, plan_result)
        })?
    };
    execute(session, plan_fragmenter_result, formats).await
}

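/// Binds a statement and wraps it as a [`PrepareStatement`] for the extended
/// query protocol.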
fn handle_parse_inner(binder: Binder, statement: Statement) -> Result<PrepareStatement> {
    let bound_result = gen_bound(binder, statement.clone())?;

    Ok(PrepareStatement::Prepared(PreparedResult {
        statement,
        bound_result,
    }))
}

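/// Parses a statement for batch execution, with parameter types optionally
/// specified by the client.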
pub fn handle_parse_for_batch(
    handler_args: HandlerArgs,
    statement: Statement,
    specified_param_types: Vec<Option<DataType>>,
) -> Result<PrepareStatement> {
    let binder = Binder::new_for_batch(&handler_args.session)
        .with_specified_params_types(specified_param_types);
    handle_parse_inner(binder, statement)
}

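/// Parses a statement using the streaming binder (e.g. for
/// `CREATE MATERIALIZED VIEW`), with parameter types optionally specified by
/// the client.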
pub fn handle_parse_for_stream(
    handler_args: HandlerArgs,
    statement: Statement,
    specified_param_types: Vec<Option<DataType>>,
) -> Result<PrepareStatement> {
    let binder = Binder::new_for_stream(&handler_args.session)
        .with_specified_params_types(specified_param_types);
    handle_parse_inner(binder, statement)
}

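/// Executes a previously bound portal. Queries and DML are planned and run as
/// batch queries; materialized `CREATE VIEW` and `DECLARE CURSOR` are routed
/// to their dedicated handlers.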
pub async fn handle_execute(
    handler_args: HandlerArgs,
    portal: PortalResult,
) -> Result<RwPgResponse> {
    let PortalResult {
        bound_result,
        result_formats,
        statement,
    } = portal;
    match statement {
        Statement::Query(_)
        | Statement::Insert { .. }
        | Statement::Delete { .. }
        | Statement::Update { .. } => {
            let session = handler_args.session.clone();
            let plan_fragmenter_result = {
                let context = OptimizerContext::from_handler_args(handler_args);
                let plan_result = gen_batch_query_plan(&session, context.into(), bound_result)?;
                risingwave_expr::expr_context::TIME_ZONE::sync_scope(
                    session.config().timezone(),
                    || gen_batch_plan_fragmenter(&session, plan_result),
                )?
            };
            execute(session, plan_fragmenter_result, result_formats).await
        }
        Statement::CreateView { materialized, .. } if materialized => {
            let BoundResult {
                bound,
                dependent_relations,
                dependent_udfs,
                ..
            } = bound_result;
            let create_mv = if let BoundStatement::CreateView(create_mv) = bound {
                create_mv
            } else {
                unreachable!("expect a BoundStatement::CreateView")
            };
            let BoundCreateView {
                or_replace,
                materialized: _,
                if_not_exists,
                name,
                columns,
                query,
                emit_mode,
                with_options,
            } = *create_mv;
            if or_replace {
                bail_not_implemented!("CREATE OR REPLACE VIEW");
            }

            let handler_args = HandlerArgs {
                session: handler_args.session.clone(),
                sql: handler_args.sql.clone(),
                normalized_sql: handler_args.normalized_sql.clone(),
                with_options: crate::WithOptions::try_from(with_options.as_slice())?,
            };

            create_mv::handle_create_mv_bound(
                handler_args,
                if_not_exists,
                name,
                *query,
                dependent_relations,
                dependent_udfs,
                columns,
                emit_mode,
            )
            .await
        }
        Statement::DeclareCursor { stmt } => match stmt.declare_cursor {
            risingwave_sqlparser::ast::DeclareCursor::Query(_) => {
                let session = handler_args.session.clone();
                let plan_fragmenter_result = {
                    let context = OptimizerContext::from_handler_args(handler_args.clone());
                    let plan_result =
                        gen_batch_query_plan(&session, context.into(), bound_result)?;
                    gen_batch_plan_fragmenter(&session, plan_result)?
                };
                declare_cursor::handle_bound_declare_query_cursor(
                    handler_args,
                    stmt.cursor_name,
                    plan_fragmenter_result,
                )
                .await
            }
            risingwave_sqlparser::ast::DeclareCursor::Subscription(sub_name, rw_timestamp) => {
                declare_cursor::handle_declare_subscription_cursor(
                    handler_args,
                    sub_name,
                    stmt.cursor_name,
                    rw_timestamp,
                )
                .await
            }
        },
        _ => unreachable!(),
    }
}

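/// Binds a statement and generates a batch query plan for it.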
pub fn gen_batch_plan_by_statement(
    session: &SessionImpl,
    context: OptimizerContextRef,
    stmt: Statement,
) -> Result<BatchQueryPlanResult> {
    let binder = Binder::new_for_batch(session);
    let bound_result = gen_bound(binder, stmt)?;
    gen_batch_query_plan(session, context, bound_result)
}

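/// The result of binding a statement, carrying everything the planner needs:
/// the bound AST, inferred parameter types, and the relations and UDFs the
/// statement depends on.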
#[derive(Clone)]
pub struct BoundResult {
    pub(crate) stmt_type: StatementType,
    pub(crate) must_dist: bool,
    pub(crate) bound: BoundStatement,
    pub(crate) param_types: Vec<DataType>,
    pub(crate) parsed_params: Option<Vec<Datum>>,
    pub(crate) dependent_relations: HashSet<TableId>,
    pub(crate) dependent_udfs: HashSet<FunctionId>,
}

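/// Binds a statement with the given binder and collects the metadata needed
/// for later planning into a [`BoundResult`].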
fn gen_bound(mut binder: Binder, stmt: Statement) -> Result<BoundResult> {
    let stmt_type = StatementType::infer_from_statement(&stmt)
        .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;
    let must_dist = must_run_in_distributed_mode(&stmt)?;

    let bound = binder.bind(stmt)?;

    Ok(BoundResult {
        stmt_type,
        must_dist,
        bound,
        param_types: binder.export_param_types()?,
        parsed_params: None,
        dependent_relations: binder.included_relations().clone(),
        dependent_udfs: binder.included_udfs().clone(),
    })
}

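/// The output of batch planning: the physical batch plan, the chosen query
/// mode, the output schema, and the relations the plan depends on.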
pub struct BatchQueryPlanResult {
    pub(crate) plan: BatchPlanRef,
    pub(crate) query_mode: QueryMode,
    pub(crate) schema: Schema,
    pub(crate) stmt_type: StatementType,
    pub(crate) dependent_relations: Vec<TableId>,
}

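/// Generates a physical batch plan from a [`BoundResult`] and decides the
/// query mode.
///
/// Mode resolution: statements that must be distributed (see
/// [`must_run_in_distributed_mode`]) or must be local (system table scans)
/// force their respective mode, and a conflict between the two is an internal
/// error. Otherwise the session's `query_mode` setting applies, with `Auto`
/// deferring to [`ExecutionModeDecider`].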
fn gen_batch_query_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    bind_result: BoundResult,
) -> Result<BatchQueryPlanResult> {
    let BoundResult {
        stmt_type,
        must_dist,
        bound,
        dependent_relations,
        ..
    } = bind_result;

    let mut planner = if matches!(bound, BoundStatement::Query(_)) {
        Planner::new_for_batch_dql(context)
    } else {
        Planner::new_for_batch(context)
    };

    let logical = planner.plan(bound)?;
    let schema = logical.schema();
    let batch_plan = logical.gen_batch_plan()?;

    let dependent_relations =
        RelationCollectorVisitor::collect_with(dependent_relations, batch_plan.plan.clone());

    let must_local = must_run_in_local_mode(&batch_plan);

    let query_mode = match (must_dist, must_local) {
        (true, true) => {
            return Err(ErrorCode::InternalError(
                "the query is forced to both local and distributed mode by optimizer".to_owned(),
            )
            .into());
        }
        (true, false) => QueryMode::Distributed,
        (false, true) => QueryMode::Local,
        (false, false) => match session.config().query_mode() {
            QueryMode::Auto => determine_query_mode(&batch_plan),
            QueryMode::Local => QueryMode::Local,
            QueryMode::Distributed => QueryMode::Distributed,
        },
    };

    let physical = match query_mode {
        QueryMode::Auto => unreachable!(),
        QueryMode::Local => batch_plan.gen_batch_local_plan()?,
        QueryMode::Distributed => batch_plan.gen_batch_distributed_plan()?,
    };

    Ok(BatchQueryPlanResult {
        plan: physical,
        query_mode,
        schema,
        stmt_type,
        dependent_relations: dependent_relations.into_iter().collect_vec(),
    })
}

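/// Returns whether the statement must run in distributed mode: UPDATE/DELETE
/// (with or without RETURNING) and `INSERT ... SELECT` always do.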
fn must_run_in_distributed_mode(stmt: &Statement) -> Result<bool> {
    fn is_insert_using_select(stmt: &Statement) -> bool {
        fn has_select_query(set_expr: &SetExpr) -> bool {
            match set_expr {
                SetExpr::Select(_) => true,
                SetExpr::Query(query) => has_select_query(&query.body),
                SetExpr::SetOperation { left, right, .. } => {
                    has_select_query(left) || has_select_query(right)
                }
                SetExpr::Values(_) => false,
            }
        }

        matches!(
            stmt,
            Statement::Insert { source, .. } if has_select_query(&source.body)
        )
    }

    let stmt_type = StatementType::infer_from_statement(stmt)
        .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;

    Ok(matches!(
        stmt_type,
        StatementType::UPDATE
            | StatementType::DELETE
            | StatementType::UPDATE_RETURNING
            | StatementType::DELETE_RETURNING
    ) | is_insert_using_select(stmt))
}

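/// Returns whether the plan must run in local mode, i.e. it scans a system
/// table.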
fn must_run_in_local_mode(batch_plan: &BatchPlanRoot) -> bool {
    SysTableVisitor::has_sys_table(batch_plan)
}

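/// Resolves `QueryMode::Auto` by letting [`ExecutionModeDecider`] choose
/// between local and distributed execution for the plan.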
fn determine_query_mode(batch_plan: &BatchPlanRoot) -> QueryMode {
    if ExecutionModeDecider::run_in_local_mode(batch_plan) {
        QueryMode::Local
    } else {
        QueryMode::Distributed
    }
}

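/// The output of plan fragmentation, ready to be scheduled for execution.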
pub struct BatchPlanFragmenterResult {
    pub(crate) plan_fragmenter: BatchPlanFragmenter,
    pub(crate) query_mode: QueryMode,
    pub(crate) schema: Schema,
    pub(crate) stmt_type: StatementType,
}

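/// Builds a [`BatchPlanFragmenter`] for the plan, wiring in the worker node
/// selector, catalog reader, batch parallelism, and session time zone.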
pub fn gen_batch_plan_fragmenter(
    session: &SessionImpl,
    plan_result: BatchQueryPlanResult,
) -> Result<BatchPlanFragmenterResult> {
    let BatchQueryPlanResult {
        plan,
        query_mode,
        schema,
        stmt_type,
        ..
    } = plan_result;

    tracing::trace!(
        "Generated query plan: {:?}, query_mode:{:?}",
        plan.explain_to_string(),
        query_mode
    );
    let worker_node_manager_reader = WorkerNodeSelector::new(
        session.env().worker_node_manager_ref(),
        session.is_barrier_read(),
    );
    let plan_fragmenter = BatchPlanFragmenter::new(
        worker_node_manager_reader,
        session.env().catalog_reader().clone(),
        session.config().batch_parallelism().0,
        session.config().timezone(),
        plan,
    )?;

    Ok(BatchPlanFragmenterResult {
        plan_fragmenter,
        query_mode,
        schema,
        stmt_type,
    })
}

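/// Turns a fragmented plan into a row stream plus the pg field descriptors
/// for the result schema. DML statements acquire the transaction write guard
/// and are exempt from statement-timeout cancellation.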
pub async fn create_stream(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
    formats: Vec<Format>,
) -> Result<(PgResponseStream, Vec<PgFieldDescriptor>)> {
    let BatchPlanFragmenterResult {
        plan_fragmenter,
        query_mode,
        schema,
        stmt_type,
        ..
    } = plan_fragmenter_result;

    let mut can_timeout_cancel = true;
    match stmt_type {
        StatementType::INSERT
        | StatementType::INSERT_RETURNING
        | StatementType::DELETE
        | StatementType::DELETE_RETURNING
        | StatementType::UPDATE
        | StatementType::UPDATE_RETURNING => {
            session.txn_write_guard()?;
            can_timeout_cancel = false;
        }
        _ => {}
    }

    let query = plan_fragmenter.generate_complete_query().await?;
    tracing::trace!("Generated query after plan fragmenter: {:?}", &query);

    let pg_descs = schema
        .fields()
        .iter()
        .map(to_pg_field)
        .collect::<Vec<PgFieldDescriptor>>();
    let column_types = schema.fields().iter().map(|f| f.data_type()).collect_vec();

    let row_stream = match query_mode {
        QueryMode::Auto => unreachable!(),
        QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
            local_execute(session.clone(), query, can_timeout_cancel).await?,
            column_types,
            formats,
            session.clone(),
        )),
        QueryMode::Distributed => {
            PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
                distribute_execute(session.clone(), query, can_timeout_cancel).await?,
                column_types,
                formats,
                session.clone(),
            ))
        }
    };

    Ok((row_stream, pg_descs))
}

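/// Drives a query to completion: creates the row stream and registers a
/// callback that flushes implicitly after DML (if configured) and records
/// per-mode latency and query-count metrics once the response is consumed.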
async fn execute(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    let first_field_format = formats.first().copied().unwrap_or(Format::Text);
    let query_mode = plan_fragmenter_result.query_mode;
    let stmt_type = plan_fragmenter_result.stmt_type;

    let query_start_time = Instant::now();
    let (row_stream, pg_descs) =
        create_stream(session.clone(), plan_fragmenter_result, formats).await?;

    let callback = async move {
        if session.config().implicit_flush() && stmt_type.is_dml() {
            do_flush(&session).await?;
        }

        match query_mode {
            QueryMode::Auto => unreachable!(),
            QueryMode::Local => {
                session
                    .env()
                    .frontend_metrics
                    .latency_local_execution
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .frontend_metrics
                    .query_counter_local_execution
                    .inc();
            }
            QueryMode::Distributed => {
                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .query_latency
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .completed_query_counter
                    .inc();
            }
        }

        Ok(())
    };

    Ok(PgResponse::builder(stmt_type)
        .row_cnt_format_opt(Some(first_field_format))
        .values(row_stream, pg_descs)
        .callback(callback)
        .into())
}

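/// Schedules the query on the distributed query manager. The statement
/// timeout is applied only when the statement is cancellable on timeout, and
/// never under madsim simulation.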
pub async fn distribute_execute(
    session: Arc<SessionImpl>,
    query: Query,
    can_timeout_cancel: bool,
) -> Result<DistributedQueryStream> {
    let timeout = if cfg!(madsim) {
        None
    } else if can_timeout_cancel {
        Some(session.statement_timeout())
    } else {
        None
    };
    let execution_context: ExecutionContextRef =
        ExecutionContext::new(session.clone(), timeout).into();
    let query_manager = session.env().query_manager().clone();

    query_manager
        .schedule(execution_context, query)
        .await
        .map_err(|err| err.into())
}

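/// Runs the query locally on the frontend: pins a snapshot, fills in the
/// batch query epoch, and streams rows from a [`LocalQueryExecution`].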
pub async fn local_execute(
    session: Arc<SessionImpl>,
    mut query: Query,
    can_timeout_cancel: bool,
) -> Result<LocalQueryStream> {
    let timeout = if cfg!(madsim) {
        None
    } else if can_timeout_cancel {
        Some(session.statement_timeout())
    } else {
        None
    };
    let front_env = session.env();

    let snapshot = session.pinned_snapshot();

    snapshot.fill_batch_query_epoch(&mut query)?;

    let execution = LocalQueryExecution::new(
        query,
        front_env.clone(),
        snapshot.support_barrier_read(),
        session,
        timeout,
    );

    Ok(execution.stream_rows())
}