risingwave_frontend/handler/query.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;

use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Format;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{FunctionId, Schema};
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::{DataType, Datum};
use risingwave_sqlparser::ast::{SetExpr, Statement};

use super::extended_handle::{PortalResult, PrepareStatement, PreparedResult};
use super::{PgResponseStream, RwPgResponse, create_mv, declare_cursor};
use crate::PlanRef;
use crate::binder::{Binder, BoundCreateView, BoundStatement};
use crate::catalog::TableId;
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::HandlerArgs;
use crate::handler::flush::do_flush;
use crate::handler::util::{DataChunkToRowSetAdapter, to_pg_field};
use crate::optimizer::plan_node::Explain;
use crate::optimizer::{
    ExecutionModeDecider, OptimizerContext, OptimizerContextRef, ReadStorageTableVisitor,
    RelationCollectorVisitor, SysTableVisitor,
};
use crate::planner::Planner;
use crate::scheduler::plan_fragmenter::Query;
use crate::scheduler::{
    BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef,
    LocalQueryExecution, LocalQueryStream,
};
use crate::session::SessionImpl;

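/// Handle a batch query statement (e.g. `SELECT`, `INSERT`, `UPDATE`, `DELETE`): bind and plan
/// the statement, fragment the plan, and execute it, streaming the results back to the client.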
pub async fn handle_query(
    handler_args: HandlerArgs,
    stmt: Statement,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let plan_fragmenter_result = {
        let context = OptimizerContext::from_handler_args(handler_args);
        let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?;
        // The time zone is used by Hummock time-travel queries.
        risingwave_expr::expr_context::TIME_ZONE::sync_scope(
            session.config().timezone().to_owned(),
            || gen_batch_plan_fragmenter(&session, plan_result),
        )?
    };
    execute(session, plan_fragmenter_result, formats).await
}

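/// Handle the `Parse` message of the extended query protocol: bind the statement against the
/// given parameter types and return a [`PrepareStatement`] to be executed later through a portal.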
pub fn handle_parse(
    handler_args: HandlerArgs,
    statement: Statement,
    specific_param_types: Vec<Option<DataType>>,
) -> Result<PrepareStatement> {
    let session = handler_args.session;
    let bound_result = gen_bound(&session, statement.clone(), specific_param_types)?;

    Ok(PrepareStatement::Prepared(PreparedResult {
        statement,
        bound_result,
    }))
}

/// Execute a "Portal", which is a prepared statement with bound parameters.
pub async fn handle_execute(
    handler_args: HandlerArgs,
    portal: PortalResult,
) -> Result<RwPgResponse> {
    let PortalResult {
        bound_result,
        result_formats,
        statement,
    } = portal;
    match statement {
        Statement::Query(_)
        | Statement::Insert { .. }
        | Statement::Delete { .. }
        | Statement::Update { .. } => {
            // Execute a batch query
            let session = handler_args.session.clone();
            let plan_fragmenter_result = {
                let context = OptimizerContext::from_handler_args(handler_args);
                let plan_result = gen_batch_query_plan(&session, context.into(), bound_result)?;
                // The time zone is used by Hummock time-travel queries.
                risingwave_expr::expr_context::TIME_ZONE::sync_scope(
                    session.config().timezone().to_owned(),
                    || gen_batch_plan_fragmenter(&session, plan_result),
                )?
            };
            execute(session, plan_fragmenter_result, result_formats).await
        }
        Statement::CreateView { materialized, .. } if materialized => {
            // Execute a CREATE MATERIALIZED VIEW
            let BoundResult {
                bound,
                dependent_relations,
                dependent_udfs,
                ..
            } = bound_result;
            let create_mv = if let BoundStatement::CreateView(create_mv) = bound {
                create_mv
            } else {
                unreachable!("expect a BoundStatement::CreateView")
            };
            let BoundCreateView {
                or_replace,
                materialized: _,
                if_not_exists,
                name,
                columns,
                query,
                emit_mode,
                with_options,
            } = *create_mv;
            if or_replace {
                bail_not_implemented!("CREATE OR REPLACE VIEW");
            }

            // Hack: replace the `with_options` with the bound ones.
            let handler_args = HandlerArgs {
                session: handler_args.session.clone(),
                sql: handler_args.sql.clone(),
                normalized_sql: handler_args.normalized_sql.clone(),
                with_options: crate::WithOptions::try_from(with_options.as_slice())?,
            };

            create_mv::handle_create_mv_bound(
                handler_args,
                if_not_exists,
                name,
                *query,
                dependent_relations,
                dependent_udfs,
                columns,
                emit_mode,
            )
            .await
        }
        Statement::DeclareCursor { stmt } => match stmt.declare_cursor {
            risingwave_sqlparser::ast::DeclareCursor::Query(_) => {
                let session = handler_args.session.clone();
                let plan_fragmenter_result = {
                    let context = OptimizerContext::from_handler_args(handler_args.clone());
                    let plan_result = gen_batch_query_plan(&session, context.into(), bound_result)?;
                    gen_batch_plan_fragmenter(&session, plan_result)?
                };
                declare_cursor::handle_bound_declare_query_cursor(
                    handler_args,
                    stmt.cursor_name,
                    plan_fragmenter_result,
                )
                .await
            }
            risingwave_sqlparser::ast::DeclareCursor::Subscription(sub_name, rw_timestamp) => {
                declare_cursor::handle_declare_subscription_cursor(
                    handler_args,
                    sub_name,
                    stmt.cursor_name,
                    rw_timestamp,
                )
                .await
            }
        },
        _ => unreachable!(),
    }
}

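/// Bind a statement and generate a batch query plan for it in one step.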
pub fn gen_batch_plan_by_statement(
    session: &SessionImpl,
    context: OptimizerContextRef,
    stmt: Statement,
) -> Result<BatchQueryPlanResult> {
    let bound_result = gen_bound(session, stmt, vec![])?;
    gen_batch_query_plan(session, context, bound_result)
}

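/// The result of binding a statement, carrying everything needed for later planning.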
#[derive(Clone)]
pub struct BoundResult {
    pub(crate) stmt_type: StatementType,
    pub(crate) must_dist: bool,
    pub(crate) bound: BoundStatement,
    pub(crate) param_types: Vec<DataType>,
    pub(crate) parsed_params: Option<Vec<Datum>>,
    pub(crate) dependent_relations: HashSet<TableId>,
    /// TODO(rc): merge with `dependent_relations`
    pub(crate) dependent_udfs: HashSet<FunctionId>,
}

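/// Bind a statement with the given parameter types and collect the relations and UDFs it
/// depends on.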
fn gen_bound(
    session: &SessionImpl,
    stmt: Statement,
    specific_param_types: Vec<Option<DataType>>,
) -> Result<BoundResult> {
    let stmt_type = StatementType::infer_from_statement(&stmt)
        .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;
    let must_dist = must_run_in_distributed_mode(&stmt)?;

    let mut binder = Binder::new_with_param_types(session, specific_param_types);
    let bound = binder.bind(stmt)?;

    Ok(BoundResult {
        stmt_type,
        must_dist,
        bound,
        param_types: binder.export_param_types()?,
        parsed_params: None,
        dependent_relations: binder.included_relations().clone(),
        dependent_udfs: binder.included_udfs().clone(),
    })
}

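/// The result of planning a batch query, including the physical plan and the chosen query mode.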
pub struct BatchQueryPlanResult {
    pub(crate) plan: PlanRef,
    pub(crate) query_mode: QueryMode,
    pub(crate) schema: Schema,
    pub(crate) stmt_type: StatementType,
    // Note that these relations are only resolved in the binding phase, so they may be just a
    // subset of the final set, i.e., the final set may contain additional implicit dependencies
    // on indices.
    pub(crate) dependent_relations: Vec<TableId>,
    pub(crate) read_storage_tables: HashSet<TableId>,
}

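/// Plan a bound statement for batch execution: build the logical plan, decide between local and
/// distributed query mode, and generate the corresponding physical batch plan.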
fn gen_batch_query_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    bind_result: BoundResult,
) -> Result<BatchQueryPlanResult> {
    let BoundResult {
        stmt_type,
        must_dist,
        bound,
        dependent_relations,
        ..
    } = bind_result;

    let mut planner = if matches!(bound, BoundStatement::Query(_)) {
        Planner::new_for_batch_dql(context)
    } else {
        Planner::new_for_batch(context)
    };

    let mut logical = planner.plan(bound)?;
    let schema = logical.schema();
    let batch_plan = logical.gen_batch_plan()?;

    let dependent_relations =
        RelationCollectorVisitor::collect_with(dependent_relations, batch_plan.clone());

    let read_storage_tables = ReadStorageTableVisitor::collect(batch_plan.clone());

    let must_local = must_run_in_local_mode(batch_plan.clone());

    let query_mode = match (must_dist, must_local) {
        (true, true) => {
            return Err(ErrorCode::InternalError(
                "the query is forced to both local and distributed mode by optimizer".to_owned(),
            )
            .into());
        }
        (true, false) => QueryMode::Distributed,
        (false, true) => QueryMode::Local,
        (false, false) => match session.config().query_mode() {
            QueryMode::Auto => determine_query_mode(batch_plan.clone()),
            QueryMode::Local => QueryMode::Local,
            QueryMode::Distributed => QueryMode::Distributed,
        },
    };

    let physical = match query_mode {
        QueryMode::Auto => unreachable!(),
        QueryMode::Local => logical.gen_batch_local_plan()?,
        QueryMode::Distributed => logical.gen_batch_distributed_plan()?,
    };

    Ok(BatchQueryPlanResult {
        plan: physical,
        query_mode,
        schema,
        stmt_type,
        dependent_relations: dependent_relations.into_iter().collect_vec(),
        read_storage_tables,
    })
}

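/// Whether the statement must run in distributed mode, i.e. it is an `UPDATE` or `DELETE`
/// (with or without `RETURNING`), or an `INSERT` whose source contains a `SELECT`.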
fn must_run_in_distributed_mode(stmt: &Statement) -> Result<bool> {
    fn is_insert_using_select(stmt: &Statement) -> bool {
        fn has_select_query(set_expr: &SetExpr) -> bool {
            match set_expr {
                SetExpr::Select(_) => true,
                SetExpr::Query(query) => has_select_query(&query.body),
                SetExpr::SetOperation { left, right, .. } => {
                    has_select_query(left) || has_select_query(right)
                }
                SetExpr::Values(_) => false,
            }
        }

        matches!(
            stmt,
            Statement::Insert { source, .. } if has_select_query(&source.body)
        )
    }

    let stmt_type = StatementType::infer_from_statement(stmt)
        .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;

    Ok(matches!(
        stmt_type,
        StatementType::UPDATE
            | StatementType::DELETE
            | StatementType::UPDATE_RETURNING
            | StatementType::DELETE_RETURNING
    ) | is_insert_using_select(stmt))
}

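/// Whether the plan must run in local mode, i.e. it reads from system tables.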
fn must_run_in_local_mode(batch_plan: PlanRef) -> bool {
    SysTableVisitor::has_sys_table(batch_plan)
}

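/// When the session's query mode is `Auto`, let [`ExecutionModeDecider`] choose between local
/// and distributed execution based on the plan.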
fn determine_query_mode(batch_plan: PlanRef) -> QueryMode {
    if ExecutionModeDecider::run_in_local_mode(batch_plan) {
        QueryMode::Local
    } else {
        QueryMode::Distributed
    }
}

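/// The result of [`gen_batch_plan_fragmenter`], ready to be scheduled by [`create_stream`].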
pub struct BatchPlanFragmenterResult {
    pub(crate) plan_fragmenter: BatchPlanFragmenter,
    pub(crate) query_mode: QueryMode,
    pub(crate) schema: Schema,
    pub(crate) stmt_type: StatementType,
    pub(crate) read_storage_tables: HashSet<TableId>,
}

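/// Build a [`BatchPlanFragmenter`] for the planned query so that it can later be split into
/// stages and scheduled onto worker nodes.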
pub fn gen_batch_plan_fragmenter(
    session: &SessionImpl,
    plan_result: BatchQueryPlanResult,
) -> Result<BatchPlanFragmenterResult> {
    let BatchQueryPlanResult {
        plan,
        query_mode,
        schema,
        stmt_type,
        read_storage_tables,
        ..
    } = plan_result;

    tracing::trace!(
        "Generated query plan: {:?}, query_mode:{:?}",
        plan.explain_to_string(),
        query_mode
    );
    let worker_node_manager_reader = WorkerNodeSelector::new(
        session.env().worker_node_manager_ref(),
        session.is_barrier_read(),
    );
    let plan_fragmenter = BatchPlanFragmenter::new(
        worker_node_manager_reader,
        session.env().catalog_reader().clone(),
        session.config().batch_parallelism().0,
        session.config().timezone().to_owned(),
        plan,
    )?;

    Ok(BatchPlanFragmenterResult {
        plan_fragmenter,
        query_mode,
        schema,
        stmt_type,
        read_storage_tables,
    })
}

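/// Create the response row stream for a fragmented query: acquire the write guard for DML
/// statements, generate the complete query, and dispatch it to local or distributed execution
/// according to the query mode.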
pub async fn create_stream(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
    formats: Vec<Format>,
) -> Result<(PgResponseStream, Vec<PgFieldDescriptor>)> {
    let BatchPlanFragmenterResult {
        plan_fragmenter,
        query_mode,
        schema,
        stmt_type,
        read_storage_tables,
    } = plan_fragmenter_result;

    let mut can_timeout_cancel = true;
    // Acquire the write guard for DML statements.
    match stmt_type {
        StatementType::INSERT
        | StatementType::INSERT_RETURNING
        | StatementType::DELETE
        | StatementType::DELETE_RETURNING
        | StatementType::UPDATE
        | StatementType::UPDATE_RETURNING => {
            session.txn_write_guard()?;
            can_timeout_cancel = false;
        }
        _ => {}
    }

    let query = plan_fragmenter.generate_complete_query().await?;
    tracing::trace!("Generated query after plan fragmenter: {:?}", &query);

    let pg_descs = schema
        .fields()
        .iter()
        .map(to_pg_field)
        .collect::<Vec<PgFieldDescriptor>>();
    let column_types = schema.fields().iter().map(|f| f.data_type()).collect_vec();

    let row_stream = match query_mode {
        QueryMode::Auto => unreachable!(),
        QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
            local_execute(
                session.clone(),
                query,
                can_timeout_cancel,
                &read_storage_tables,
            )
            .await?,
            column_types,
            formats,
            session.clone(),
        )),
        // Local mode does not support cancelling tasks.
        QueryMode::Distributed => {
            PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
                distribute_execute(
                    session.clone(),
                    query,
                    can_timeout_cancel,
                    read_storage_tables,
                )
                .await?,
                column_types,
                formats,
                session.clone(),
            ))
        }
    };

    Ok((row_stream, pg_descs))
}

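/// Execute the query and assemble the [`PgResponse`], attaching a callback that implicitly
/// flushes DML writes and reports per-mode metrics once the query finishes.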
async fn execute(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    // Used for formatting the row count.
    let first_field_format = formats.first().copied().unwrap_or(Format::Text);
    let query_mode = plan_fragmenter_result.query_mode;
    let stmt_type = plan_fragmenter_result.stmt_type;

    let query_start_time = Instant::now();
    let (row_stream, pg_descs) =
        create_stream(session.clone(), plan_fragmenter_result, formats).await?;

    // We need to do some post work after the query is finished and before the `Complete` response
    // is sent. This is achieved by the `callback` in `PgResponse`.
    let callback = async move {
        // Implicitly flush the writes.
        if session.config().implicit_flush() && stmt_type.is_dml() {
            do_flush(&session).await?;
        }

        // Update query metrics for the chosen mode.
        match query_mode {
            QueryMode::Auto => unreachable!(),
            QueryMode::Local => {
                session
                    .env()
                    .frontend_metrics
                    .latency_local_execution
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .frontend_metrics
                    .query_counter_local_execution
                    .inc();
            }
            QueryMode::Distributed => {
                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .query_latency
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .completed_query_counter
                    .inc();
            }
        }

        Ok(())
    };

    Ok(PgResponse::builder(stmt_type)
        .row_cnt_format_opt(Some(first_field_format))
        .values(row_stream, pg_descs)
        .callback(callback)
        .into())
}

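/// Schedule the query on the distributed query manager and return its result stream.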
pub async fn distribute_execute(
    session: Arc<SessionImpl>,
    query: Query,
    can_timeout_cancel: bool,
    read_storage_tables: HashSet<TableId>,
) -> Result<DistributedQueryStream> {
    let timeout = if cfg!(madsim) {
        None
    } else if can_timeout_cancel {
        Some(session.statement_timeout())
    } else {
        None
    };
    let execution_context: ExecutionContextRef =
        ExecutionContext::new(session.clone(), timeout).into();
    let query_manager = session.env().query_manager().clone();

    query_manager
        .schedule(execution_context, query, read_storage_tables)
        .await
        .map_err(|err| err.into())
}

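/// Execute the query locally on the frontend against the pinned snapshot and return its result
/// stream.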
pub async fn local_execute(
    session: Arc<SessionImpl>,
    query: Query,
    can_timeout_cancel: bool,
    read_storage_tables: &HashSet<TableId>,
) -> Result<LocalQueryStream> {
    let timeout = if cfg!(madsim) {
        None
    } else if can_timeout_cancel {
        Some(session.statement_timeout())
    } else {
        None
    };
    let front_env = session.env();

    let snapshot = session.pinned_snapshot();

    let execution = LocalQueryExecution::new(
        query,
        front_env.clone(),
        snapshot.support_barrier_read(),
        snapshot.batch_query_epoch(read_storage_tables)?,
        session,
        timeout,
    );

    Ok(execution.stream_rows())
}