risingwave_frontend/handler/query.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;

use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Format;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{FunctionId, Schema};
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::{DataType, Datum};
use risingwave_sqlparser::ast::{SetExpr, Statement};

use super::extended_handle::{PortalResult, PrepareStatement, PreparedResult};
use super::{PgResponseStream, RwPgResponse, create_mv, declare_cursor};
use crate::binder::{Binder, BoundCreateView, BoundStatement};
use crate::catalog::TableId;
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::HandlerArgs;
use crate::handler::flush::do_flush;
use crate::handler::util::{DataChunkToRowSetAdapter, to_pg_field};
use crate::optimizer::plan_node::{BatchPlanRef, Explain};
use crate::optimizer::{
    BatchPlanRoot, ExecutionModeDecider, OptimizerContext, OptimizerContextRef,
    ReadStorageTableVisitor, RelationCollectorVisitor, SysTableVisitor,
};
use crate::planner::Planner;
use crate::scheduler::plan_fragmenter::Query;
use crate::scheduler::{
    BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef,
    LocalQueryExecution, LocalQueryStream,
};
use crate::session::SessionImpl;

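/// Handle a query-like statement end-to-end: bind and plan it under a fresh
/// optimizer context, fragment the batch plan, and execute it.
///
/// # Example (sketch)
///
/// A hypothetical call site; `handler_args` and `stmt` are assumed to come
/// from the pgwire layer:
///
/// ```ignore
/// let response = handle_query(handler_args, stmt, vec![Format::Text]).await?;
/// ```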
pub async fn handle_query(
    handler_args: HandlerArgs,
    stmt: Statement,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let plan_fragmenter_result = {
        let context = OptimizerContext::from_handler_args(handler_args);
        let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?;
        // The time zone is used by Hummock time travel queries.
        risingwave_expr::expr_context::TIME_ZONE::sync_scope(
            session.config().timezone().to_owned(),
            || gen_batch_plan_fragmenter(&session, plan_result),
        )?
    };
    execute(session, plan_fragmenter_result, formats).await
}

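/// Handle the `Parse` phase of the extended query protocol: bind the statement
/// against the given parameter types and keep the bound result so a later
/// `Execute` can run it without re-binding.
///
/// # Example (sketch)
///
/// A hypothetical flow, assuming the caller later binds a portal and calls
/// `handle_execute`:
///
/// ```ignore
/// let prepared = handle_parse(handler_args, statement, vec![Some(DataType::Int32)])?;
/// ```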
pub fn handle_parse(
    handler_args: HandlerArgs,
    statement: Statement,
    specific_param_types: Vec<Option<DataType>>,
) -> Result<PrepareStatement> {
    let session = handler_args.session;
    let bound_result = gen_bound(&session, statement.clone(), specific_param_types)?;

    Ok(PrepareStatement::Prepared(PreparedResult {
        statement,
        bound_result,
    }))
}

/// Execute a "Portal", which is a prepared statement with bound parameters.
pub async fn handle_execute(
    handler_args: HandlerArgs,
    portal: PortalResult,
) -> Result<RwPgResponse> {
    let PortalResult {
        bound_result,
        result_formats,
        statement,
    } = portal;
    match statement {
        Statement::Query(_)
        | Statement::Insert { .. }
        | Statement::Delete { .. }
        | Statement::Update { .. } => {
            // Execute a batch query.
            let session = handler_args.session.clone();
            let plan_fragmenter_result = {
                let context = OptimizerContext::from_handler_args(handler_args);
                let plan_result = gen_batch_query_plan(&session, context.into(), bound_result)?;
                // The time zone is used by Hummock time travel queries.
                risingwave_expr::expr_context::TIME_ZONE::sync_scope(
                    session.config().timezone().to_owned(),
                    || gen_batch_plan_fragmenter(&session, plan_result),
                )?
            };
            execute(session, plan_fragmenter_result, result_formats).await
        }
        Statement::CreateView { materialized, .. } if materialized => {
            // Execute a CREATE MATERIALIZED VIEW.
            let BoundResult {
                bound,
                dependent_relations,
                dependent_udfs,
                ..
            } = bound_result;
            let create_mv = if let BoundStatement::CreateView(create_mv) = bound {
                create_mv
            } else {
                unreachable!("expected a BoundStatement::CreateView")
            };
            let BoundCreateView {
                or_replace,
                materialized: _,
                if_not_exists,
                name,
                columns,
                query,
                emit_mode,
                with_options,
            } = *create_mv;
            if or_replace {
                bail_not_implemented!("CREATE OR REPLACE VIEW");
            }

            // Hack: replace the `with_options` with the bound ones.
            let handler_args = HandlerArgs {
                session: handler_args.session.clone(),
                sql: handler_args.sql.clone(),
                normalized_sql: handler_args.normalized_sql.clone(),
                with_options: crate::WithOptions::try_from(with_options.as_slice())?,
            };

            create_mv::handle_create_mv_bound(
                handler_args,
                if_not_exists,
                name,
                *query,
                dependent_relations,
                dependent_udfs,
                columns,
                emit_mode,
            )
            .await
        }
        Statement::DeclareCursor { stmt } => match stmt.declare_cursor {
            risingwave_sqlparser::ast::DeclareCursor::Query(_) => {
                let session = handler_args.session.clone();
                let plan_fragmenter_result = {
                    let context = OptimizerContext::from_handler_args(handler_args.clone());
                    let plan_result = gen_batch_query_plan(&session, context.into(), bound_result)?;
                    gen_batch_plan_fragmenter(&session, plan_result)?
                };
                declare_cursor::handle_bound_declare_query_cursor(
                    handler_args,
                    stmt.cursor_name,
                    plan_fragmenter_result,
                )
                .await
            }
            risingwave_sqlparser::ast::DeclareCursor::Subscription(sub_name, rw_timestamp) => {
                declare_cursor::handle_declare_subscription_cursor(
                    handler_args,
                    sub_name,
                    stmt.cursor_name,
                    rw_timestamp,
                )
                .await
            }
        },
        _ => unreachable!(),
    }
}

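/// Bind a statement (with no pre-specified parameter types) and generate a
/// batch query plan for it in one step.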
pub fn gen_batch_plan_by_statement(
    session: &SessionImpl,
    context: OptimizerContextRef,
    stmt: Statement,
) -> Result<BatchQueryPlanResult> {
    let bound_result = gen_bound(session, stmt, vec![])?;
    gen_batch_query_plan(session, context, bound_result)
}

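/// The result of binding a statement: the bound AST plus everything later
/// planning stages need, such as inferred parameter types and the relations
/// and UDFs the statement depends on.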
#[derive(Clone)]
pub struct BoundResult {
    pub(crate) stmt_type: StatementType,
    pub(crate) must_dist: bool,
    pub(crate) bound: BoundStatement,
    pub(crate) param_types: Vec<DataType>,
    pub(crate) parsed_params: Option<Vec<Datum>>,
    pub(crate) dependent_relations: HashSet<TableId>,
    /// TODO(rc): merge with `dependent_relations`
    pub(crate) dependent_udfs: HashSet<FunctionId>,
}

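/// Bind a statement with the given parameter types, recording the statement
/// type, whether it must run in distributed mode, and its dependencies.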
fn gen_bound(
    session: &SessionImpl,
    stmt: Statement,
    specific_param_types: Vec<Option<DataType>>,
) -> Result<BoundResult> {
    let stmt_type = StatementType::infer_from_statement(&stmt)
        .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;
    let must_dist = must_run_in_distributed_mode(&stmt)?;

    let mut binder = Binder::new_with_param_types(session, specific_param_types);
    let bound = binder.bind(stmt)?;

    Ok(BoundResult {
        stmt_type,
        must_dist,
        bound,
        param_types: binder.export_param_types()?,
        parsed_params: None,
        dependent_relations: binder.included_relations().clone(),
        dependent_udfs: binder.included_udfs().clone(),
    })
}

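/// The result of batch planning: the physical plan plus the chosen query mode,
/// output schema, and dependency information carried through from binding.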
pub struct BatchQueryPlanResult {
    pub(crate) plan: BatchPlanRef,
    pub(crate) query_mode: QueryMode,
    pub(crate) schema: Schema,
    pub(crate) stmt_type: StatementType,
    // Note that these relations are only resolved in the binding phase, so they
    // may be only a subset of the final dependencies, i.e. the final set may
    // contain additional implicit dependencies on indices.
    pub(crate) dependent_relations: Vec<TableId>,
    pub(crate) read_storage_tables: HashSet<TableId>,
}

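/// Turn a bound statement into a physical batch plan, deciding between local
/// and distributed execution along the way.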
fn gen_batch_query_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    bind_result: BoundResult,
) -> Result<BatchQueryPlanResult> {
    let BoundResult {
        stmt_type,
        must_dist,
        bound,
        dependent_relations,
        ..
    } = bind_result;

    let mut planner = if matches!(bound, BoundStatement::Query(_)) {
        Planner::new_for_batch_dql(context)
    } else {
        Planner::new_for_batch(context)
    };

    let logical = planner.plan(bound)?;
    let schema = logical.schema();
    let batch_plan = logical.gen_batch_plan()?;

    let dependent_relations =
        RelationCollectorVisitor::collect_with(dependent_relations, batch_plan.plan.clone());

    let read_storage_tables = ReadStorageTableVisitor::collect(&batch_plan);

    let must_local = must_run_in_local_mode(&batch_plan);

    let query_mode = match (must_dist, must_local) {
        (true, true) => {
            return Err(ErrorCode::InternalError(
                "the query is forced into both local and distributed mode by the optimizer"
                    .to_owned(),
            )
            .into());
        }
        (true, false) => QueryMode::Distributed,
        (false, true) => QueryMode::Local,
        (false, false) => match session.config().query_mode() {
            QueryMode::Auto => determine_query_mode(&batch_plan),
            QueryMode::Local => QueryMode::Local,
            QueryMode::Distributed => QueryMode::Distributed,
        },
    };

    let physical = match query_mode {
        QueryMode::Auto => unreachable!(),
        QueryMode::Local => batch_plan.gen_batch_local_plan()?,
        QueryMode::Distributed => batch_plan.gen_batch_distributed_plan()?,
    };

    Ok(BatchQueryPlanResult {
        plan: physical,
        query_mode,
        schema,
        stmt_type,
        dependent_relations: dependent_relations.into_iter().collect_vec(),
        read_storage_tables,
    })
}

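/// Whether the statement can only run in distributed mode: `UPDATE` and
/// `DELETE` (with or without `RETURNING`), and `INSERT ... SELECT`.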
fn must_run_in_distributed_mode(stmt: &Statement) -> Result<bool> {
    fn is_insert_using_select(stmt: &Statement) -> bool {
        fn has_select_query(set_expr: &SetExpr) -> bool {
            match set_expr {
                SetExpr::Select(_) => true,
                SetExpr::Query(query) => has_select_query(&query.body),
                SetExpr::SetOperation { left, right, .. } => {
                    has_select_query(left) || has_select_query(right)
                }
                SetExpr::Values(_) => false,
            }
        }

        matches!(
            stmt,
            Statement::Insert { source, .. } if has_select_query(&source.body)
        )
    }

    let stmt_type = StatementType::infer_from_statement(stmt)
        .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;

    Ok(matches!(
        stmt_type,
        StatementType::UPDATE
            | StatementType::DELETE
            | StatementType::UPDATE_RETURNING
            | StatementType::DELETE_RETURNING
    ) || is_insert_using_select(stmt))
}

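/// A plan that reads system tables must run in local mode.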
fn must_run_in_local_mode(batch_plan: &BatchPlanRoot) -> bool {
    SysTableVisitor::has_sys_table(batch_plan)
}

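/// Resolve `QueryMode::Auto` by asking `ExecutionModeDecider` whether the
/// plan is simple enough to run in local mode.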
fn determine_query_mode(batch_plan: &BatchPlanRoot) -> QueryMode {
    if ExecutionModeDecider::run_in_local_mode(batch_plan) {
        QueryMode::Local
    } else {
        QueryMode::Distributed
    }
}

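/// The output of `gen_batch_plan_fragmenter`: a fragmenter over the physical
/// plan together with the metadata needed to execute it and present the result.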
pub struct BatchPlanFragmenterResult {
    pub(crate) plan_fragmenter: BatchPlanFragmenter,
    pub(crate) query_mode: QueryMode,
    pub(crate) schema: Schema,
    pub(crate) stmt_type: StatementType,
    pub(crate) read_storage_tables: HashSet<TableId>,
}

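/// Wrap a physical batch plan in a `BatchPlanFragmenter`, which will later
/// split it into stages for scheduling.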
pub fn gen_batch_plan_fragmenter(
    session: &SessionImpl,
    plan_result: BatchQueryPlanResult,
) -> Result<BatchPlanFragmenterResult> {
    let BatchQueryPlanResult {
        plan,
        query_mode,
        schema,
        stmt_type,
        read_storage_tables,
        ..
    } = plan_result;

    tracing::trace!(
        "Generated query plan: {:?}, query_mode: {:?}",
        plan.explain_to_string(),
        query_mode
    );
    let worker_node_manager_reader = WorkerNodeSelector::new(
        session.env().worker_node_manager_ref(),
        session.is_barrier_read(),
    );
    let plan_fragmenter = BatchPlanFragmenter::new(
        worker_node_manager_reader,
        session.env().catalog_reader().clone(),
        session.config().batch_parallelism().0,
        session.config().timezone().to_owned(),
        plan,
    )?;

    Ok(BatchPlanFragmenterResult {
        plan_fragmenter,
        query_mode,
        schema,
        stmt_type,
        read_storage_tables,
    })
}

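/// Build the response row stream and `PgFieldDescriptor`s for a fragmented
/// plan: take the write guard for DML, generate the complete query, then
/// dispatch to local or distributed execution according to the query mode.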
pub async fn create_stream(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
    formats: Vec<Format>,
) -> Result<(PgResponseStream, Vec<PgFieldDescriptor>)> {
    let BatchPlanFragmenterResult {
        plan_fragmenter,
        query_mode,
        schema,
        stmt_type,
        read_storage_tables,
    } = plan_fragmenter_result;

    let mut can_timeout_cancel = true;
    // Acquire the write guard for DML statements.
    match stmt_type {
        StatementType::INSERT
        | StatementType::INSERT_RETURNING
        | StatementType::DELETE
        | StatementType::DELETE_RETURNING
        | StatementType::UPDATE
        | StatementType::UPDATE_RETURNING => {
            session.txn_write_guard()?;
            can_timeout_cancel = false;
        }
        _ => {}
    }

    let query = plan_fragmenter.generate_complete_query().await?;
    tracing::trace!("Generated query after plan fragmenter: {:?}", &query);

    let pg_descs = schema
        .fields()
        .iter()
        .map(to_pg_field)
        .collect::<Vec<PgFieldDescriptor>>();
    let column_types = schema.fields().iter().map(|f| f.data_type()).collect_vec();

    let row_stream = match query_mode {
        QueryMode::Auto => unreachable!(),
        // Note that local mode does not support canceling running tasks.
        QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
            local_execute(
                session.clone(),
                query,
                can_timeout_cancel,
                &read_storage_tables,
            )
            .await?,
            column_types,
            formats,
            session.clone(),
        )),
        QueryMode::Distributed => {
            PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
                distribute_execute(
                    session.clone(),
                    query,
                    can_timeout_cancel,
                    read_storage_tables,
                )
                .await?,
                column_types,
                formats,
                session.clone(),
            ))
        }
    };

    Ok((row_stream, pg_descs))
}

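/// Execute a fragmented plan and wrap the result in a `PgResponse`, attaching
/// a callback that flushes implicit DML writes and records query metrics.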
async fn execute(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    // Used when formatting the affected row count.
    let first_field_format = formats.first().copied().unwrap_or(Format::Text);
    let query_mode = plan_fragmenter_result.query_mode;
    let stmt_type = plan_fragmenter_result.stmt_type;

    let query_start_time = Instant::now();
    let (row_stream, pg_descs) =
        create_stream(session.clone(), plan_fragmenter_result, formats).await?;

    // We need to do some post work after the query is finished and before the
    // `Complete` response is sent. This is achieved by the `callback` in `PgResponse`.
    let callback = async move {
        // Implicitly flush the writes.
        if session.config().implicit_flush() && stmt_type.is_dml() {
            do_flush(&session).await?;
        }

        // Update the metrics.
        match query_mode {
            QueryMode::Auto => unreachable!(),
            QueryMode::Local => {
                session
                    .env()
                    .frontend_metrics
                    .latency_local_execution
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .frontend_metrics
                    .query_counter_local_execution
                    .inc();
            }
            QueryMode::Distributed => {
                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .query_latency
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .completed_query_counter
                    .inc();
            }
        }

        Ok(())
    };

    Ok(PgResponse::builder(stmt_type)
        .row_cnt_format_opt(Some(first_field_format))
        .values(row_stream, pg_descs)
        .callback(callback)
        .into())
}

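/// Schedule the query on the distributed batch engine. A statement timeout is
/// applied only when the query can be safely canceled and we are not running
/// under madsim.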
pub async fn distribute_execute(
    session: Arc<SessionImpl>,
    query: Query,
    can_timeout_cancel: bool,
    read_storage_tables: HashSet<TableId>,
) -> Result<DistributedQueryStream> {
    let timeout = if cfg!(madsim) {
        None
    } else if can_timeout_cancel {
        Some(session.statement_timeout())
    } else {
        None
    };
    let execution_context: ExecutionContextRef =
        ExecutionContext::new(session.clone(), timeout).into();
    let query_manager = session.env().query_manager().clone();

    query_manager
        .schedule(execution_context, query, read_storage_tables)
        .await
        .map_err(|err| err.into())
}

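/// Run the query on the frontend itself (local execution mode), using the
/// session's pinned snapshot to obtain the batch query epoch.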
pub async fn local_execute(
    session: Arc<SessionImpl>,
    query: Query,
    can_timeout_cancel: bool,
    read_storage_tables: &HashSet<TableId>,
) -> Result<LocalQueryStream> {
    let timeout = if cfg!(madsim) {
        None
    } else if can_timeout_cancel {
        Some(session.statement_timeout())
    } else {
        None
    };
    let front_env = session.env();

    let snapshot = session.pinned_snapshot();

    let execution = LocalQueryExecution::new(
        query,
        front_env.clone(),
        snapshot.support_barrier_read(),
        snapshot.batch_query_epoch(read_storage_tables)?,
        session,
        timeout,
    );

    Ok(execution.stream_rows())
}