risingwave_frontend/handler/query.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;

use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Format;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{FunctionId, Schema, SecretId};
use risingwave_common::id::ObjectId;
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::{DataType, Datum};
use risingwave_sqlparser::ast::{SetExpr, Statement};

use super::extended_handle::{PortalResult, PrepareStatement, PreparedResult};
use super::{PgResponseStream, RwPgResponse, create_mv, declare_cursor};
use crate::binder::{Binder, BoundCreateView, BoundStatement};
#[cfg(feature = "datafusion")]
use crate::datafusion::DfBatchQueryPlanResult;
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::HandlerArgs;
use crate::handler::flush::do_flush;
use crate::handler::util::{DataChunkToRowSetAdapter, to_pg_field};
use crate::optimizer::plan_node::{BatchPlanRef, Explain};
use crate::optimizer::{
    BatchPlanRoot, ExecutionModeDecider, OptimizerContext, OptimizerContextRef,
    RelationCollectorVisitor, SysTableVisitor,
};
use crate::planner::Planner;
use crate::scheduler::plan_fragmenter::Query;
use crate::scheduler::{
    BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef,
    LocalQueryExecution, LocalQueryStream,
};
use crate::session::SessionImpl;

/// Choice between a plan executed by RisingWave's own batch engine (`Rw`) and a `DataFusion`
/// (`Df`) logical plan.
pub enum BatchPlanChoice {
    Rw(RwBatchQueryPlanResult),
    #[cfg(feature = "datafusion")]
    Df(DfBatchQueryPlanResult),
}

impl BatchPlanChoice {
    pub fn unwrap_rw(self) -> Result<RwBatchQueryPlanResult> {
        match self {
            BatchPlanChoice::Rw(result) => Ok(result),
            #[cfg(feature = "datafusion")]
            BatchPlanChoice::Df { .. } => {
                risingwave_common::bail!(
                    "Expected RisingWave plan in BatchPlanChoice, but got DataFusion plan"
                )
            }
        }
    }
}

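/// Plans a batch statement, fragments the plan, and executes it, building the response to send
/// back to the client. Dispatches to the DataFusion backend when that engine produced the plan.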
pub async fn handle_query(
    handler_args: HandlerArgs,
    stmt: Statement,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let context = OptimizerContext::from_handler_args(handler_args);

    #[cfg(feature = "datafusion")]
    {
        // We construct the future manually here to make sure this async function is `Send`:
        // `BatchPlanChoice` is not `Send`, and Rust cannot prove that it has been dropped
        // before the await point. See https://github.com/rust-lang/rust/issues/128095.
        use futures::FutureExt;

        use crate::datafusion::execute_datafusion_plan;

        let future = match gen_batch_plan_by_statement(&session, context.into(), stmt)? {
            BatchPlanChoice::Rw(plan_result) => {
                let plan_fragmenter_result = risingwave_expr::expr_context::TIME_ZONE::sync_scope(
                    session.config().timezone(),
                    || gen_batch_plan_fragmenter(&session, plan_result),
                )?;
                execute_risingwave_plan(session, plan_fragmenter_result, formats).left_future()
            }
            BatchPlanChoice::Df(plan_result) => {
                execute_datafusion_plan(session, plan_result, formats).right_future()
            }
        };
        future.await
    }

    #[cfg(not(feature = "datafusion"))]
    {
        let future = match gen_batch_plan_by_statement(&session, context.into(), stmt)? {
            BatchPlanChoice::Rw(plan_result) => {
                let plan_fragmenter_result = risingwave_expr::expr_context::TIME_ZONE::sync_scope(
                    session.config().timezone(),
                    || gen_batch_plan_fragmenter(&session, plan_result),
                )?;
                execute_risingwave_plan(session, plan_fragmenter_result, formats)
            }
        };
        future.await
    }
}

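/// Shared logic for `PARSE`: binds the statement and wraps it into a [`PrepareStatement`].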
fn handle_parse_inner(binder: Binder, statement: Statement) -> Result<PrepareStatement> {
    let bound_result = gen_bound(binder, statement.clone())?;

    Ok(PrepareStatement::Prepared(PreparedResult {
        statement,
        bound_result,
    }))
}

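/// Handles `PARSE` for a batch statement, binding it with the specified parameter types.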
pub fn handle_parse_for_batch(
    handler_args: HandlerArgs,
    statement: Statement,
    specified_param_types: Vec<Option<DataType>>,
) -> Result<PrepareStatement> {
    let binder = Binder::new_for_batch(&handler_args.session)
        .with_specified_params_types(specified_param_types);
    handle_parse_inner(binder, statement)
}

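/// Handles `PARSE` for a streaming statement, binding it with the specified parameter types.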
pub fn handle_parse_for_stream(
    handler_args: HandlerArgs,
    statement: Statement,
    specified_param_types: Vec<Option<DataType>>,
) -> Result<PrepareStatement> {
    let binder = Binder::new_for_stream(&handler_args.session)
        .with_specified_params_types(specified_param_types);
    handle_parse_inner(binder, statement)
}

/// Execute a "Portal", which is a prepared statement with bound parameters.
pub async fn handle_execute(
    handler_args: HandlerArgs,
    portal: PortalResult,
) -> Result<RwPgResponse> {
    let PortalResult {
        bound_result,
        result_formats,
        statement,
    } = portal;
    match statement {
        Statement::Query(_)
        | Statement::Insert { .. }
        | Statement::Delete { .. }
        | Statement::Update { .. } => {
            // Execute a batch query
            let session = handler_args.session.clone();
            let plan_fragmenter_result = {
                let context = OptimizerContext::from_handler_args(handler_args);
                let plan_result =
                    gen_batch_query_plan(&session, context.into(), bound_result)?.unwrap_rw()?;
                // The time zone is used by Hummock time-travel queries.
                risingwave_expr::expr_context::TIME_ZONE::sync_scope(
                    session.config().timezone(),
                    || gen_batch_plan_fragmenter(&session, plan_result),
                )?
            };
            execute_risingwave_plan(session, plan_fragmenter_result, result_formats).await
        }
        Statement::CreateView { materialized, .. } if materialized => {
            // Execute a CREATE MATERIALIZED VIEW
            let BoundResult {
                bound,
                dependent_relations,
                dependent_udfs,
                dependent_secrets,
                ..
            } = bound_result;
            let create_mv = if let BoundStatement::CreateView(create_mv) = bound {
                create_mv
            } else {
                unreachable!("expect a BoundStatement::CreateView")
            };
            let BoundCreateView {
                or_replace,
                materialized: _,
                if_not_exists,
                name,
                columns,
                query,
                emit_mode,
                with_options,
            } = *create_mv;
            if or_replace {
                bail_not_implemented!("CREATE OR REPLACE VIEW");
            }

            // Hack: replace the `with_options` with the bound ones.
            let handler_args = HandlerArgs {
                session: handler_args.session.clone(),
                sql: handler_args.sql.clone(),
                normalized_sql: handler_args.normalized_sql.clone(),
                with_options: crate::WithOptions::try_from(with_options.as_slice())?,
            };

            create_mv::handle_create_mv_bound(
                handler_args,
                if_not_exists,
                name,
                *query,
                dependent_relations,
                dependent_udfs,
                dependent_secrets,
                columns,
                emit_mode,
            )
            .await
        }
        Statement::DeclareCursor { stmt } => match stmt.declare_cursor {
            risingwave_sqlparser::ast::DeclareCursor::Query(_) => {
                let session = handler_args.session.clone();
                let plan_fragmenter_result = {
                    let context = OptimizerContext::from_handler_args(handler_args.clone());
                    let plan_result = gen_batch_query_plan(&session, context.into(), bound_result)?
                        .unwrap_rw()?;
                    gen_batch_plan_fragmenter(&session, plan_result)?
                };
                declare_cursor::handle_bound_declare_query_cursor(
                    handler_args,
                    stmt.cursor_name,
                    plan_fragmenter_result,
                )
                .await
            }
            risingwave_sqlparser::ast::DeclareCursor::Subscription(sub_name, rw_timestamp) => {
                declare_cursor::handle_declare_subscription_cursor(
                    handler_args,
                    sub_name,
                    stmt.cursor_name,
                    rw_timestamp,
                )
                .await
            }
        },
        _ => unreachable!(),
    }
}

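/// Binds a statement and generates a batch query plan for it.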
pub fn gen_batch_plan_by_statement(
    session: &SessionImpl,
    context: OptimizerContextRef,
    stmt: Statement,
) -> Result<BatchPlanChoice> {
    let binder = Binder::new_for_batch(session);
    let bound_result = gen_bound(binder, stmt)?;
    gen_batch_query_plan(session, context, bound_result)
}

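/// The result of binding a statement, carrying everything needed to plan it later.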
#[derive(Clone)]
pub struct BoundResult {
    pub(crate) stmt_type: StatementType,
    pub(crate) must_dist: bool,
    pub(crate) bound: BoundStatement,
    pub(crate) param_types: Vec<DataType>,
    pub(crate) parsed_params: Option<Vec<Datum>>,
    pub(crate) dependent_relations: HashSet<ObjectId>,
    /// TODO(rc): merge with `dependent_relations`
    pub(crate) dependent_udfs: HashSet<FunctionId>,
    pub(crate) dependent_secrets: HashSet<SecretId>,
}

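/// Binds a statement and collects its dependencies and parameter types into a [`BoundResult`].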
fn gen_bound(mut binder: Binder, stmt: Statement) -> Result<BoundResult> {
    let stmt_type = StatementType::infer_from_statement(&stmt)
        .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;
    let must_dist = must_run_in_distributed_mode(&stmt)?;

    let bound = binder.bind(stmt)?;

    Ok(BoundResult {
        stmt_type,
        must_dist,
        bound,
        param_types: binder.export_param_types()?,
        parsed_params: None,
        dependent_relations: binder.included_relations().clone(),
        dependent_udfs: binder.included_udfs().clone(),
        dependent_secrets: binder.included_secrets().clone(),
    })
}

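/// A batch query plan produced by RisingWave's own optimizer, together with its execution mode
/// and output schema.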
pub struct RwBatchQueryPlanResult {
    pub(crate) plan: BatchPlanRef,
    pub(crate) query_mode: QueryMode,
    pub(crate) schema: Schema,
    pub(crate) stmt_type: StatementType,
    // Note that these relations are only resolved in the binding phase and may be a subset of
    // the final set: the final plan may contain additional implicit dependencies, e.g. on
    // indices.
    pub(crate) dependent_relations: Vec<ObjectId>,
    pub(crate) dependent_secrets: Vec<SecretId>,
}

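/// Generates a batch query plan from a bound statement, choosing between local and distributed
/// execution mode and, if enabled, the DataFusion backend.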
fn gen_batch_query_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    bind_result: BoundResult,
) -> Result<BatchPlanChoice> {
    let BoundResult {
        stmt_type,
        must_dist,
        bound,
        dependent_relations,
        dependent_secrets,
        ..
    } = bind_result;

    let mut planner = if matches!(bound, BoundStatement::Query(_)) {
        Planner::new_for_batch_dql(context)
    } else {
        Planner::new_for_batch(context)
    };

    let logical = planner.plan(bound)?;
    let schema = logical.schema();
    let optimized_logical = logical.gen_optimized_logical_plan_for_batch()?;

    #[cfg(feature = "datafusion")]
    {
        if session.config().enable_datafusion_engine() {
            use thiserror_ext::AsReport;

            use crate::datafusion::{GenDataFusionPlanError, try_gen_datafusion_plan};

            match try_gen_datafusion_plan(&optimized_logical) {
                Ok(plan) => {
                    return Ok(BatchPlanChoice::Df(DfBatchQueryPlanResult {
                        plan,
                        schema,
                        stmt_type,
                    }));
                }
                Err(GenDataFusionPlanError::MissingIcebergScan) => {}
                Err(err) => {
                    tracing::warn!(
                        "Failed to generate DataFusion plan, falling back to RisingWave plan: {}",
                        err.as_report()
                    );
                }
            }
        }
    }

    let batch_plan = optimized_logical.gen_batch_plan()?;

    let dependent_relations =
        RelationCollectorVisitor::collect_with(dependent_relations, batch_plan.plan.clone());

    let must_local = must_run_in_local_mode(&batch_plan);

    let query_mode = match (must_dist, must_local) {
        (true, true) => {
            return Err(ErrorCode::InternalError(
                "the query is forced to both local and distributed mode by the optimizer".to_owned(),
            )
            .into());
        }
        (true, false) => QueryMode::Distributed,
        (false, true) => QueryMode::Local,
        (false, false) => match session.config().query_mode() {
            QueryMode::Auto => determine_query_mode(&batch_plan),
            QueryMode::Local => QueryMode::Local,
            QueryMode::Distributed => QueryMode::Distributed,
        },
    };

    let physical = match query_mode {
        QueryMode::Auto => unreachable!(),
        QueryMode::Local => batch_plan.gen_batch_local_plan()?,
        QueryMode::Distributed => batch_plan.gen_batch_distributed_plan()?,
    };

    let result = RwBatchQueryPlanResult {
        plan: physical,
        query_mode,
        schema,
        stmt_type,
        dependent_relations: dependent_relations.into_iter().collect_vec(),
        dependent_secrets: dependent_secrets.into_iter().collect_vec(),
    };
    Ok(BatchPlanChoice::Rw(result))
}

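/// Returns whether the statement must run in distributed mode: `UPDATE` / `DELETE` (with or
/// without `RETURNING`), or an `INSERT` whose source contains a `SELECT`.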
fn must_run_in_distributed_mode(stmt: &Statement) -> Result<bool> {
    fn is_insert_using_select(stmt: &Statement) -> bool {
        fn has_select_query(set_expr: &SetExpr) -> bool {
            match set_expr {
                SetExpr::Select(_) => true,
                SetExpr::Query(query) => has_select_query(&query.body),
                SetExpr::SetOperation { left, right, .. } => {
                    has_select_query(left) || has_select_query(right)
                }
                SetExpr::Values(_) => false,
            }
        }

        matches!(
            stmt,
            Statement::Insert {source, ..} if has_select_query(&source.body)
        )
    }

    let stmt_type = StatementType::infer_from_statement(stmt)
        .map_err(|err| RwError::from(ErrorCode::InvalidInputSyntax(err)))?;

    Ok(matches!(
        stmt_type,
        StatementType::UPDATE
            | StatementType::DELETE
            | StatementType::UPDATE_RETURNING
            | StatementType::DELETE_RETURNING
    ) | is_insert_using_select(stmt))
}

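/// Returns whether the plan must run in local mode, which is the case when it reads from
/// system tables.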
fn must_run_in_local_mode(batch_plan: &BatchPlanRoot) -> bool {
    SysTableVisitor::has_sys_table(batch_plan)
}

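/// Picks local or distributed mode for a plan when the session's query mode is `Auto`.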
fn determine_query_mode(batch_plan: &BatchPlanRoot) -> QueryMode {
    if ExecutionModeDecider::run_in_local_mode(batch_plan) {
        QueryMode::Local
    } else {
        QueryMode::Distributed
    }
}

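/// The output of [`gen_batch_plan_fragmenter`], ready to be scheduled for execution.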
pub struct BatchPlanFragmenterResult {
    pub(crate) plan_fragmenter: BatchPlanFragmenter,
    pub(crate) query_mode: QueryMode,
    pub(crate) schema: Schema,
    pub(crate) stmt_type: StatementType,
}

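/// Wraps a batch query plan into a [`BatchPlanFragmenter`] according to the current worker
/// nodes and batch parallelism.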
pub fn gen_batch_plan_fragmenter(
    session: &SessionImpl,
    plan_result: RwBatchQueryPlanResult,
) -> Result<BatchPlanFragmenterResult> {
    let RwBatchQueryPlanResult {
        plan,
        query_mode,
        schema,
        stmt_type,
        ..
    } = plan_result;

    tracing::trace!(
        "Generated query plan: {:?}, query_mode:{:?}",
        plan.explain_to_string(),
        query_mode
    );
    let worker_node_manager_reader = WorkerNodeSelector::new(
        session.env().worker_node_manager_ref(),
        session.is_barrier_read(),
    );
    let plan_fragmenter = BatchPlanFragmenter::new(
        worker_node_manager_reader,
        session.env().catalog_reader().clone(),
        session.config().batch_parallelism().0,
        plan,
    )?;

    Ok(BatchPlanFragmenterResult {
        plan_fragmenter,
        query_mode,
        schema,
        stmt_type,
    })
}

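/// Turns a fragmented plan into a row stream and the corresponding `pgwire` field descriptors,
/// executing it in either local or distributed mode.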
pub async fn create_stream(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
    formats: Vec<Format>,
) -> Result<(PgResponseStream, Vec<PgFieldDescriptor>)> {
    let BatchPlanFragmenterResult {
        plan_fragmenter,
        query_mode,
        schema,
        stmt_type,
        ..
    } = plan_fragmenter_result;

    let mut can_timeout_cancel = true;
    // Acquire the write guard for DML statements.
    match stmt_type {
        StatementType::INSERT
        | StatementType::INSERT_RETURNING
        | StatementType::DELETE
        | StatementType::DELETE_RETURNING
        | StatementType::UPDATE
        | StatementType::UPDATE_RETURNING => {
            session.txn_write_guard()?;
            can_timeout_cancel = false;
        }
        _ => {}
    }

    let query = plan_fragmenter.generate_complete_query().await?;
    tracing::trace!("Generated query after plan fragmenter: {:?}", &query);

    let pg_descs = schema
        .fields()
        .iter()
        .map(to_pg_field)
        .collect::<Vec<PgFieldDescriptor>>();
    let column_types = schema.fields().iter().map(|f| f.data_type()).collect_vec();

    let row_stream = match query_mode {
        QueryMode::Auto => unreachable!(),
        QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
            local_execute(session.clone(), query, can_timeout_cancel).await?,
            column_types,
            formats,
            session.clone(),
        )),
        // Local mode does not support cancelling tasks.
        QueryMode::Distributed => {
            PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
                distribute_execute(session.clone(), query, can_timeout_cancel).await?,
                column_types,
                formats,
                session.clone(),
            ))
        }
    };

    Ok((row_stream, pg_descs))
}

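/// Executes a fragmented RisingWave plan and builds the [`PgResponse`], including the
/// implicit-flush and metrics callback that runs after the query finishes.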
async fn execute_risingwave_plan(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
    formats: Vec<Format>,
) -> Result<RwPgResponse> {
    // Used when formatting the row count.
    let first_field_format = formats.first().copied().unwrap_or(Format::Text);
    let query_mode = plan_fragmenter_result.query_mode;
    let stmt_type = plan_fragmenter_result.stmt_type;

    let query_start_time = Instant::now();
    let (row_stream, pg_descs) =
        create_stream(session.clone(), plan_fragmenter_result, formats).await?;

    // We need to do some post work after the query is finished and before the `Complete` response
    // is sent. This is achieved by the `callback` in `PgResponse`.
    let callback = async move {
        // Implicitly flush the writes.
        if session.config().implicit_flush() && stmt_type.is_dml() {
            do_flush(&session).await?;
        }

        // Update metrics.
        match query_mode {
            QueryMode::Auto => unreachable!(),
            QueryMode::Local => {
                session
                    .env()
                    .frontend_metrics
                    .latency_local_execution
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .frontend_metrics
                    .query_counter_local_execution
                    .inc();
            }
            QueryMode::Distributed => {
                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .query_latency
                    .observe(query_start_time.elapsed().as_secs_f64());

                session
                    .env()
                    .query_manager()
                    .query_metrics
                    .completed_query_counter
                    .inc();
            }
        }

        Ok(())
    };

    Ok(PgResponse::builder(stmt_type)
        .row_cnt_format_opt(Some(first_field_format))
        .values(row_stream, pg_descs)
        .callback(callback)
        .into())
}

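/// Schedules the query on the distributed query manager and returns its result stream.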
pub async fn distribute_execute(
    session: Arc<SessionImpl>,
    query: Query,
    can_timeout_cancel: bool,
) -> Result<DistributedQueryStream> {
    let timeout = if cfg!(madsim) {
        None
    } else if can_timeout_cancel {
        Some(session.statement_timeout())
    } else {
        None
    };
    let execution_context: ExecutionContextRef =
        ExecutionContext::new(session.clone(), timeout).into();
    let query_manager = session.env().query_manager().clone();

    query_manager
        .schedule(execution_context, query)
        .await
        .map_err(|err| err.into())
}

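/// Executes the query locally on the frontend, pinning a snapshot to fill the batch query epoch.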
pub async fn local_execute(
    session: Arc<SessionImpl>,
    mut query: Query,
    can_timeout_cancel: bool,
) -> Result<LocalQueryStream> {
    let timeout = if cfg!(madsim) {
        None
    } else if can_timeout_cancel {
        Some(session.statement_timeout())
    } else {
        None
    };
    let front_env = session.env();

    let snapshot = session.pinned_snapshot();

    snapshot.fill_batch_query_epoch(&mut query)?;

    let execution = LocalQueryExecution::new(
        query,
        front_env.clone(),
        snapshot.support_barrier_read(),
        session,
        timeout,
    );

    Ok(execution.stream_rows())
}