risingwave_frontend/handler/query.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;

use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Format;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{FunctionId, Schema};
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::{DataType, Datum};
use risingwave_sqlparser::ast::{SetExpr, Statement};

use super::extended_handle::{PortalResult, PrepareStatement, PreparedResult};
use super::{PgResponseStream, RwPgResponse, create_mv, declare_cursor};
use crate::binder::{Binder, BoundCreateView, BoundStatement};
use crate::catalog::TableId;
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::HandlerArgs;
use crate::handler::flush::do_flush;
use crate::handler::util::{DataChunkToRowSetAdapter, to_pg_field};
use crate::optimizer::plan_node::{BatchPlanRef, Explain};
use crate::optimizer::{
    BatchPlanRoot, ExecutionModeDecider, OptimizerContext, OptimizerContextRef,
    RelationCollectorVisitor, SysTableVisitor,
};
use crate::planner::Planner;
use crate::scheduler::plan_fragmenter::Query;
use crate::scheduler::{
    BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef,
    LocalQueryExecution, LocalQueryStream,
};
use crate::session::SessionImpl;

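/// Handle a batch query statement (`SELECT`, `INSERT`, `DELETE`, `UPDATE`, ...)
/// end to end: bind and plan it, fragment the plan, then execute it in local or
/// distributed query mode.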
pub async fn handle_query(
    handler_args: HandlerArgs,
    stmt: Statement,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let plan_fragmenter_result = {
        let context = OptimizerContext::from_handler_args(handler_args);
        let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?;
        // The time zone is used by Hummock time-travel queries.
        risingwave_expr::expr_context::TIME_ZONE::sync_scope(session.config().timezone(), || {
            gen_batch_plan_fragmenter(&session, plan_result)
        })?
    };
    execute(session, plan_fragmenter_result, formats).await
}

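/// Bind a statement and wrap both the original statement and the binding result
/// into a [`PrepareStatement`] for the extended query protocol.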
fn handle_parse_inner(binder: Binder, statement: Statement) -> Result<PrepareStatement> {
    let bound_result = gen_bound(binder, statement.clone())?;

    Ok(PrepareStatement::Prepared(PreparedResult {
        statement,
        bound_result,
    }))
}

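/// Handle the `Parse` message of the extended query protocol for a batch
/// statement, honoring any parameter types the client has already specified.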
pub fn handle_parse_for_batch(
    handler_args: HandlerArgs,
    statement: Statement,
    specified_param_types: Vec<Option<DataType>>,
) -> Result<PrepareStatement> {
    let binder = Binder::new_for_batch(&handler_args.session)
        .with_specified_params_types(specified_param_types);
    handle_parse_inner(binder, statement)
}

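/// Same as [`handle_parse_for_batch`], but binds with the streaming binder, for
/// statements that are planned as streaming jobs.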
pub fn handle_parse_for_stream(
    handler_args: HandlerArgs,
    statement: Statement,
    specified_param_types: Vec<Option<DataType>>,
) -> Result<PrepareStatement> {
    let binder = Binder::new_for_stream(&handler_args.session)
        .with_specified_params_types(specified_param_types);
    handle_parse_inner(binder, statement)
}

/// Execute a "Portal", which is a prepared statement with bound parameters.
pub async fn handle_execute(
    handler_args: HandlerArgs,
    portal: PortalResult,
) -> Result<RwPgResponse> {
    let PortalResult {
        bound_result,
        result_formats,
        statement,
    } = portal;
    match statement {
        Statement::Query(_)
        | Statement::Insert { .. }
        | Statement::Delete { .. }
        | Statement::Update { .. } => {
            // Execute a batch query.
            let session = handler_args.session.clone();
            let plan_fragmenter_result = {
                let context = OptimizerContext::from_handler_args(handler_args);
                let plan_result = gen_batch_query_plan(&session, context.into(), bound_result)?;
                // The time zone is used by Hummock time-travel queries.
                risingwave_expr::expr_context::TIME_ZONE::sync_scope(
                    session.config().timezone(),
                    || gen_batch_plan_fragmenter(&session, plan_result),
                )?
            };
            execute(session, plan_fragmenter_result, result_formats).await
        }
        Statement::CreateView { materialized, .. } if materialized => {
            // Execute a CREATE MATERIALIZED VIEW.
            let BoundResult {
                bound,
                dependent_relations,
                dependent_udfs,
                ..
            } = bound_result;
            let create_mv = if let BoundStatement::CreateView(create_mv) = bound {
                create_mv
            } else {
                unreachable!("expect a BoundStatement::CreateView")
            };
            let BoundCreateView {
                or_replace,
                materialized: _,
                if_not_exists,
                name,
                columns,
                query,
                emit_mode,
                with_options,
            } = *create_mv;
            if or_replace {
                bail_not_implemented!("CREATE OR REPLACE VIEW");
            }

            // Hack: replace the `with_options` with the bound ones.
            let handler_args = HandlerArgs {
                session: handler_args.session.clone(),
                sql: handler_args.sql.clone(),
                normalized_sql: handler_args.normalized_sql.clone(),
                with_options: crate::WithOptions::try_from(with_options.as_slice())?,
            };

            create_mv::handle_create_mv_bound(
                handler_args,
                if_not_exists,
                name,
                *query,
                dependent_relations,
                dependent_udfs,
                columns,
                emit_mode,
            )
            .await
        }
        Statement::DeclareCursor { stmt } => match stmt.declare_cursor {
            risingwave_sqlparser::ast::DeclareCursor::Query(_) => {
                let session = handler_args.session.clone();
                let plan_fragmenter_result = {
                    let context = OptimizerContext::from_handler_args(handler_args.clone());
                    let plan_result = gen_batch_query_plan(&session, context.into(), bound_result)?;
                    gen_batch_plan_fragmenter(&session, plan_result)?
                };
                declare_cursor::handle_bound_declare_query_cursor(
                    handler_args,
                    stmt.cursor_name,
                    plan_fragmenter_result,
                )
                .await
            }
            risingwave_sqlparser::ast::DeclareCursor::Subscription(sub_name, rw_timestamp) => {
                declare_cursor::handle_declare_subscription_cursor(
                    handler_args,
                    sub_name,
                    stmt.cursor_name,
                    rw_timestamp,
                )
                .await
            }
        },
        _ => unreachable!(),
    }
}

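/// Bind a statement and generate its batch query plan in one step, for the
/// simple query protocol.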
pub fn gen_batch_plan_by_statement(
    session: &SessionImpl,
    context: OptimizerContextRef,
    stmt: Statement,
) -> Result<BatchQueryPlanResult> {
    let binder = Binder::new_for_batch(session);
    let bound_result = gen_bound(binder, stmt)?;
    gen_batch_query_plan(session, context, bound_result)
}

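/// The output of the binding phase, carrying everything the planner needs later.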
#[derive(Clone)]
pub struct BoundResult {
    pub(crate) stmt_type: StatementType,
    pub(crate) must_dist: bool,
    pub(crate) bound: BoundStatement,
    pub(crate) param_types: Vec<DataType>,
    pub(crate) parsed_params: Option<Vec<Datum>>,
    pub(crate) dependent_relations: HashSet<TableId>,
    /// TODO(rc): merge with `dependent_relations`
    pub(crate) dependent_udfs: HashSet<FunctionId>,
}

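/// Bind a statement and collect the metadata needed for planning: the statement
/// type, the exported parameter types, and the relations and UDFs it depends on.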
fn gen_bound(mut binder: Binder, stmt: Statement) -> Result<BoundResult> {
    let stmt_type = StatementType::infer_from_statement(&stmt)
        .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;
    let must_dist = must_run_in_distributed_mode(&stmt)?;

    let bound = binder.bind(stmt)?;

    Ok(BoundResult {
        stmt_type,
        must_dist,
        bound,
        param_types: binder.export_param_types()?,
        parsed_params: None,
        dependent_relations: binder.included_relations().clone(),
        dependent_udfs: binder.included_udfs().clone(),
    })
}

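/// The physical batch plan together with the query mode decided for it.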
pub struct BatchQueryPlanResult {
    pub(crate) plan: BatchPlanRef,
    pub(crate) query_mode: QueryMode,
    pub(crate) schema: Schema,
    pub(crate) stmt_type: StatementType,
    // Note that these relations are only resolved in the binding phase, so they may be
    // just a subset of the final dependencies, which may additionally contain implicit
    // dependencies, e.g. on indices.
    pub(crate) dependent_relations: Vec<TableId>,
}

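/// Plan a bound statement into a physical batch plan and decide its query mode:
/// local execution on the frontend itself, or distributed execution scheduled
/// onto worker nodes.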
fn gen_batch_query_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    bind_result: BoundResult,
) -> Result<BatchQueryPlanResult> {
    let BoundResult {
        stmt_type,
        must_dist,
        bound,
        dependent_relations,
        ..
    } = bind_result;

    let mut planner = if matches!(bound, BoundStatement::Query(_)) {
        Planner::new_for_batch_dql(context)
    } else {
        Planner::new_for_batch(context)
    };

    let logical = planner.plan(bound)?;
    let schema = logical.schema();
    let batch_plan = logical.gen_batch_plan()?;

    let dependent_relations =
        RelationCollectorVisitor::collect_with(dependent_relations, batch_plan.plan.clone());

    let must_local = must_run_in_local_mode(&batch_plan);

    let query_mode = match (must_dist, must_local) {
        (true, true) => {
            return Err(ErrorCode::InternalError(
                "the query is forced to both local and distributed mode by optimizer".to_owned(),
            )
            .into());
        }
        (true, false) => QueryMode::Distributed,
        (false, true) => QueryMode::Local,
        (false, false) => match session.config().query_mode() {
            QueryMode::Auto => determine_query_mode(&batch_plan),
            QueryMode::Local => QueryMode::Local,
            QueryMode::Distributed => QueryMode::Distributed,
        },
    };

    let physical = match query_mode {
        QueryMode::Auto => unreachable!(),
        QueryMode::Local => batch_plan.gen_batch_local_plan()?,
        QueryMode::Distributed => batch_plan.gen_batch_distributed_plan()?,
    };

    Ok(BatchQueryPlanResult {
        plan: physical,
        query_mode,
        schema,
        stmt_type,
        dependent_relations: dependent_relations.into_iter().collect_vec(),
    })
}

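/// Returns whether the statement is forced to run in distributed mode:
/// `UPDATE`/`DELETE` (with or without `RETURNING`) and `INSERT` sourced from a
/// `SELECT` always are.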
fn must_run_in_distributed_mode(stmt: &Statement) -> Result<bool> {
    fn is_insert_using_select(stmt: &Statement) -> bool {
        fn has_select_query(set_expr: &SetExpr) -> bool {
            match set_expr {
                SetExpr::Select(_) => true,
                SetExpr::Query(query) => has_select_query(&query.body),
                SetExpr::SetOperation { left, right, .. } => {
                    has_select_query(left) || has_select_query(right)
                }
                SetExpr::Values(_) => false,
            }
        }

        matches!(
            stmt,
            Statement::Insert { source, .. } if has_select_query(&source.body)
        )
    }

    let stmt_type = StatementType::infer_from_statement(stmt)
        .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;

    Ok(matches!(
        stmt_type,
        StatementType::UPDATE
            | StatementType::DELETE
            | StatementType::UPDATE_RETURNING
            | StatementType::DELETE_RETURNING
    ) | is_insert_using_select(stmt))
}

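/// Returns whether the plan is forced to run in local mode, which is the case
/// when it reads system tables.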
fn must_run_in_local_mode(batch_plan: &BatchPlanRoot) -> bool {
    SysTableVisitor::has_sys_table(batch_plan)
}

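/// Under `QueryMode::Auto`, let the optimizer heuristics decide between local
/// and distributed execution based on the plan.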
fn determine_query_mode(batch_plan: &BatchPlanRoot) -> QueryMode {
    if ExecutionModeDecider::run_in_local_mode(batch_plan) {
        QueryMode::Local
    } else {
        QueryMode::Distributed
    }
}

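/// The plan fragmenter together with the metadata needed to execute the query.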
pub struct BatchPlanFragmenterResult {
    pub(crate) plan_fragmenter: BatchPlanFragmenter,
    pub(crate) query_mode: QueryMode,
    pub(crate) schema: Schema,
    pub(crate) stmt_type: StatementType,
}

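/// Wrap the physical plan into a [`BatchPlanFragmenter`], which will later split
/// it into stages for scheduling.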
pub fn gen_batch_plan_fragmenter(
    session: &SessionImpl,
    plan_result: BatchQueryPlanResult,
) -> Result<BatchPlanFragmenterResult> {
    let BatchQueryPlanResult {
        plan,
        query_mode,
        schema,
        stmt_type,
        ..
    } = plan_result;

    tracing::trace!(
        "Generated query plan: {:?}, query_mode:{:?}",
        plan.explain_to_string(),
        query_mode
    );
    let worker_node_manager_reader = WorkerNodeSelector::new(
        session.env().worker_node_manager_ref(),
        session.is_barrier_read(),
    );
    let plan_fragmenter = BatchPlanFragmenter::new(
        worker_node_manager_reader,
        session.env().catalog_reader().clone(),
        session.config().batch_parallelism().0,
        session.config().timezone(),
        plan,
    )?;

    Ok(BatchPlanFragmenterResult {
        plan_fragmenter,
        query_mode,
        schema,
        stmt_type,
    })
}

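/// Fragment and execute the query, returning the row stream together with the
/// `pgwire` field descriptors for the result schema. DML statements acquire the
/// transaction write guard and are excluded from statement-timeout cancellation.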
pub async fn create_stream(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
    formats: Vec<Format>,
) -> Result<(PgResponseStream, Vec<PgFieldDescriptor>)> {
    let BatchPlanFragmenterResult {
        plan_fragmenter,
        query_mode,
        schema,
        stmt_type,
        ..
    } = plan_fragmenter_result;

    let mut can_timeout_cancel = true;
    // Acquire the write guard for DML statements.
    match stmt_type {
        StatementType::INSERT
        | StatementType::INSERT_RETURNING
        | StatementType::DELETE
        | StatementType::DELETE_RETURNING
        | StatementType::UPDATE
        | StatementType::UPDATE_RETURNING => {
            session.txn_write_guard()?;
            can_timeout_cancel = false;
        }
        _ => {}
    }

    let query = plan_fragmenter.generate_complete_query().await?;
    tracing::trace!("Generated query after plan fragmenter: {:?}", &query);

    let pg_descs = schema
        .fields()
        .iter()
        .map(to_pg_field)
        .collect::<Vec<PgFieldDescriptor>>();
    let column_types = schema.fields().iter().map(|f| f.data_type()).collect_vec();

    let row_stream = match query_mode {
        QueryMode::Auto => unreachable!(),
        // Note that local mode does not support cancelling tasks.
        QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
            local_execute(session.clone(), query, can_timeout_cancel).await?,
            column_types,
            formats,
            session.clone(),
        )),
        QueryMode::Distributed => {
            PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
                distribute_execute(session.clone(), query, can_timeout_cancel).await?,
                column_types,
                formats,
                session.clone(),
            ))
        }
    };

    Ok((row_stream, pg_descs))
}

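/// Create the result stream and assemble the final [`PgResponse`], attaching a
/// callback that performs the implicit flush for DML and reports execution
/// metrics after the query finishes.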
async fn execute(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    // Used in counting the row count.
    let first_field_format = formats.first().copied().unwrap_or(Format::Text);
    let query_mode = plan_fragmenter_result.query_mode;
    let stmt_type = plan_fragmenter_result.stmt_type;

    let query_start_time = Instant::now();
    let (row_stream, pg_descs) =
        create_stream(session.clone(), plan_fragmenter_result, formats).await?;

    // We need to do some post-processing after the query is finished and before the
    // `Complete` response is sent. This is achieved by the `callback` in `PgResponse`.
    let callback = async move {
        // Implicitly flush the writes.
        if session.config().implicit_flush() && stmt_type.is_dml() {
            do_flush(&session).await?;
        }

        // Update the query latency and counter metrics.
        match query_mode {
            QueryMode::Auto => unreachable!(),
            QueryMode::Local => {
                session
                    .env()
                    .frontend_metrics
                    .latency_local_execution
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .frontend_metrics
                    .query_counter_local_execution
                    .inc();
            }
            QueryMode::Distributed => {
                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .query_latency
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .completed_query_counter
                    .inc();
            }
        }

        Ok(())
    };

    Ok(PgResponse::builder(stmt_type)
        .row_cnt_format_opt(Some(first_field_format))
        .values(row_stream, pg_descs)
        .callback(callback)
        .into())
}

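/// Schedule the query on the distributed query manager and return the stream of
/// result chunks. The statement timeout is applied only when `can_timeout_cancel`
/// is set, and is always disabled under madsim simulation.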
pub async fn distribute_execute(
    session: Arc<SessionImpl>,
    query: Query,
    can_timeout_cancel: bool,
) -> Result<DistributedQueryStream> {
    let timeout = if cfg!(madsim) {
        None
    } else if can_timeout_cancel {
        Some(session.statement_timeout())
    } else {
        None
    };
    let execution_context: ExecutionContextRef =
        ExecutionContext::new(session.clone(), timeout).into();
    let query_manager = session.env().query_manager().clone();

    query_manager
        .schedule(execution_context, query)
        .await
        .map_err(|err| err.into())
}

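/// Run the query locally on this frontend node: pin a snapshot, fill in the
/// batch query epoch, and return the stream of result rows.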
pub async fn local_execute(
    session: Arc<SessionImpl>,
    mut query: Query,
    can_timeout_cancel: bool,
) -> Result<LocalQueryStream> {
    let timeout = if cfg!(madsim) {
        None
    } else if can_timeout_cancel {
        Some(session.statement_timeout())
    } else {
        None
    };
    let front_env = session.env();

    let snapshot = session.pinned_snapshot();

    snapshot.fill_batch_query_epoch(&mut query)?;

    let execution = LocalQueryExecution::new(
        query,
        front_env.clone(),
        snapshot.support_barrier_read(),
        session,
        timeout,
    );

    Ok(execution.stream_rows())
}